Dynamic event channels, negation and termination in Prova event patterns

This post gives more details on the approach to event patterns now being added to the new Prova version. As explained in the previous article, the main idea is to group event reactions and associate an operator to the group. I use a term event channel to refer to each reaction that is a member of a reaction group. The new version introduces a variety of annotations that fine-tune the semantics of a reaction group. We make a distinction between control channels and ordinary signal channels. The control channels affect the lifecycle of the whole group or of individual channels. The signal channels deliver the actual content events. There is a fairly large number of various annotations to cover so let's begin.

The first interesting annotation is @not. This annotation can be attached to event channels to describe the situation when an arrival of a matching event invalidates this reaction. If the channel is part of an @or group, this reaction simply goes away without triggering the group pattern detection. If the channel is part of an @and group, the whole @and group fails. There is a reverse side of the coin to consider. What happens if the reaction annotated with @not is not detected? Clearly, this question cannot be answered uniequivocally without necessary context. If the whole group has a timeout (@timeout) associated with it, not detecting an event means that it is not detected within the specified timeout. In this case, the @not reaction becomes true and a positive event detection contributes toward the whole group. If the group in question is an @or group, the group succeeds and an event pattern is output. If the group is an @and group, the group succeeds only if events on all other group channels have also been detected.

Now consider the case when the containing group does not have a timeout. In the case of an @or group, if the @not event is not detected but an event is detected on another positive channel, the group pattern succeeds. If the @not event is part of an @and group, the pattern can only succeed if all other positive events are detected and the only remaining undetected channels are either @not channels or control channels, as defined below. This semantics is different from the one used, for example, in Esper for the not primitive. The Esper manual suggests that a non-arrival of a negated event in the AND NOT case before other events arrive result in the positive pattern detection. This is only true in Prova if there is no timeout on the whole pattern. If there is a timeout, the non-arrival of an event negated with @not can only be proved if it does not arrive before the timeout expires.

This is a full example and_not.prova demonstrating @not reactions. The example adds an extra subtlety by including a timeout on one composed event channel. Also note how a fully qualified negated event is included in the detcted pattern (shown in the comment) allowing for easy inspection of events that lead to the pattern detection.

% Demonstrate AND(E1 timeout 250ms,NOT E2) timeout 400ms.
%
% This will print:
%
% Suspicious login user3 50.50.50.50 60.60.60.60
% Pattern detected: [[[user3,async,0,request,[login,user3,60.60.60.60]], [not,[user3,async,0,request,[logout,user3,50.50.50.50]]]]] 

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1) @timeout(250)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP],
	println(["Suspicious login",User,IP,IP2]," ").
server_1(XID,User,IP) :-
	% This reaction succeeds immediately if all other events in the AND group arrive and the overall group has no timeout,
	%    but will wait for the group timeout to expire before releasing result, if the group has a timeout (it has here).
	% However, if the matching event below arrives after other reactions but before the group timeout, the pattern is not detected.
	@group(g1) @not
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	println(["Logout",User,IP]," ").
server_1(XID,User,IP) :-
	@and(g1) @timeout(400)
	rcvMsg(XID,Protocol,From,and,Events),
	println(["Pattern detected: ",Events," "]).

:- eval(client()). 

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	sendMsg(user3,async,0,request,login(user3,'50.50.50.50')),
	% Wait synchronously, could have waited asynchronously instead
	java.lang.Thread.sleep(200L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	sendMsg(user3,async,0,request,login(user3,'60.60.60.60')),
	java.lang.Thread.sleep(300L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).

Now consider pattern termination. If a channel is annotated with @stop (without any arguments in brackets) and a matching event arrives before the pattern succeeds, the pattern instance immediately fails (and all temporal data is removed from memory). The @stop annotation works the same way in @or groups and @and groups. It is an example of a control channel that acts decisively to abandon the detection of the current pattern instance. Here is an example or_stop.prova. The update event is a @stop terminator.

% Demonstrate OR(E1,E2,STOP E3). STOP is a group terminator that works differently from NOT.
%
% This will print:
%
% Logout user1 10.10.10.10
% Update user2 30.30.30.30
% Pattern detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]]]] 

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP],
	println(["Suspicious login",User,IP,IP2]," ").
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	println(["Logout",User,IP]," ").
server_1(XID,User,IP) :-
	@group(g1) @stop
	rcvMsg(XID,Protocol,From,request,update(User,IP)),
	println(["Update",User,IP]," ").
server_1(XID,User,IP) :-
	@or(g1) @timeout(1000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

:- eval(client()). 

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	java.lang.Thread.sleep(200L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	java.lang.Thread.sleep(300L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	% This is a terminator
	sendMsg(user2,async,0,request,update(user2,'30.30.30.30')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).

Now let us talk about dynamic event channels. We start by adding an annotation @id(ID) that associates a (unique within the group) name with an event channel. Once we have a way to refer to a specific channel, we can use events arriving on control channels to affect the named channel(s). There are quite a few natural things we can do. We can permanently stop listed channels by using a @stop(IDLIST) annotation on a control channel. We also can pause and resume detection of events on channels with @pause(IDLIST) and @resume(IDLIST) on control channels. A paused event channel ignores any events that would have matched the paused reaction. Finally, a channel can start in a paused state (denoted by the @paused annotation), in which case, it will take an event arrival on a control @resume(IDLIST) channel to unpause the initially paused channel. Here is an example or_paused.prova demonstrating the described functionality. Obviously, we cannot show all combination here but the ProvaMetadataTest.java included in the Prova code runs an extensive suite of tests demonstrating a large number of situations.

% Demonstrate channels that are initially paused with @paused and then resumed with @resume.
%
% This will print:
%
% Logout user1 10.10.10.10
% Suspicious login user2 30.30.30.30 40.40.40.40
% Update user2 30.30.30.30
% Pattern detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]]]] 
% Suspicious login user2 30.30.30.30 50.50.50.50
% Suspicious login user2 40.40.40.40 50.50.50.50
% Pattern detected: [[[user2,async,0,request,[update,user2,30.30.30.30]], [user2,async,0,request,[login,user2,50.50.50.50]]]] 
%
% Note that the pair of logins from 30.30.30.30 followed by 40.40.40.40 is not the pattern
%    as detection of further logins is initially paused until an 'update' event is detected.

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1) @id(id1) @paused
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP],
	println(["Suspicious login",User,IP,IP2]," ").
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	println(["Logout",User,IP]," ").
server_1(XID,User,IP) :-
	@group(g1) @resume(id1)
	rcvMsg(XID,Protocol,From,request,update(User,IP)),
	println(["Update",User,IP]," ").
server_1(XID,User,IP) :-
	@or(g1) @timeout(1000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

:- eval(client()). 

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	java.lang.Thread.sleep(200L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	java.lang.Thread.sleep(300L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	% This is a terminator
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')),
	sendMsg(user2,async,0,request,update(user2,'30.30.30.30')),
	sendMsg(user2,async,0,request,login(user2,'50.50.50.50')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).

We'll wrap this post up with one final feature that is the @count(NUMBER) annotation. This feature sets a required minimum count for event on the channel annotated with this annotation. It can be combined with any other annotation, where semantics are the same as for single events, but the arrival of the event is now detected when the necessary count of events has arrived. If the count is less than the required number, the vent will continue to be accepted and in the case of positive pattern detection will become of the result even collection. The example or_not_count.prova demonstrates.

% Demonstrate OR(NOT 2xE1,E2).
%
% This will print:
%
% Suspicious login user3 50.50.50.50 60.60.60.60
% Logout user1 10.10.10.10
% Suspicious login user2 30.30.30.30 40.40.40.40
% Suspicious login user2 30.30.30.30 40.40.40.40
% Pattern detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]]]] 
% Pattern detected: [[[user3,async,0,request,[login,user3,60.60.60.60]], [not,[user3,async,0,request,[login,user3,<84>]]]]] 
% Pattern detected: [[[not,[user3,async,0,request,[login,user3,<252>]]]]] 
% Pattern detected: [[[not,[user2,async,0,request,[login,user2,<332>]]]]] 
% Pattern detected: [[[not,[user2,async,0,request,[login,user2,<420>]]]]] 
% Pattern detected: [[[not,[user1,async,0,request,[login,user1,<480>]]]]] 

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1) @not @count(2)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP],
	println(["Suspicious login",User,IP,IP2]," ").
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	println(["Logout",User,IP]," ").
server_1(XID,User,IP) :-
	@or(g1) @timeout(1000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

:- eval(client()). 

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	% This login should result at the @not detection at timeout
	sendMsg(user3,async,0,request,login(user3,'50.50.50.50')),
	% Wait synchronously, could have waited asynchronously instead
	java.lang.Thread.sleep(200L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	java.lang.Thread.sleep(300L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user3,async,0,request,login(user3,'60.60.60.60')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')).

switch_thread() :-
	sendMsgSync(XID,task,0,switch,[]),
	rcvMsg(XID,task,From,switch,[]).