[REVISED] Using reaction groups for detecting complex event patterns

This post has been updated to reflect a revised approach to event algebra; more details will follow shortly. The previous article (http://prova.ws/csp/node/20) described the use of the @timeout annotation for imposing a time constraint on the arrival of messages, which makes sequential event patterns quite easy to define. A further extension of this idea is to use annotations to group reactions. This post describes one such group, which we call an OR-group and define using the @or operator annotation. Expanding on the example from the previous post, we want to detect, within a given time window, either a login for the same user from a different IP address OR a logout event matching the original login in both user and IP address. Capturing this requires branching logic.

The next fragment, taken from the new example rules/reloaded/or_group_2.prova (run from the test ProvaMessagingTest.java), shows the updated test client.

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	java.lang.Thread.sleep(500L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	java.lang.Thread.sleep(700L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).

The switch_thread() "call" is analogous to fork() in UNIX: in the Prova case, the remainder of the client goal continues on a separate thread taken from the Prova task pool. Another concurrency consideration is that we use the user-id as the conversation-id (the first parameter to sendMsg) for the messages injected into the server. These messages are sent on the 'async' protocol, which causes the matching reactions to be executed on the Prova async pool. This ensures that all processing for each user is executed strictly sequentially, in the order of the inbound messages, while still allowing events for different users to be processed concurrently (within the limits of the pool size). The difference from the guard.prova example in the previous post is that user1 now logs out before logging in from another IP address. We want to make sure that this logout 'breaks' the pattern of a user logging in from a different IP address within the specified time window.
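The partitioning idea can be modelled outside Prova. The following Python sketch is only an illustration of the principle, not a description of how the Prova async pool is implemented: the class PartitionedPool, the lane count and the crc32-based lane choice are all assumptions made for this example. Each conversation-id is mapped to one single-threaded lane, so work for one user keeps its order while different users may run in parallel.

```python
import zlib
from concurrent.futures import ThreadPoolExecutor


class PartitionedPool:
    """Hypothetical model: one single-threaded lane per partition."""

    def __init__(self, partitions=4):
        # A one-worker executor runs its tasks strictly in submission order.
        self._lanes = [ThreadPoolExecutor(max_workers=1) for _ in range(partitions)]

    def submit(self, conversation_id, fn, *args):
        # The same conversation-id always maps to the same lane.
        lane = zlib.crc32(conversation_id.encode("utf-8")) % len(self._lanes)
        return self._lanes[lane].submit(fn, *args)

    def shutdown(self):
        # Wait until every lane has drained its queue.
        for lane in self._lanes:
            lane.shutdown(wait=True)


def run_demo():
    processed = []  # list.append is thread-safe in CPython
    pool = PartitionedPool()
    messages = [
        ("user1", "login 10.10.10.10"),
        ("user2", "login 30.30.30.30"),
        ("user1", "logout 10.10.10.10"),
        ("user1", "login 20.20.20.20"),
        ("user2", "login 40.40.40.40"),
    ]
    for user, event in messages:
        # The user-id plays the role of the conversation-id.
        pool.submit(user, processed.append, (user, event))
    pool.shutdown()
    return processed
```

Calling run_demo() returns the processed events. The interleaving between user1 and user2 may vary from run to run, but the events of each individual user always appear in the order in which they were sent.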

Here is the server implementation.

% This will print:
%
% Logout user1 10.10.10.10
% Suspicious login user2 30.30.30.30 40.40.40.40
% Pattern detected: [[[user2,async,0,request,[login,user2,40.40.40.40]]]] 
% Pattern detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]]]] 

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	% Wait for a login from a different IP, ignoring anything that does not match
	@group(g1)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP],
	% Once the full match has occurred, the above rcvMsg reaction is removed
	%   as well as all active reactions in the @or group
	println(["Suspicious login",User,IP,IP2]," ").
server_1(XID,User,IP) :-
	% Wait for the logout matching the original login, ignoring anything else
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	% Once the full match has occurred, the above rcvMsg reaction is removed
	%   as well as all active reactions in the @or group
	println(["Logout",User,IP]," ").
server_1(XID,User,IP) :-
	@or(g1) @timeout(2000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

Reactions to events are specified algebraically, by defining operators on event groups. The implementation supports AND and OR operators, denoted by the @and and @or annotations. Furthermore, individual group member reactions can be specified as AND NOT (terminating) events using the @not annotation. When a goal visits alternative rcvMsg reactions from the same group g1, the reactions annotated with @group define the operands of the event group, and the reaction annotated with @and or @or that is activated in the same goal supplies the operator. The third server_1 rule contains such a group reaction, which detects the OR-pattern over the composed events. In the case of an OR-group, a single matching event is posted as the result to the @or reaction. In the case of an AND-group, all events must be detected before the group detection result is posted to the @and reaction.
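To make the difference between the two operators concrete, here is a small Python model of an event group. It is a sketch of the semantics described above and not Prova code: the class EventGroup, its feed method and the tuple representation of events are assumptions made for illustration, and @not members are not modelled.

```python
class EventGroup:
    """Hypothetical model of an event group with an 'or' or 'and' operator."""

    def __init__(self, operator, operands):
        if operator not in ("or", "and"):
            raise ValueError("operator must be 'or' or 'and'")
        self.operator = operator
        self.operands = dict(operands)  # operand name -> predicate on an event
        self.matched = {}               # operand name -> event that satisfied it
        self.done = False

    def feed(self, event):
        """Offer an event; return the list of matched events once the group fires."""
        if self.done:
            return None  # a terminated group ignores further events
        for name, predicate in self.operands.items():
            if name not in self.matched and predicate(event):
                self.matched[name] = event
                break
        else:
            return None  # no pending operand matched; the event is ignored
        # OR fires on the first match, AND only when every operand has matched.
        if self.operator == "or" or len(self.matched) == len(self.operands):
            self.done = True
            return list(self.matched.values())
        return None


def login_from_other_ip(user, ip):
    # Events are modelled as (kind, user, ip) tuples.
    return lambda e: e[0] == "login" and e[1] == user and e[2] != ip


def logout_from(user, ip):
    return lambda e: e == ("logout", user, ip)
```

With the operands {"other_login": login_from_other_ip("user1", "10.10.10.10"), "logout": logout_from("user1", "10.10.10.10")}, an "or" group fires as soon as either event is fed to it, whereas an "and" group returns None until both have been seen.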

The @or annotation can be compared to the Erlang receive primitive, which selectively accepts a message matching one of the specified message patterns. The first two server_1/3 rules result in two rcvMsg reactions being active at the same time, one waiting for the next login and the other waiting for a logout message. Membership in the OR-group 'g1' is indicated by @group(g1). Internally, for each group name, the system transparently generates a group-id that is unique for each pattern detection instance in progress (initiated by the first login captured by rcvMult). In contrast to Erlang, Prova does not block on rcvMsg but immediately releases the thread, storing the continuation with all current variables and remaining goal literals. Once a message matching one of the 'waiting' rcvMsg reactions arrives, the corresponding continuation is resumed, the whole OR-group is terminated, and all temporary data is removed. Furthermore, @timeout on the group reaction terminates the whole group when the timer expires.
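The non-blocking behaviour can be pictured with a short Python model. This is a sketch under stated assumptions and not the Prova engine: the names OrGroup, Dispatcher and watch_login are hypothetical, time is passed in explicitly as a number of milliseconds instead of using real timers, and the sketch assumes that an expired group is simply dropped without resuming any continuation. It also does not model rcvMult starting a new detection on every login.

```python
class OrGroup:
    """Hypothetical OR-group: pending reactions plus a shared deadline."""

    def __init__(self, deadline):
        self.deadline = deadline
        self.reactions = []  # (pattern, continuation) pairs

    def on(self, pattern, continuation):
        # Store the continuation instead of blocking a thread on the message.
        self.reactions.append((pattern, continuation))


class Dispatcher:
    def __init__(self):
        self.groups = []

    def register(self, group):
        self.groups.append(group)

    def deliver(self, event, now):
        # Timed-out groups are dropped as a whole (assumed behaviour).
        self.groups = [g for g in self.groups if now < g.deadline]
        for group in list(self.groups):
            for pattern, continuation in group.reactions:
                if pattern(event):
                    # First match: terminate the whole group, then resume.
                    self.groups.remove(group)
                    continuation(event)
                    break


def watch_login(dispatcher, user, ip, now, window, log):
    """Start one detection instance for a login, as server_1 does per login."""
    group = OrGroup(deadline=now + window)
    group.on(lambda e: e[0] == "login" and e[1] == user and e[2] != ip,
             lambda e: log.append(("Suspicious login", user, ip, e[2])))
    group.on(lambda e: e == ("logout", user, ip),
             lambda e: log.append(("Logout", user, ip)))
    dispatcher.register(group)
```

In this model, once the logout for user1 has been delivered, the later login from another IP address finds no waiting reaction for user1, which mirrors how the logout 'breaks' the suspicious-login pattern in the server rules above.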

One final point to observe is that event pattern detection organised in this way makes it easy to define "multiple exit points" from the detection. Whether the 'suspicious' pattern is detected or a logout message provides an escape clause, the code prints the relevant message. Clearly, we could just as easily have posted an appropriate notification or modified a database instead. In event processing languages such as Esper, an event pattern detects one outcome. The style described above is more comparable to workflow processing, as it easily accommodates a description of an evolving "story". This is exactly why I believe this workflow-based event pattern detection could be useful in situations that involve multiple outcomes, evolving workflows, multiple participants/agents or advanced reasoning capabilities.