Overview of ACEIS

The Automatic Complex Event Implementation System (ACEIS) is a middleware for complex event services. It was implemented to meet the large-scale data analysis requirements of the CityPulse project and is responsible for event service discovery, composition, deployment, execution and adaptation. In the CityPulse project, data streams are annotated as event services using the Complex Event Service Ontology. Event service discovery and composition refer to finding a data stream, or a composite set of data streams, that addresses a user-defined event request. Event service deployment refers to transforming the event service composition results (also known as composition plans) into RDF Stream Processing (RSP) queries and registering these queries with the relevant RSP engines, e.g., CQELS and C-SPARQL. Event service execution refers to establishing the input and output connections of the RSP engines so that they can consume real-time data from lower-level data providers and deliver query results to upper-level data consumers. Event service adaptation ensures that the quality constraints over event services are met at run-time by detecting quality changes and making adjustments automatically. The figure below illustrates the architecture of ACEIS.

Architecture of ACEIS

ACEIS can be used in scenarios where on-demand discovery and composition of data streams are needed, and RSP is used for evaluating queries over the data streams e.g.:

ACEIS is not suitable for situations where the queries, or the streams used by the queries, are fixed, i.e., where on-demand stream discovery and composition is not possible or not necessary.

Known limitations

ACEIS has the following limitations:

Possible Extensions

ACEIS can be extended to:

User Guide

A sample main class can be found at "org.insight_centre.citypulse.main.MultipleInstanceMain.java". Below we provide some basic examples and explanations of the code to demonstrate how it works.


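  // requires java.io.FileInputStream, java.util.HashMap and java.util.Properties
  public static void main(String[] args) throws Exception {
	// load the property file; try-with-resources closes the stream even if load() fails
	Properties prop = new Properties();
	try (FileInputStream fis = new FileInputStream("aceis.properties")) {
		prop.load(fis);
	}

	// load run-time parameters given as key=value pairs
	HashMap<String, String> parameters = new HashMap<>();
	for (String s : args) {
		String[] kv = s.split("=", 2);
		if (kv.length == 2) { // skip arguments that are not of the form key=value
			parameters.put(kv[0], kv[1]);
		}
	}

	// start server
	MultipleInstanceMain mim = new MultipleInstanceMain(prop, parameters);
	mim.startServer();
  }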
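
The run-time parameters are passed on the command line as key=value strings. As a minimal, standalone sketch of that convention (not the ACEIS implementation), the class below parses such arguments and skips anything that is not a key=value pair. The class name RuntimeParameters and the parameter names "port" and "query" are hypothetical and used only for illustration.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class RuntimeParameters {
    private RuntimeParameters() {}

    /** Parses "key=value" arguments; arguments without '=' or with an empty key are skipped. */
    static Map<String, String> parse(String[] args) {
        Map<String, String> parameters = new LinkedHashMap<>();
        for (String arg : args) {
            int eq = arg.indexOf('=');
            if (eq <= 0) {
                continue; // no '=' or empty key: not a key=value pair
            }
            // split on the first '=' only, so values may themselves contain '='
            parameters.put(arg.substring(0, eq), arg.substring(eq + 1));
        }
        return parameters;
    }

    public static void main(String[] args) {
        // "port" and "query" are hypothetical parameter names
        Map<String, String> p = parse(new String[] {"port=8001", "verbose", "query=a=b"});
        System.out.println(p);
    }
}
```

Splitting on the first '=' only is a design choice of this sketch: it keeps values that contain '=' intact, whereas indexing into the result of split("=") would truncate them.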
  

configurable properties ("aceis.properties"):

run-time parameters:

The startServer() method of MultipleInstanceMain performs the following steps:


private void startServer() throws Exception {
	// initialize ACEIS engine(s) using ACEISScheduler
	ACEISScheduler.smode = this.smode;
	ACEISScheduler.initACEISScheduler(cqelsCnt, csparqlCnt, dataset);

	// create the websocket servers
	Server server = new Server(this.hostIp, this.port, "/", null, MultipleEngineServerEndpoint.class); // main server endpoint
	//Server server2 = new Server(this.hostIp, 8002, "/", null, SubscriberServerEndpoint2.class); // obsolete server for mobile travel planner
	Server server3 = new Server(this.hostIp, 8020, "/", null, ConflictResolutionServerEndpoint.class); // conflict resolution server endpoint
	try {
		server.start();
		//server2.start();
		server3.start();

		// for local tests
		// sendRequest(step);

		// block until a line is read from standard input
		BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
		System.out.print("Please press a key to stop the server.");
		reader.readLine();
	} catch (Exception e) {
		e.printStackTrace();
		throw new RuntimeException(e);
	} finally {
		// stop both servers before exiting the JVM
		logger.info("Stopping Server.");
		server.stop();
		server3.stop();
		System.exit(0);
	}
}
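
The shutdown step in the finally block releases several resources in sequence. The following is a minimal sketch of one way to make such a sequence robust, assuming that a resource may never have been opened (null) and that a failure while releasing one resource should not prevent the others from being released. It uses java.lang.AutoCloseable as a stand-in for the session and server objects; the class and method names are hypothetical and not part of ACEIS.

```java
import java.util.ArrayList;
import java.util.List;

final class ShutdownSketch {
    private ShutdownSketch() {}

    /** Closes every non-null resource in order and collects failures instead of aborting on the first one. */
    static List<Exception> closeAll(AutoCloseable... resources) {
        List<Exception> failures = new ArrayList<>();
        for (AutoCloseable resource : resources) {
            if (resource == null) {
                continue; // never opened, nothing to release
            }
            try {
                resource.close();
            } catch (Exception e) {
                failures.add(e); // remember the failure and keep going
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        AutoCloseable session = null; // stand-in for a session that was never established
        AutoCloseable mainServer = () -> System.out.println("main server stopped");
        AutoCloseable conflictServer = () -> System.out.println("conflict resolution server stopped");
        List<Exception> failures = closeAll(session, mainServer, conflictServer);
        System.out.println("failures: " + failures.size());
    }
}
```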
  		
  	

At run-time, when a new message is received by the main server endpoint (MultipleEngineServerEndpoint.java), the onMessage method is invoked, which takes the following actions:


@OnMessage
public String onMessage(String message, Session session) throws Exception {
	if (message.equals("quit")) {// when the message is exactly "quit", close the session for this client
		try {
			session.close(new CloseReason(CloseCodes.NORMAL_CLOSURE, "Subscriber ended"));
		} catch (Exception e) {
			logger.error("Error occurred while closing the session.");
			e.printStackTrace();
			session.getBasicRemote().sendText("FAULT: " + e.getMessage());
			logger.info("Sending error msg: " + e.getMessage());
			throw new RuntimeException(e);
		}
	} else if (message.contains("stop")) {// when the message contains "stop", deregister the query but keep the session
		try {
			SubscriptionManagerFactory.getSubscriptionManager().deregisterEventRequest(sessionPlanMap.get(session),
					false);
		} catch (Exception e) {
			logger.error("Error occurred while stopping streams.");
			e.printStackTrace();
			session.getBasicRemote().sendText("FAULT: Error occurred while stopping streams. " + e.getMessage());
			logger.info("Sending error msg: " + e.getMessage());
		}
	} else {// process normal queries
		try {
			// logger.info("Handling single query");
			this.parseMsg(message, session);
			//
			if (!MultipleInstanceMain.getMonitor().isStarted()) {
				new Thread(MultipleInstanceMain.getMonitor()).start();
				logger.info("Monitor thread started.");
			}
		} catch (Exception e) {
			e.printStackTrace();
			session.getBasicRemote().sendText("FAULT: " + e.getMessage());
			logger.info("Sending error msg: " + e.getMessage());
		}
	}
	return null;
}
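
The branching in onMessage can be summarised as a three-way classification of the incoming text. The sketch below reproduces only that decision logic, using the same equals("quit") and contains("stop") tests as the listing above, so that it can be run and checked in isolation. The class name, the enum and the JSON field names in the sample messages are hypothetical.

```java
final class MessageDispatchSketch {
    private MessageDispatchSketch() {}

    enum Action { CLOSE_SESSION, DEREGISTER_QUERY, PROCESS_REQUEST }

    /** Mirrors the order of the checks in onMessage: exact "quit" first, then any message containing "stop". */
    static Action classify(String message) {
        if (message.equals("quit")) {
            return Action.CLOSE_SESSION;
        }
        if (message.contains("stop")) {
            return Action.DEREGISTER_QUERY;
        }
        return Action.PROCESS_REQUEST;
    }

    public static void main(String[] args) {
        // the JSON payloads below are made-up examples, not the real request schema
        System.out.println(classify("quit"));
        System.out.println(classify("stop"));
        System.out.println(classify("{\"engineType\":\"CQELS\"}"));
        System.out.println(classify("{\"serviceId\":\"bus_stop_12\"}"));
    }
}
```

Because the second test is a substring match, any request whose text happens to contain "stop" (the last sample message, for instance) is classified as a deregistration rather than parsed as a query. Clients may want to avoid that substring in request payloads.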
	

The method parseMsg() is used to analyse the received message:


private void parseMsg(String msgStr, Session session) throws Exception {
	// generate an EventRequest from the msgStr in Json format
	EventRequest request = new Gson().fromJson(msgStr, EventRequest.class);
	logger.info("Engine type: " + request.getEngineType());
	
	// choose proper RSP engine instance for this request
	ACEISEngine engine = ACEISScheduler.getBestEngineInstance(request.getEngineType());
	engineMap.put(session, engine);
	
	if (request.isContinuous()) {
		// if the request asks for continuous results, register the query on the RSP engine
		this.registerQueryFromEventRequest(request, session);
	} else { 
		// if it is a "one-time" request, return the snapshot for the relevant sensor data using the data-fetching server.
		logger.info("Parsing data fetching request.");
		List<String> sids = new ArrayList<>();
		for (EventDeclaration e : request.getEp().getEds())
			sids.add(e.getServiceId());
		DataFederationResult result = new DataFederationResult();
		for (String s : sids) {
			logger.info("sid: " + s);
			EventDeclaration ed = engine.getRepo().getEds().get(s);
			HashMap snapshot = VirtuosoDataManager.getSnapShot(ed);
			for (Entry en : snapshot.entrySet()) {
				result.getResult().put(en.getKey(), new ArrayList());
				result.getResult().get(en.getKey()).add(en.getValue());
			}
		}
		String resultStr = new Gson().toJson(result);
		logger.info("sending response: " + resultStr);
		session.getBasicRemote().sendText(resultStr);
	}
}
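
In the one-time branch above, each snapshot entry is stored with put() and a fresh list, so if two snapshots shared a key, the later value would replace the earlier one. Whether keys can collide across services is not stated here. As a hedged sketch of the alternative, the class below accumulates all values per key with computeIfAbsent. The class name and the String key and value types are assumptions made for illustration; they are not taken from VirtuosoDataManager or DataFederationResult.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class SnapshotMergeSketch {
    private SnapshotMergeSketch() {}

    /** Merges per-service snapshots into one map, keeping every value observed for a key. */
    static Map<String, List<String>> merge(List<Map<String, String>> snapshots) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (Map<String, String> snapshot : snapshots) {
            for (Map.Entry<String, String> en : snapshot.entrySet()) {
                // create the list on first use, then append instead of overwriting
                result.computeIfAbsent(en.getKey(), k -> new ArrayList<>()).add(en.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // made-up property names and readings
        Map<String, String> first = new LinkedHashMap<>();
        first.put("temperature", "20");
        Map<String, String> second = new LinkedHashMap<>();
        second.put("temperature", "21");
        second.put("humidity", "55");
        System.out.println(merge(List.of(first, second)));
    }
}
```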
		

The event requests are modelled in package org.insight_centre.aceis.eventmodel, which has the following components:

EventRequest, EventPattern, EventDeclaration and Selection.

API

Javadoc for ACEIS can be found here.

Authors and Contributors

ACEIS was developed by Feng Gao (@fenggao86) in 2015. The author page and relevant publications can be found here.

Support or Contact

Author contacts: feng.gao@insight-centre.org, frostfel0@gmail.com