Event algebra using Prova reaction groups

Event algebras are a formalism for describing composite events that are groups of other events satisfying specified algebraic constraints. We wish to embed event algebra into the Prova language in the simplest, most natural way, allowing Prova rulebases to detect situations arising from events arriving according to pre-defined temporal patterns.

Prova is well equipped for dealing with reactive behaviour as it has basic constructs for receving messages that can be used as an event delivery mechanism. The rcvMsg and rcvMult primitives receive either single or multiple messages from within the body of the rules. As discussed elsewhere in the Prova User Guide, these primitives are non blocking, and instead store a continuation to the remaining trail of literals, which is resurrected once the matching message arrives. This message receiving capability is very much like that in Erlang, including the ability to use guards on reactions. However, Prova is quite different from Erlang. For example, Prova does not immediately consume the message but allows it to be accepted by many alternative reactions. There is a way to absorb a message by using the Prova (Prolog) CUT after the reaction.

Let us now talk more concretely about the event algebra. We need a way to group more than one reaction using a logical operator, for example, AND, requiring that all events matching the reactions belonging to the group arrive (typically, within the specified period of time). This means we need to designate reactions as belonging to a group and a way to indicate somewhere the operator to be used. But there is more: the result of a composite event detection has to be represented somehow, so that it could be captured in the way the composed events are, as well as allow the resulting composite event to be used in other event groups with possibly different operators, creating a recursive operator expression.

The previous posts described simple examples showing how the event groups are represented and composite events are captured, so I give here a more extended example from the latest Prova code. We start with the code for the server that detects event patterns.

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP].
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)).
server_1(XID,User,IP) :-
	@and(g1) @timeout(2500) @group(g2)
	rcvMsg(XID,Protocol,From,and,Events),
	println(["AND detected: ",Events," "]).
server_1(XID,User,IP) :-
	@group(g2)
	rcvMsg(XID,Protocol,From,request,update(User,IP)).
server_1(XID,User,IP) :-
	@or(g2) @timeout(1000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

Once the initiator event (a user login) is detected by rcvMult in the rule for server(), the Prolog/Prova code for the predicate server_1 creates five reactions that simultaneously wait for subsequent events. For each new initiator event, more reactions will become active. However, as already mentioned, Prova does not block on any active reactions but instead keeps them in memory ready to match when qualifying inbound messages are detected. Now look closer at the annotations on those five reactions. The first two belong to the group g1, indicated by annotation @group(g1). The third reaction is the result (composite) reaction corresponding to the operator AND (@and(g1)) applied to the first two operands. This means that the whole group will terminate when both composed reactions are detected or timeout expires. Positive detection sends the composite event to the third reaction that is also annotated with @group(g2). The fourth reaction is another (primitive) reaction that belongs to group g2. The fifth reaction indicates the operator OR for the group g2. The variables Events in the third and third reactions capture the composite events recorded as the trace of all detected messages resulting in the pattern detection.

Informally, we have something like this: OR(e3,AND(e1,e2)). The third reaction is the result of AND and the fifth reaction is the result of the enclosing OR. The timeout on the OR-operator is smaller than that for the embedded AND-operator intentionally, to test that the whole group is removed by the timeout correctly, while allowing for the same code to be used in other tests using the following inbound events.

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	sendMsg(user3,async,0,request,login(user3,'80.80.80.80')),
	% Wait synchronously, could have waited asynchronously instead
	java.lang.Thread.sleep(500L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	sendMsg(user3,async,0,request,logout(user3,'80.80.80.80')),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	java.lang.Thread.sleep(700L),
	% This is ignored due to timeout on @or that propagates to the child @and
	sendMsg(user3,async,0,request,login(user3,'90.90.90.90')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')),
	sendMsg(user2,async,0,request,update(user2,'30.30.30.30')),
	% This is ignored as OR will have fired
	sendMsg(user2,async,0,request,logout(user2,'30.30.30.30')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).

When run, the rulebase prints the following:

% AND detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]], [user1,async,0,request,[login,user1,20.20.20.20]]]] 
% Pattern detected: [[[user1,async,0,and,[[[user1,async,0,request,[logout,user1,10.10.10.10]], [user1,async,0,request,[login,user1,20.20.20.20]]]]]]] 
% Pattern detected: [[[user2,async,0,request,[update,user2,30.30.30.30]]]]

Observe how the "update" event results in immediate pattern detection due to OR. Also note that when a full AND is detected, the final composite event includes the actual structural grouping and for the AND sub-event.

To summarize, the event groups are specified using the @group annotations and operators like @and or @or on the reactions corresponding to the composite events. Each composite event may then again be a member of another group and so on, allowing for arbitrary nesting of event groups. Sequences of events are easily captured by reactions following one another, typically with @timeout annotations (see the previous post here: http://www.prova.ws/csp/node/22). There is or will be implemented a variety of extensions to this approach. So far, @not and @count are implemented, the first allowing to terminate the pattern detection upon arrival of a matching terminating event and the second used for specifying that more than one matching event should arrive. More details and examples of this will be given later.