Overview of ACEIS
The Automatic Complex Event Implementation System (ACEIS) is a middleware for complex event services. It was developed to fulfill the large-scale data analysis requirements of the CityPulse project and is responsible for event service discovery, composition, deployment, execution and adaptation. In the CityPulse project, data streams are annotated as event services using the Complex Event Service Ontology. Event service discovery and composition refer to finding a data stream, or a composite set of data streams, that addresses a user-defined event request. Event service deployment refers to transforming the event service composition results (a.k.a. composition plans) into RDF Stream Processing (RSP) queries and registering these queries with the relevant RSP engines, e.g., CQELS and C-SPARQL. Event service execution refers to establishing the input and output connections of the RSP engines, so that they can consume real-time data from lower-level data providers and deliver query results to upper-level data consumers. Event service adaptation ensures that the quality constraints over event services are satisfied at run-time by detecting quality changes and making adjustments automatically. The figure below illustrates the architecture of ACEIS.
ACEIS can be used in scenarios where on-demand discovery and composition of data streams are needed and RSP is used to evaluate queries over the data streams, e.g.:
- Scenario 1: monitoring traffic conditions over different streets and regions in a city for a city administrator.
- Scenario 2: planning the travel routes for a citizen based on his/her functional and non-functional requirements.
ACEIS is not suited to situations where the queries, or the streams used by the queries, are fixed, i.e., where on-demand stream discovery and composition are neither possible nor necessary.
Known limitations
ACEIS has the following limitations:
- Event Semantics: The expressiveness of event requests with regard to temporal and logical correlations is limited to AND, OR, SEQUENCE and REPETITION, as described in this paper. Moreover, the events modeled in ACEIS are instant events, i.e., each event carries only one timestamp; interval-based events are not supported.
- RSP Engine Types: Currently only CQELS and C-SPARQL engines are supported for event service execution. However, a third-party developer can integrate new engines by extending the query transformation module.
- Concurrent Queries: Existing RSP engines are still in their early stages and there is room for performance optimization. Currently, the data federation component can handle approximately 1000 CQELS or 90 C-SPARQL queries in parallel, by applying a load-balancing technique. A larger number of concurrent queries may (depending on the query complexity) result in unstable engine status.
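The operator limits above can be made concrete with a small model. The following is a minimal Java sketch, not ACEIS's own code, of how AND, OR, SEQUENCE and REPETITION might be evaluated over instant events that carry a single timestamp each; the class and method names, the window-based reading of each operator and the event types "congestion" and "rain" are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical sketch, not ACEIS's implementation: operator semantics over instant events.
final class InstantEventSemantics {

    /** An instant event: a type name plus exactly one timestamp in milliseconds. */
    static final class InstantEvent {
        final String type;
        final long timestamp;

        InstantEvent(String type, long timestamp) {
            this.type = type;
            this.timestamp = timestamp;
        }
    }

    /** Timestamps of all events of the given type, in input order. */
    private static List<Long> times(List<InstantEvent> events, String type) {
        List<Long> result = new ArrayList<>();
        for (InstantEvent e : events) {
            if (e.type.equals(type)) {
                result.add(e.timestamp);
            }
        }
        return result;
    }

    /** AND: an a-event and a b-event occur, in either order, at most windowMs apart. */
    static boolean and(List<InstantEvent> events, String a, String b, long windowMs) {
        for (long ta : times(events, a)) {
            for (long tb : times(events, b)) {
                if (Math.abs(ta - tb) <= windowMs) return true;
            }
        }
        return false;
    }

    /** OR: at least one a-event or one b-event occurs. */
    static boolean or(List<InstantEvent> events, String a, String b) {
        return !times(events, a).isEmpty() || !times(events, b).isEmpty();
    }

    /** SEQUENCE: an a-event strictly precedes a b-event by at most windowMs. */
    static boolean sequence(List<InstantEvent> events, String a, String b, long windowMs) {
        for (long ta : times(events, a)) {
            for (long tb : times(events, b)) {
                if (ta < tb && tb - ta <= windowMs) return true;
            }
        }
        return false;
    }

    /** REPETITION: at least n a-events fall within a span of windowMs. */
    static boolean repetition(List<InstantEvent> events, String a, int n, long windowMs) {
        if (n <= 0) return true;
        List<Long> ts = times(events, a);
        Collections.sort(ts);
        // n consecutive sorted timestamps ts[i .. i+n-1] must fit inside the window
        for (int i = 0; i + n - 1 < ts.size(); i++) {
            if (ts.get(i + n - 1) - ts.get(i) <= windowMs) return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<InstantEvent> events = Arrays.asList(
                new InstantEvent("congestion", 1000),
                new InstantEvent("rain", 2500),
                new InstantEvent("congestion", 3000));
        System.out.println(sequence(events, "rain", "congestion", 3000)); // true
        System.out.println(sequence(events, "congestion", "rain", 1000)); // false
        System.out.println(repetition(events, "congestion", 2, 3000));   // true
    }
}
```

Because every event has exactly one timestamp, SEQUENCE reduces to a strict comparison of two timestamps; interval relations such as "during" or "overlaps" have no counterpart in this model, which mirrors the limitation described above.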
Possible Extensions
ACEIS can be extended to:
- support more expressive event semantics by extending the event models, event reusability definitions and the event service composition algorithms,
- support more RSP engines by extending the query transformation algorithms and
- employ more advanced adaptation algorithms.
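As a baseline against which more advanced adaptation algorithms could be compared, the sketch below shows a very simple threshold-based policy in Java: the current event service is kept while its observed QoS satisfies the request's constraints, and otherwise the satisfying candidate with the highest weighted utility is chosen. It is a hypothetical illustration, not the adaptation algorithm used in ACEIS; the two QoS attributes (latency and accuracy), the class names and the utility formula are assumptions.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical threshold-based adaptation; QoS attributes and utility formula are assumptions.
final class SimpleAdaptation {

    /** Observed QoS of one candidate event service (stream). */
    static final class Qos {
        final String streamId;
        final double latencyMs; // lower is better
        final double accuracy;  // in [0, 1], higher is better

        Qos(String streamId, double latencyMs, double accuracy) {
            this.streamId = streamId;
            this.latencyMs = latencyMs;
            this.accuracy = accuracy;
        }
    }

    private final double maxLatencyMs; // constraint: upper bound on latency
    private final double minAccuracy;  // constraint: lower bound on accuracy
    private final double wLatency;     // weight of the latency term
    private final double wAccuracy;    // weight of the accuracy term

    SimpleAdaptation(double maxLatencyMs, double minAccuracy, double wLatency, double wAccuracy) {
        this.maxLatencyMs = maxLatencyMs;
        this.minAccuracy = minAccuracy;
        this.wLatency = wLatency;
        this.wAccuracy = wAccuracy;
    }

    boolean satisfies(Qos q) {
        return q.latencyMs <= maxLatencyMs && q.accuracy >= minAccuracy;
    }

    /** Weighted utility; latency is normalised by its bound so both terms lie in [0, 1] for satisfying services. */
    double utility(Qos q) {
        return wLatency * (1.0 - q.latencyMs / maxLatencyMs) + wAccuracy * q.accuracy;
    }

    /** Keeps the current service while it satisfies the constraints; otherwise returns the best satisfying candidate, or null. */
    Qos adapt(Qos current, List<Qos> candidates) {
        if (satisfies(current)) {
            return current;
        }
        Qos best = null;
        for (Qos c : candidates) {
            if (satisfies(c) && (best == null || utility(c) > utility(best))) {
                best = c;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        SimpleAdaptation policy = new SimpleAdaptation(500, 0.7, 0.5, 0.5);
        Qos current = new Qos("s1", 900, 0.95); // latency has degraded beyond the bound
        List<Qos> candidates = Arrays.asList(
                new Qos("s2", 300, 0.80),
                new Qos("s3", 200, 0.60),  // violates the accuracy bound
                new Qos("s4", 400, 0.90));
        System.out.println(policy.adapt(current, candidates).streamId); // s2
    }
}
```

A more advanced algorithm would typically replace the "switch only on violation" rule, for instance by re-composing the whole plan rather than swapping a single stream.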
User Guide
A sample main class can be found in "org.insight_centre.citypulse.main.MultipleInstanceMain.java". Below, we provide some basic examples and explanations of the code to demonstrate how it works.
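public static void main(String[] args) throws Exception {
    // load the property file; try-with-resources closes the stream even if loading fails
    Properties prop = new Properties();
    try (FileInputStream fis = new FileInputStream(new File("aceis.properties"))) {
        prop.load(fis);
    }
    // load run-time parameters, given as key=value arguments (e.g. cqelsCnt=2)
    HashMap<String, String> parameters = new HashMap<String, String>();
    for (String s : args) {
        // split at the first '=' only, so that values (e.g. file paths) may contain '='
        String[] kv = s.split("=", 2);
        if (kv.length != 2) {
            throw new IllegalArgumentException("Expected key=value but got: " + s);
        }
        parameters.put(kv[0], kv[1]);
    }
    // start server
    MultipleInstanceMain mim = new MultipleInstanceMain(prop, parameters);
    mim.startServer();
}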
Configurable properties ("aceis.properties"):
- hostIp: the IP address of the host running the data federation server,
- port: the main port used by the data federation server,
- ontology: the folder storing the ontology files (used only if local dataset files are used),
- streams: the folder storing the stream data files for simulation (used only if operating on simulated streams),
- dataset: the location of the stream metadata; this can be a local file or a URI of the Virtuoso endpoint provided by the resource management component,
- request: the location of sample test queries.
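For orientation, the following Java sketch parses a properties text containing the keys listed above and reads two of them. It relies only on java.util.Properties; the class name and every value in SAMPLE (IP address, port, paths) are placeholders, not ACEIS defaults.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical sketch: parses an aceis.properties-style text and reads the documented keys.
final class AceisPropertiesExample {

    // Placeholder values for illustration only; they are not ACEIS defaults.
    static final String SAMPLE =
            "hostIp=127.0.0.1\n"
            + "port=8001\n"
            + "ontology=ontology/\n"
            + "streams=streams/\n"
            + "dataset=dataset/sample.ttl\n"
            + "request=requests/sample-request.json\n";

    /** Loads key=value lines into a Properties object (same format as the file on disk). */
    static Properties load(String text) {
        Properties prop = new Properties();
        try {
            prop.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return prop;
    }

    /** Returns a required, non-blank value or fails fast with the missing key's name. */
    static String require(Properties prop, String key) {
        String value = prop.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing property: " + key);
        }
        return value.trim();
    }

    public static void main(String[] args) {
        Properties prop = load(SAMPLE);
        String hostIp = require(prop, "hostIp");
        int port = Integer.parseInt(require(prop, "port"));
        System.out.println(hostIp + ":" + port); // 127.0.0.1:8001
    }
}
```

Only hostIp and port are treated as required here, since ontology and streams are used only in specific modes.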
Run-time parameters (passed to main() as key=value arguments):
- cqelsCnt: number of CQELS engine instances,
- csparqlCnt: number of C-SPARQL engine instances,
- smode: load-balancing strategy; can be "elastic", "balancedLatency", "balancedQueries" or "rotation",
- qCnt: number of concurrent queries (used only in simulation mode),
- step: interval between registering new queries (used only in simulation mode),
- query: path to the query file (used only in simulation mode),
- duration: lifetime of the ACEIS server instance; 0 means indefinite.
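Since the run-time parameters arrive as key=value command-line arguments, a minimal Java sketch of parsing and validating them might look as follows. The class and method names are hypothetical, and the defaults passed in main() are illustrative assumptions rather than values used by ACEIS.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical parser for ACEIS-style run-time parameters given as key=value arguments.
final class RuntimeParams {

    // Strategy names as listed in the run-time parameter documentation.
    static final Set<String> SMODES = new HashSet<>(Arrays.asList(
            "elastic", "balancedLatency", "balancedQueries", "rotation"));

    /** Splits each argument at the first '=' only, so values may themselves contain '='. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("Expected key=value but got: " + arg);
            }
            params.put(arg.substring(0, eq), arg.substring(eq + 1));
        }
        return params;
    }

    /** Reads a non-negative integer parameter, falling back to a default when it is absent. */
    static int intParam(Map<String, String> params, String key, int defaultValue) {
        String raw = params.get(key);
        int value = (raw == null) ? defaultValue : Integer.parseInt(raw.trim());
        if (value < 0) {
            throw new IllegalArgumentException(key + " must be >= 0");
        }
        return value;
    }

    /** Validates smode against the documented strategy names. */
    static String smode(Map<String, String> params, String defaultValue) {
        String mode = params.getOrDefault("smode", defaultValue);
        if (!SMODES.contains(mode)) {
            throw new IllegalArgumentException("Unknown smode: " + mode);
        }
        return mode;
    }

    public static void main(String[] args) {
        Map<String, String> p = parse(new String[] {"cqelsCnt=2", "csparqlCnt=0", "smode=rotation", "duration=0"});
        int cqels = intParam(p, "cqelsCnt", 1);
        boolean runsIndefinitely = intParam(p, "duration", 0) == 0; // 0 means indefinite
        System.out.println(cqels + " " + smode(p, "rotation") + " " + runsIndefinitely); // 2 rotation true
    }
}
```

Failing fast on a malformed argument or an unknown smode surfaces typos at start-up instead of at the first query.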
The startServer() method of MultipleInstanceMain, which is invoked from main(), performs the following steps:
private void startServer() throws Exception {
    // initialize the ACEIS engine(s) using ACEISScheduler
    ACEISScheduler.smode = this.smode;
    ACEISScheduler.initACEISScheduler(cqelsCnt, csparqlCnt, dataset);
    // start websocket servers
    Server server = new Server(this.hostIp, this.port, "/", null, MultipleEngineServerEndpoint.class); // main server endpoint
    //Server server2 = new Server(this.hostIp, 8002, "/", null, SubscriberServerEndpoint2.class); // obsolete server for mobile travel planner
    Server server3 = new Server(this.hostIp, 8020, "/", null, ConflictResolutionServerEndpoint.class); // conflict resolution server endpoint
    try {
        server.start();
        //server2.start();
        server3.start();
        // for local tests
        // sendRequest(step);
        BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
        System.out.print("Please press Enter to stop the server.");
        reader.readLine();
    } catch (Exception e) {
        e.printStackTrace();
        throw new RuntimeException(e);
    } finally {
        logger.info("Stopping Server.");
        // stop both started endpoints before exiting
        server3.stop();
        server.stop();
        System.exit(0);
    }
}
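ACEISScheduler is initialized with the chosen smode, but the exact behaviour of each strategy is not documented here. The Java sketch below shows one plausible reading of three of the strategy names: "rotation" as round-robin, "balancedQueries" as fewest registered queries and "balancedLatency" as lowest average latency. These interpretations, the EngineSelector and EngineInstance types and the sample numbers are assumptions; "elastic" is deliberately left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical engine-selection strategies; behaviour is inferred from the smode names only.
final class EngineSelector {

    /** Minimal stand-in for an RSP engine instance (not ACEIS's ACEISEngine class). */
    static final class EngineInstance {
        final String id;
        int registeredQueries;
        double avgLatencyMs;

        EngineInstance(String id, int registeredQueries, double avgLatencyMs) {
            this.id = id;
            this.registeredQueries = registeredQueries;
            this.avgLatencyMs = avgLatencyMs;
        }
    }

    private final List<EngineInstance> engines;
    private int next = 0; // round-robin cursor for "rotation"

    EngineSelector(List<EngineInstance> engines) {
        if (engines.isEmpty()) {
            throw new IllegalArgumentException("at least one engine instance is required");
        }
        this.engines = new ArrayList<>(engines);
    }

    EngineInstance select(String smode) {
        switch (smode) {
            case "rotation": {
                // cycle through the instances in a fixed order
                EngineInstance chosen = engines.get(next);
                next = (next + 1) % engines.size();
                return chosen;
            }
            case "balancedQueries": {
                // pick the instance with the fewest registered queries
                EngineInstance best = engines.get(0);
                for (EngineInstance e : engines) {
                    if (e.registeredQueries < best.registeredQueries) best = e;
                }
                return best;
            }
            case "balancedLatency": {
                // pick the instance with the lowest observed average latency
                EngineInstance best = engines.get(0);
                for (EngineInstance e : engines) {
                    if (e.avgLatencyMs < best.avgLatencyMs) best = e;
                }
                return best;
            }
            default:
                // "elastic" (and anything else) is not sketched here
                throw new IllegalArgumentException("Strategy not sketched: " + smode);
        }
    }

    public static void main(String[] args) {
        EngineSelector selector = new EngineSelector(Arrays.asList(
                new EngineInstance("cqels-1", 5, 120.0),
                new EngineInstance("cqels-2", 2, 300.0),
                new EngineInstance("cqels-3", 4, 80.0)));
        System.out.println(selector.select("balancedQueries").id); // cqels-2
        System.out.println(selector.select("balancedLatency").id); // cqels-3
        System.out.println(selector.select("rotation").id);        // cqels-1
        System.out.println(selector.select("rotation").id);        // cqels-2
    }
}
```

Rotation needs no run-time statistics, while the two "balanced" readings require the scheduler to track per-instance query counts or latencies.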
At run-time, when the main server endpoint (MultipleEngineServerEndpoint.java) receives a new message, its onMessage method is invoked and takes the following actions:
@OnMessage
public String onMessage(String message, Session session) throws Exception {
    if (message.equals("quit")) { // when the message is exactly "quit", close the session for this client
        try {
            session.close(new CloseReason(CloseCodes.NORMAL_CLOSURE, "Subscriber ended"));
        } catch (Exception e) {
            logger.error("Error occurred while closing the session.");
            e.printStackTrace();
            session.getBasicRemote().sendText("FAULT: " + e.getMessage());
            logger.info("Sending error msg: " + e.getMessage());
            throw new RuntimeException(e);
        }
    } else if (message.contains("stop")) { // when the message contains "stop", deregister the query but keep the session
        try {
            SubscriptionManagerFactory.getSubscriptionManager().deregisterEventRequest(sessionPlanMap.get(session),
                    false);
        } catch (Exception e) {
            logger.error("Error occurred while stopping streams.");
            e.printStackTrace();
            session.getBasicRemote().sendText("FAULT: Error occurred while stopping streams. " + e.getMessage());
            logger.info("Sending error msg: " + e.getMessage());
        }
    } else { // process normal queries
        try {
            this.parseMsg(message, session);
            // start the monitor thread with the first processed query, if it is not running yet
            if (!MultipleInstanceMain.getMonitor().isStarted()) {
                new Thread(MultipleInstanceMain.getMonitor()).start();
                logger.info("Monitor thread started.");
            }
        } catch (Exception e) {
            e.printStackTrace();
            session.getBasicRemote().sendText("FAULT: " + e.getMessage());
            logger.info("Sending error msg: " + e.getMessage());
        }
    }
    return null;
}
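The branch order in onMessage can be isolated into a pure function, which makes one subtlety visible: "quit" must match the whole message, while any message containing the substring "stop" is treated as a stop request, including a JSON query that merely mentions "stop" in a field value. The hypothetical Java sketch below mirrors that order (classify) and shows a stricter alternative (classifyStrict); neither is part of ACEIS, and the sample message is invented.

```java
// Hypothetical stand-alone mirror of the branch order in onMessage(), without WebSocket dependencies.
final class MessageDispatch {

    enum Action { QUIT, STOP, QUERY }

    /** Mirrors onMessage(): exact "quit", then any message containing "stop", else a query. */
    static Action classify(String message) {
        if (message.equals("quit")) return Action.QUIT;
        if (message.contains("stop")) return Action.STOP;
        return Action.QUERY;
    }

    /** Stricter variant: control words must match the whole (trimmed) message. */
    static Action classifyStrict(String message) {
        String m = message.trim();
        if (m.equals("quit")) return Action.QUIT;
        if (m.equals("stop")) return Action.STOP;
        return Action.QUERY;
    }

    public static void main(String[] args) {
        // invented JSON body whose field value happens to contain "stop"
        String query = "{\"eventType\":\"bus_stop_occupancy\"}";
        System.out.println(classify(query));       // STOP
        System.out.println(classifyStrict(query)); // QUERY
    }
}
```

Whether the stricter rule is appropriate depends on what clients actually send as a stop command, so it is shown only as an option.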
The parseMsg() method analyses the received message:
private void parseMsg(String msgStr, Session session) throws Exception {
    // generate an EventRequest from msgStr, which is in JSON format
    EventRequest request = new Gson().fromJson(msgStr, EventRequest.class);
    logger.info("Engine type: " + request.getEngineType());
    // choose a suitable RSP engine instance for this request
    ACEISEngine engine = ACEISScheduler.getBestEngineInstance(request.getEngineType());
    engineMap.put(session, engine);
    if (request.isContinuous()) {
        // if the request asks for continuous results, register the query on the RSP engine
        this.registerQueryFromEventRequest(request, session);
    } else {
        // if it is a "one-time" request, return a snapshot of the relevant sensor data using the data-fetching server
        logger.info("Parsing data fetching request.");
        // collect the service ids of all event declarations (sensors) used in the request
        List<String> sids = new ArrayList<String>();
        for (EventDeclaration e : request.getEp().getEds())
            sids.add(e.getServiceId());
        DataFederationResult result = new DataFederationResult();
        for (String s : sids) {
            logger.info("sid: " + s);
            EventDeclaration ed = engine.getRepo().getEds().get(s);
            HashMap snapshot = VirtuosoDataManager.getSnapShot(ed);
            for (Entry en : snapshot.entrySet()) {
                result.getResult().put(en.getKey(), new ArrayList());
                result.getResult().get(en.getKey()).add(en.getValue());
            }
        }
        String resultStr = new Gson().toJson(result);
        logger.info("sending response: " + resultStr);
        session.getBasicRemote().sendText(resultStr);
    }
}
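In the one-time branch of parseMsg, a fresh list is put for every snapshot entry, so if two sensors' snapshots share a key, the later value replaces the earlier one. If keys can collide, one option is to append instead. The following Java sketch is a hypothetical helper illustrating that variant with plain maps; the key names in main() are invented.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: merges per-sensor snapshots into one key -> list-of-values result.
final class SnapshotMerge {

    /** Like the loop in parseMsg(), but appends to an existing list instead of replacing it. */
    static <V> Map<String, List<V>> merge(List<Map<String, V>> snapshots) {
        Map<String, List<V>> result = new LinkedHashMap<>();
        for (Map<String, V> snapshot : snapshots) {
            for (Map.Entry<String, V> en : snapshot.entrySet()) {
                result.computeIfAbsent(en.getKey(), k -> new ArrayList<>()).add(en.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> s1 = new LinkedHashMap<>();
        s1.put("avgSpeed", "42");
        Map<String, String> s2 = new LinkedHashMap<>();
        s2.put("avgSpeed", "37");
        s2.put("vehicleCount", "12");
        List<Map<String, String>> snapshots = new ArrayList<>();
        snapshots.add(s1);
        snapshots.add(s2);
        System.out.println(merge(snapshots)); // {avgSpeed=[42, 37], vehicleCount=[12]}
    }
}
```

If snapshot keys are already unique per sensor, both versions produce the same result and the original loop is sufficient.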
Event requests are modelled in the package org.insight_centre.aceis.eventmodel and have the following components:
- ep: the query pattern
- constraint: the QoS constraint vector, can be null
- weight: the QoS weight vector, can be null
- continuous: specifies whether the query is continuous; to be set to 'true' in this scenario
- engineType: type of RDF processing engine to be used, can be 'CQELS' or 'CSPARQL'
- aggOp: aggregation operator, can be avg, min or max
- eds: list of event declarations, i.e., sensors used in the query
- eos: list of event operators, to be set to 'and' in this scenario
- filters: list of filters, empty in this scenario
- ID: random pattern id
- idCnt: initial size of pattern, to be ignored
- temporalMap: temporal relations between event operators and event declarations (id strings as keys and values; the same holds for provenanceMap)
- provenanceMap: provenance relation between event operators and event declarations
- isQuery: indicates whether the pattern is a query; to be set to 'true' in the request
- timewindow: the time window of the pattern in milliseconds, can be set to a recommended '3000' as in the sample
- trafficDemand: placeholder for network traffic estimation, to be set to '0' in the request
- selections: selected properties from sensors
- distance: placeholder field, ignore
- eventType: domain specific sensor type
- foi: feature of interest; for traffic sensors the format is (startlatitude,startlongitude-endlatitude,endlongitude)
- nodeId: random sensorId
- propertyName: random name/id of the property
- providedBy: the id of the provider for this property
- foi: the foi of the property
- propertyType: the domain specific type of the property
- originalED: the object reference of the provider, i.e., the event declaration/sensor
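Before sending a request, a client might check a few of the documented field values locally. The Java sketch below is a hypothetical pre-flight check covering engineType, aggOp, isQuery, timewindow and the number of event declarations; treating a null aggOp as "not set" and requiring at least one event declaration are assumptions, and ACEIS itself may validate differently.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical pre-flight check for a few documented EventRequest fields; not part of ACEIS.
final class RequestCheck {

    static final Set<String> ENGINE_TYPES = new HashSet<>(Arrays.asList("CQELS", "CSPARQL"));
    static final Set<String> AGG_OPS = new HashSet<>(Arrays.asList("avg", "min", "max"));

    /** Returns human-readable problems; an empty list means the checked fields look valid. */
    static List<String> check(String engineType, String aggOp, boolean isQuery, long timewindowMs, int edCount) {
        List<String> problems = new ArrayList<>();
        if (!ENGINE_TYPES.contains(engineType)) {
            problems.add("engineType must be CQELS or CSPARQL");
        }
        // assumption: a null aggOp means "no aggregation requested"
        if (aggOp != null && !AGG_OPS.contains(aggOp)) {
            problems.add("aggOp must be avg, min or max");
        }
        if (!isQuery) {
            problems.add("isQuery must be true in a request");
        }
        if (timewindowMs <= 0) {
            problems.add("timewindow must be a positive number of milliseconds");
        }
        // assumption: a request needs at least one event declaration (sensor)
        if (edCount <= 0) {
            problems.add("eds must list at least one event declaration");
        }
        return problems;
    }

    public static void main(String[] args) {
        System.out.println(check("CQELS", "avg", true, 3000, 2)); // []
        System.out.println(check("SPARQL", "avg", true, 3000, 2)); // [engineType must be CQELS or CSPARQL]
    }
}
```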
API
Javadoc for ACEIS can be found here.
Authors and Contributors
ACEIS was developed by Feng Gao (@fenggao86) in 2015. The author page and relevant publications can be found here.
Support or Contact
Author contacts: feng.gao@insight-centre.org, frostfel0@gmail.com