Event processing with workflow logic: key to expanding event processing into new application domains

This post discusses some arguments in favour of combining workflow logic with event processing. Event processors, and stream processors in particular, typically detect event patterns, i.e., groups of events related by temporal relationships and meeting constraints on their content. Detection is usually an all-or-nothing affair: a composite event is either detected or not, and if it is not, nothing happens. Several critical elements are missing here: the ability to deal with multiple outcomes of an event pattern "detection", event synchronisation, and the handling of (asynchronous) third-party interactions directly within pattern detection.

Consider multiple outcomes first. Take a simple event pattern: every(A)->(B and C) within 10 s. If either B or C fails to arrive within the specified timeout after the initial event A, the pattern is not detected. In workflow languages such as BPEL, a timeout can be associated with a handler attached to the local scope, allowing the business process to continue gracefully. Workflow languages strive for completeness in how they react to an evolving situation. Stream processors, by contrast, tend to be fast but detect fairly stable, long-running patterns. If we consider event processing (EP) to be a tool that assists business decision making, this kind of completeness should be improved, even if it means sacrificing some latency in favour of better situation analysis.
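To make the contrast concrete, here is a small Python sketch (not Prova, and not how any particular engine implements it) of every(A)->(B and C) within 10 s that reports a timeout outcome together with the partial trace instead of silently dropping the instance. The (timestamp, type) event encoding and the names Outcome and every_a_then_b_and_c are assumptions for illustration, and events are assumed to arrive sorted by timestamp.

```python
from dataclasses import dataclass, field
from typing import List, Tuple

# Hypothetical event encoding: (timestamp in seconds, event type).
Event = Tuple[float, str]

@dataclass
class Outcome:
    kind: str                 # "detected" or "timeout"
    start: float              # timestamp of the A that opened this instance
    trace: List[Event] = field(default_factory=list)  # B/C events seen in time

def every_a_then_b_and_c(events: List[Event], window: float = 10.0) -> List[Outcome]:
    """Evaluate every(A)->(B and C) within `window` seconds.

    Every A opens an instance. Instead of dropping an instance whose
    window expires, report it as a "timeout" outcome with the partial
    trace, so that a handler can react to it.
    """
    outcomes = []
    for i, (t_a, kind) in enumerate(events):
        if kind != "A":
            continue
        needed, trace = {"B", "C"}, []
        for t, k in events[i + 1:]:
            if t - t_a > window:
                break                     # window closed
            if k in needed:
                needed.discard(k)
                trace.append((t, k))
                if not needed:
                    break                 # both B and C seen in time
        outcomes.append(Outcome("timeout" if needed else "detected", t_a, trace))
    return outcomes

stream = [(0, "A"), (3, "C"), (4, "B"), (20, "A"), (25, "B"), (40, "C")]
results = every_a_then_b_and_c(stream)
for outcome in results:
    print(outcome.kind, outcome.start, outcome.trace)
```

The first A is detected with C and B in its trace; the second A times out, but its outcome still records the B that did arrive, which is exactly the information an all-or-nothing matcher throws away.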

Here is a revised Prova example, and.prova, that illustrates a rule-based approach to the problem.

% Demonstrate a guarded reaction and @and groups. Also demonstrate processing of a timeout outcome from a reaction.
%
% This will print:
%
% Pattern detected: [[[user1,async,0,request,[logout,user1,10.10.10.10]], [user1,async,0,request,[login,user1,20.20.20.20]]]]
% Timeout occurred: [[[user2,async,0,request,[logout,user2,30.30.30.30]]]] 

:- eval(server()). 

server() :-
	% Start detection on each new login
	rcvMult(XID,Protocol,From,request,login(User,IP)),
	server_1(XID,User,IP).

server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP].
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)).
server_1(XID,User,IP) :-
	@and(g1) @timeout(2000)
	rcvMsg(XID,Protocol,From,and,Events),
	println(["Pattern detected: ",Events," "]).
server_1(XID,User,IP) :-
	@and(g1)
	rcvMsg(XID,Protocol,From,timeout,Events),
	println(["Timeout occurred: ",Events," "]).
	
:- eval(client()). 

client() :- 
	% Send all the test messages from a separate thread
	switch_thread(),

	% Use user-id as conversation-id (XID) for partitioning so that each user is processed sequentially
	sendMsg(user1,async,0,request,login(user1,'10.10.10.10')),
	% Wait synchronously, could have waited asynchronously instead
	java.lang.Thread.sleep(500L),
	sendMsg(user2,async,0,request,login(user2,'30.30.30.30')),
	java.lang.Thread.sleep(700L),
	sendMsg(user1,async,0,request,logout(user1,'10.10.10.10')),
	sendMsg(user1,async,0,request,login(user1,'20.20.20.20')),
	sendMsg(user2,async,0,request,logout(user2,'30.30.30.30')),
	java.lang.Thread.sleep(1500L),
	sendMsg(user2,async,0,request,login(user2,'40.40.40.40')).

The first point to observe is that if the timeout occurs (as it does for the events of user2), the pattern detection produces a different outcome event: a message with performative (message type) timeout is sent, and it is handled by a separate reaction. Secondly, whether or not the pattern is detected, the full trace of the events matched so far is recorded and sent as part of either the and message or the timeout message. The trace can then help diagnose the situation in more detail and define a better reaction to full or partial detection. In the example, a logout event for user2 arrives before the timeout, so it is copied into the trace of events passed to the timeout handler.

One might argue that the timeout should instead be detected by a separate pattern. The counter-argument is that, for more complex patterns, we do not want duplicate branches proliferating across the event flow code. Handling the situation in concise blocks that allow for multiple outcomes scales better and reads better.
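The behaviour of the and.prova example can be approximated outside Prova. The following Python sketch is a simplified model, not Prova's implementation: the names AndGroup and start_detection, the tuple message format and the virtual millisecond clock are all hypothetical. It models only the detection opened by each user's first login (later logins would open detections of their own), with the same two group members, a 2000 ms timeout, and a trace carried by both outcomes.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical message shape loosely mirroring the Prova terms: (verb, user, ip).
Msg = Tuple[str, str, str]
Result = Optional[Tuple[str, List[Msg]]]   # ("and" | "timeout", trace) or None

@dataclass
class AndGroup:
    """Every member predicate must match once before `deadline` (virtual ms);
    otherwise a timeout outcome is produced. Both outcomes carry the trace."""
    members: List[Callable[[Msg], bool]]
    deadline: int
    trace: List[Msg] = field(default_factory=list)
    pending: List[int] = field(init=False, default_factory=list)

    def __post_init__(self) -> None:
        self.pending = list(range(len(self.members)))

    def on_message(self, now: int, msg: Msg) -> Result:
        if now > self.deadline or not self.pending:
            return None                    # group already closed
        for i in self.pending:
            if self.members[i](msg):
                self.pending.remove(i)     # safe: we stop iterating right away
                self.trace.append(msg)
                break
        return ("and", self.trace) if not self.pending else None

    def on_tick(self, now: int) -> Result:
        if self.pending and now >= self.deadline:
            self.pending = []              # close the group; report only once
            return ("timeout", self.trace)
        return None

def start_detection(user: str, ip: str, t0: int, timeout_ms: int = 2000) -> AndGroup:
    # The two @group(g1) members: a login from a different IP and a logout
    # from the same IP, both for the user whose login opened the detection.
    return AndGroup(
        members=[
            lambda m: m[0] == "login" and m[1] == user and m[2] != ip,
            lambda m: m[0] == "logout" and m[1] == user and m[2] == ip,
        ],
        deadline=t0 + timeout_ms,
    )

# Client schedule from and.prova in virtual ms (first logins at 0 and 500).
groups = {"user1": start_detection("user1", "10.10.10.10", 0),
          "user2": start_detection("user2", "30.30.30.30", 500)}
schedule = [(1200, ("logout", "user1", "10.10.10.10")),
            (1200, ("login", "user1", "20.20.20.20")),
            (1200, ("logout", "user2", "30.30.30.30"))]

outcomes: Dict[str, Tuple[str, List[Msg]]] = {}
for now, msg in schedule:
    result = groups[msg[1]].on_message(now, msg)   # msg[1] is the user (XID)
    if result:
        outcomes[msg[1]] = result
for user, group in groups.items():
    result = group.on_tick(2700)                   # past both deadlines (2000, 2500)
    if result:
        outcomes[user] = result
print(outcomes)
```

Running it yields an and outcome for user1 with the logout and the second login in the trace, and a timeout outcome for user2 whose trace still contains the logout, matching the two lines the Prova example prints.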

There is more to note. Whenever an event that is part of the pattern being detected arrives, we have a chance to react to that event directly, for example, by adding an action like this:

server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)),
	println([logout(User,IP)]).

This allows us to define "internal probes" inside the pattern that can be useful for processing the evolving situation, or for monitoring and diagnostic purposes.
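Outside Prova, the same idea can be sketched as a pattern member that runs a side-effecting probe whenever it matches, regardless of whether the whole pattern is eventually detected. This Python fragment is illustrative only; with_probe and the (kind, user) event tuples are hypothetical names, not part of any library.

```python
from typing import Callable, List, Tuple

Event = Tuple[str, str]   # hypothetical (kind, user) event

def with_probe(predicate: Callable[[Event], bool],
               probe: Callable[[Event], None]) -> Callable[[Event], bool]:
    """Wrap a pattern member so that `probe` runs on every event the member
    matches, independently of whether the whole pattern is later detected."""
    def member(event: Event) -> bool:
        matched = predicate(event)
        if matched:
            probe(event)   # e.g. log, update a monitoring counter, raise an alert
        return matched
    return member

probe_log: List[Event] = []
is_logout = with_probe(lambda e: e[0] == "logout", probe_log.append)

events = [("login", "user1"), ("logout", "user1"), ("logout", "user2")]
matches = [e for e in events if is_logout(e)]
print(probe_log)
```

The matching logic stays unchanged; the probe only observes, which is what makes it useful for monitoring a pattern that may still time out.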

Let us now discuss event synchronisation. Consider an event pattern that detects the sequences ACB, BAC, BCA, and CBA, but not the remaining orderings ABC and CAB. Can all of these be detected using a regular-expression-style pattern matcher? Yes, but the result is ugly and does not scale well. In fact, there is a simple "pattern" here based on event synchronisation: when event A is detected, the pattern temporarily stops detecting events B, and when event C is received, detection of events B resumes. The following fragment shows how this is achieved in Prova.

server_1(XID,User,IP) :-
	@group(g1) @id(id1)
	rcvMsg(XID,Protocol,From,request,login(User,IP2)) [IP2!=IP].
server_1(XID,User,IP) :-
	@group(g1)
	rcvMsg(XID,Protocol,From,request,logout(User,IP)).
server_1(XID,User,IP) :-
	@group(g1) @resume(id1)
	rcvMsg(XID,Protocol,From,request,update(User,IP)).
server_1(XID,User,IP) :-
	@group(g1) @pause(id1)
	rcvMsg(XID,Protocol,From,request,query(User,IP)).
server_1(XID,User,IP) :-
	@or(g1) @timeout(1000)
	rcvMsg(XID,Protocol,From,or,Events),
	println(["Pattern detected: ",Events," "]).

All the reactions are part of the OR-group g1. The update and query events are "control events" that regulate detection of the login events: query pauses it and update resumes it. In terms of the abstract example above, query plays the role of A, login the role of B, and update the role of C. There is also a @stop annotation that stops detection permanently, and a @paused annotation that starts a reaction in the paused state. These constructs cover and extend the functionality of the BPEL link construct, which is used to activate receive reactions based on the arrival of other messages. Again, they add to the expressiveness and conciseness of the language, capturing synchronisation relations cleanly and minimally.
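The pause/resume mechanism can be modelled as a small state machine. The Python sketch below is an illustration under stated assumptions, not Prova's implementation: Reaction, State, detects and the string events are hypothetical names. detects() checks the abstract ACB example by pausing B on A and resuming it on C; the second part starts a login reaction paused (as @paused would) and uses a hypothetical stop control event to end detection permanently (as @stop would).

```python
from enum import Enum
from itertools import permutations
from typing import Callable, List

class State(Enum):
    ACTIVE = "active"
    PAUSED = "paused"
    STOPPED = "stopped"

class Reaction:
    """Hypothetical receive reaction that control events can pause, resume
    or stop (cf. @pause, @resume, @stop); it may start paused (cf. @paused)."""

    def __init__(self, accepts: Callable[[str], bool], start_paused: bool = False):
        self.accepts = accepts
        self.state = State.PAUSED if start_paused else State.ACTIVE
        self.received: List[str] = []

    def pause(self) -> None:
        if self.state is State.ACTIVE:
            self.state = State.PAUSED

    def resume(self) -> None:
        if self.state is State.PAUSED:     # a stopped reaction stays stopped
            self.state = State.ACTIVE

    def stop(self) -> None:
        self.state = State.STOPPED

    def offer(self, event: str) -> None:
        if self.state is State.ACTIVE and self.accepts(event):
            self.received.append(event)

def detects(sequence: str) -> bool:
    """A pauses detection of B and C resumes it; A, B and C must all be seen."""
    b = Reaction(lambda e: e == "B")
    controls_seen = set()
    for e in sequence:
        if e == "A":
            b.pause()
            controls_seen.add(e)
        elif e == "C":
            b.resume()
            controls_seen.add(e)
        else:
            b.offer(e)
    return controls_seen == {"A", "C"} and bool(b.received)

detected = ["".join(p) for p in permutations("ABC") if detects("".join(p))]
print(detected)

# @paused / @stop: a login reaction that starts paused; "stop" is a
# hypothetical control event added for illustration.
login = Reaction(lambda e: e.startswith("login"), start_paused=True)
controls = {"update": login.resume, "query": login.pause, "stop": login.stop}
for e in ["login:a", "update", "login:b", "query", "login:c",
          "update", "stop", "update", "login:d"]:
    if e in controls:
        controls[e]()
    else:
        login.offer(e)
print(login.received)
```

Of the six orderings, only ACB, BAC, BCA, and CBA are accepted, and the login reaction receives only login:b: login:a arrives while it starts paused, login:c while query has paused it, and login:d after it has been stopped, when a further update can no longer revive it.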

Finally, before we go, consider the following scenario. We are detecting the simple pattern A(Param1)->B(param2). Now imagine that param2 (a constant) is not known initially. Instead, it can be obtained from an external web service endpoint by supplying Param1, which only becomes bound when event A is detected. This means that between events A and B, an interaction with a third party must take place, and its result is then used to detect event B with specific parameters. The upshot is that the pattern is once again "open" to this additional interaction: in particular, all events B that arrive in the meantime, with any parameters, have to be stored temporarily and matched when the web service call returns. It is also clear that calling the web service in-thread, blocking the event processor, is not feasible. Instead, the event processor should be able to interact asynchronously with other parties.
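A minimal asyncio sketch of the idea, assuming a single A instance, B events represented only by their parameter value, and a hypothetical lookup_param2 coroutine that stands in for the web service call (simulated with asyncio.sleep). The detector never blocks on the call: B events arriving while it is in flight are buffered and matched once param2 is known. A None on the queue is a hypothetical end-of-stream marker.

```python
import asyncio
from typing import List, Optional

async def lookup_param2(param1: str) -> str:
    """Hypothetical stand-in for the external web service call."""
    await asyncio.sleep(0.05)              # simulated network latency
    return f"expected-for-{param1}"

async def detect_b_after_a(events: "asyncio.Queue[Optional[str]]", param1: str) -> List[str]:
    """After A(param1) has been seen, match B(param2) events, where param2
    comes from a non-blocking lookup. B events that arrive while the lookup
    is in flight are buffered and matched once it returns."""
    lookup = asyncio.create_task(lookup_param2(param1))
    param2: Optional[str] = None
    buffered: List[str] = []
    matched: List[str] = []
    getter = None
    while True:
        if getter is None:
            getter = asyncio.create_task(events.get())
        waiting = {getter} if param2 is not None else {getter, lookup}
        done, _ = await asyncio.wait(waiting, return_when=asyncio.FIRST_COMPLETED)
        if lookup in done and param2 is None:
            param2 = lookup.result()
            matched += [b for b in buffered if b == param2]   # replay the buffer
            buffered.clear()
        if getter not in done:
            continue                       # keep the same pending getter
        b, getter = getter.result(), None
        if b is None:                      # hypothetical end-of-stream marker
            if param2 is None:
                param2 = await lookup
                matched += [x for x in buffered if x == param2]
            return matched
        if param2 is None:
            buffered.append(b)             # cannot decide yet: keep it
        elif b == param2:
            matched.append(b)

async def main() -> List[str]:
    queue: "asyncio.Queue[Optional[str]]" = asyncio.Queue()
    detector = asyncio.create_task(detect_b_after_a(queue, "p1"))
    queue.put_nowait("other")              # B events arriving during the call
    queue.put_nowait("expected-for-p1")
    await asyncio.sleep(0.1)               # by now the lookup has returned
    queue.put_nowait("expected-for-p1")    # B event arriving afterwards
    queue.put_nowait(None)
    return await detector

result = asyncio.run(main())
print(result)
```

The first matching B arrives before the lookup completes, so it is matched only when the call returns; the second arrives afterwards and is matched directly. In a real engine the buffer would probably also need a size bound or a timeout of its own.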

In our company, we have use cases that require combined workflow logic and event processing functionality, and our adoption of event processing technology depends on the expressiveness and conciseness of the available processing styles. The key takeaway is that we need EP technology to help us reason about dynamic and unpredictable situations, not only to detect well-known and relatively rigid "patterns". We need the technology to reason for us about complex situations that involve state, synchronisation, multiple outcomes, partial results, and edge conditions.