Big cleanup: concurrency and messaging

The combination of concurrency, messaging and rules is a fairly grey area, and the old Prova attempted to propose a solution. The redesign should feel more consistent and useful. Full details will be on the http://prova.ws/confluence site, but here is a short outline. Prova agents execute protocols and send and receive messages asynchronously. The rcvMsg/rcvMult built-ins do not block the processing thread. Instead, they immediately release it while preserving the entire current rule context, so that when a matching message arrives, processing resumes as though it had never been interrupted. The big question is: on which thread does the processing resume?
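To make the suspend-and-resume idea concrete, here is a minimal plain-Java sketch of a non-blocking receive. It is an illustration under stated assumptions, not Prova's engine: the class SuspendedReceive and its receive/deliver methods are hypothetical, the saved rule context is reduced to a Java lambda, and messages are matched only on XID and performative. receive() records the continuation and returns at once, releasing the calling thread; deliver() resumes the oldest matching continuation when a message arrives.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.function.Consumer;

// Hypothetical sketch: not Prova's implementation, just the suspend/resume idea.
public class SuspendedReceive {
    // Pending continuations keyed by "xid/performative"; several may wait on the same key.
    private final Map<String, Queue<Consumer<Object>>> pending = new HashMap<>();

    // Analogue of an inline rcvMsg: remember what to do with the reply and return immediately.
    public void receive(String xid, String performative, Consumer<Object> continuation) {
        pending.computeIfAbsent(xid + "/" + performative, k -> new ArrayDeque<>()).add(continuation);
    }

    // Called when a message arrives: resume the oldest matching continuation, if any (one-shot).
    public boolean deliver(String xid, String performative, Object payload) {
        Queue<Consumer<Object>> waiting = pending.get(xid + "/" + performative);
        if (waiting == null || waiting.isEmpty()) {
            return false; // no reaction is waiting for this message
        }
        waiting.poll().accept(payload);
        return true;
    }

    public static void main(String[] args) {
        SuspendedReceive rt = new SuspendedReceive();
        StringBuilder log = new StringBuilder();
        log.append("send;");
        rt.receive("xid-1", "reply", payload -> log.append("resumed with ").append(payload).append(";"));
        log.append("thread released;");   // the caller carries on without waiting
        rt.deliver("xid-1", "reply", 42);  // the reply arrives later and resumes the saved context
        System.out.println(log);           // prints send;thread released;resumed with 42;
    }
}
```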

Prova runs on the main thread plus two thread pools: a conversation thread pool and a task thread pool. All ordinary goals run on the main thread. A message reaction initiated by rcvMsg/rcvMult (remember that message reactions are goals too) runs on the thread dictated by the protocol value passed to the inline rcvMsg/rcvMult that receives the message initiating the goal processing: the self protocol now targets the main thread specifically, the async protocol targets the conversation pool, and the task protocol targets the task pool. The difference between the two pools is that, with async, the thread from the conversation pool is chosen deterministically: the sequential id embedded in the conversation-id XID of the inbound message is taken modulo the conversation pool size. This means that messages belonging to the same conversation are always processed by the same thread. An added benefit is that any local data in the context of the rule containing rcvMsg is only ever processed on one thread, which reduces context switching on multicore architectures. The choice of thread in the task pool, by contrast, is completely random.
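The two pools can be sketched in plain Java as follows. This is a hypothetical illustration, not Prova's code: ReactionDispatcher, async() and task() are made-up names, and the conversation pool is modelled as an array of single-threaded executors indexed by the conversation's sequential id modulo the pool size. Reactions submitted with the same id therefore always land on the same thread and run in submission order, while task() hands work to whichever thread of a shared fixed pool is free.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch of a partitioned 'conversation' pool next to an unpartitioned 'task' pool.
public class ReactionDispatcher {
    private final ExecutorService[] conversationPool; // one single-threaded executor per partition
    private final ExecutorService taskPool;           // shared pool, no thread affinity

    public ReactionDispatcher(int conversationThreads, int taskThreads) {
        conversationPool = new ExecutorService[conversationThreads];
        for (int i = 0; i < conversationThreads; i++) {
            final int partition = i;
            conversationPool[i] = Executors.newSingleThreadExecutor(
                    r -> new Thread(r, "conversation-" + partition));
        }
        taskPool = Executors.newFixedThreadPool(taskThreads);
    }

    // 'async': the partition is fixed by the conversation's sequential id, so every
    // reaction on the same conversation runs on the same thread, one after another.
    public Future<?> async(long conversationSeqId, Runnable reaction) {
        int partition = (int) Math.floorMod(conversationSeqId, (long) conversationPool.length);
        return conversationPool[partition].submit(reaction);
    }

    // 'task': whichever task-pool thread is free picks the reaction up.
    public Future<?> task(Runnable reaction) {
        return taskPool.submit(reaction);
    }

    public void shutdown() {
        for (ExecutorService e : conversationPool) {
            e.shutdown();
        }
        taskPool.shutdown();
    }

    public static void main(String[] args) throws Exception {
        ReactionDispatcher d = new ReactionDispatcher(4, 4);
        for (long seq : new long[] {5, 6, 5}) {
            d.async(seq, () -> System.out.println(
                    "conversation " + seq + " on " + Thread.currentThread().getName())).get();
        }
        d.shutdown();
    }
}
```

With four conversation threads, ids 5 and 9 both map to partition 1 (5 mod 4 = 9 mod 4 = 1), so different conversations may share a thread; what the scheme guarantees is that a single conversation never moves between threads.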

You might assume that reactions belonging to the same conversation run sequentially. But imagine that, while processing a reaction to a message on conversation XID, a rule sends more messages and uses rcvMsg to accept the response. Because rcvMsg releases the thread, unless special care is taken, another message arriving on the same conversation (for example, from the original source) may be processed before that response arrives, which in some cases is undesirable. The test msg010.prova shows a message-based locking mechanism we propose to ensure that the original messages are processed sequentially. The code uses a Requestor and a Lock Manager so that messages passing through the Requestor execute one after another. Note that since, for any XID, the lock messages are always processed on the same thread, there is never any contention and no synchronisation is required. The code of msg010.prova follows. Once we are completely happy with the implementation of the Requestor and Lock Manager, we will add it to the standard library.
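The ordering guarantee that the Requestor and Lock Manager aim for can be sketched in plain Java. This is a hypothetical analogue, not the msg010.prova code: ConversationLock is a made-up class, a Runnable stands for the rest of the Requestor's rule body, and in this sketch waiting requests are kept in a per-XID FIFO queue, with the lock handed directly to the next one on unlock. Like the Prova version, it relies on all calls for a given XID arriving on one conversation thread, so plain HashMaps are used without synchronisation.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch: must only be called from the conversation thread that owns the XID,
// which is why no synchronisation is used.
public class ConversationLock {
    private final Map<String, Queue<Runnable>> waiting = new HashMap<>();
    private final Map<String, Boolean> held = new HashMap<>();

    // Requestor side: run 'critical' now if the XID is free, otherwise queue it.
    public void lock(String xid, Runnable critical) {
        if (held.getOrDefault(xid, false)) {
            waiting.computeIfAbsent(xid, k -> new ArrayDeque<>()).add(critical);
        } else {
            held.put(xid, true);
            critical.run();
        }
    }

    // Called once a critical section, including all its asynchronous replies, has finished.
    public void unlock(String xid) {
        Queue<Runnable> q = waiting.get(xid);
        if (q == null || q.isEmpty()) {
            held.put(xid, false);   // nobody waiting: release the XID
        } else {
            q.poll().run();         // hand the lock straight to the next waiter; it stays held
        }
    }

    public static void main(String[] args) {
        ConversationLock locks = new ConversationLock();
        StringBuilder log = new StringBuilder();
        for (int j = 1; j <= 3; j++) {
            final int event = j;
            locks.lock("xid-1", () -> log.append("start ").append(event).append("; "));
        }
        // Each processing step finishes asynchronously; simulate the completions arriving.
        for (int j = 1; j <= 3; j++) {
            log.append("done ").append(j).append("; ");
            locks.unlock("xid-1");
        }
        System.out.println(log);
    }
}
```

Running main prints start 1; done 1; start 2; done 2; start 3; done 3; showing that each event on xid-1 starts only after the previous one has unlocked, while a different XID would not be held up.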

% Prova "reloaded" messaging
% Demonstrate the use of 'cycled' event processing.
% Inbound events for 2 different XIDs are executed concurrently.
% 3 inbound events for the same XID are processed sequentially so that a new event is executed
% 		on the same 'conversation' thread when the previous processing is done.
% Internally each processing step consists of several concurrent processes executed on the 'task' pool.
% Only when all of them are finished, the lock manager unlocks the processing of further events on the same XID.

%%%%%%%%%%%%%%%%%
%%% REQUESTOR %%%
%%%%%%%%%%%%%%%%%

rcvMsg(XID,async,From,raw,Payload) :-
	sendMsg(XID,async,0,lock,[]),
	rcvMsg(XID,async,From,locked,[]),
	!,
	process(XID,Payload).

%%%%%%%%%%%%%%%%%%
%% LOCK MANAGER %%
%%%%%%%%%%%%%%%%%%

:- eval(lock_manager_init()).

lock_manager_init() :-
	$PLock=java.util.concurrent.ConcurrentHashMap(),
	$PJoin=java.util.concurrent.ConcurrentHashMap().

lock_manager_check(XID) :-
	Lock=$PLock.get(XID),
	Lock>0,
	!,
	fail().
lock_manager_check(XID) :-
	$PLock.put(XID,1).
	
rcvMsg(XID,async,From,lock,[]) :-
	lock_manager_check(XID),
	sendMsg(XID,async,0,locked,[]).
rcvMsg(XID,async,From,unlock,[]) :-
	$PLock.put(XID,0),
	println([unlocked,XID],": "),
	sendMsg(XID,async,0,locked,[]).

partition_join_init(XID) :-
	$PJoin.put(XID,0).

partition_join_increment(XID,N2) :-
	N=$PJoin.get(XID),
	N2=N+1,
	$PJoin.put(XID,N2).

%%%%%%%%%%%%%%%%%%
%%%%% SERVER %%%%%
%%%%%%%%%%%%%%%%%%

:- eval(server()).

% All requests run in parallel on threads from the 'task' thread pool.
% This is because the caller (process/2 below) specifies 'task' as the protocol (second parameter to sendMsg).
server() :-
	println(["==========msg010=========="]),
	rcvMult(XID,Protocol,From,run,t(A,B,I),I),
	I2=I*2,
	sendMsg(XID,async,From,result,t(I2),I).

%%%%%%%%%%%%%%%%%%
%%%%% CLIENT %%%%%
%%%%%%%%%%%%%%%%%%

:- eval(client()).

client() :-
	element(I,[1,2]),
	% For each I, the conversation-id XID is different
	% Initialise XID so that the subsequent responses are all executed on the same thread
	sendMsg(XID,task,0,noop,[]),
	element(J,[1,2,3]),
	sendMsg(XID,async,0,raw,[I,J]),
	rcvMsg(XID,async,0,raw_result,Result,J),
	println(["Result:",I,J,Result]," ").

%%%%%%%%%%%%%%%%%
%%% PROCESSOR %%%
%%%%%%%%%%%%%%%%%

process(XID,[I,J]) :-
	partition_join_init(XID),
	element(K,[1,2,3,4]),
	% Execute on the non-partitioned 'task' thread pool
	sendMsg(XID,task,0,run,t(I,J,K),K),
	% For each response from the server, the remainder of this rule's body
	%   runs in one and the same "conversation" thread chosen from the partitioned conversation thread pool using XID as key.
	%   This means that there is no synchronisation required.
	rcvMsg(XID,async,From,result,t(K2),K),
	partition_join_increment(XID,N2),
	wrapup(XID,[I,J],N2).

% This only gets executed when all responses have arrived
wrapup(XID,[I,J],4) :-
	sendMsg(XID,async,0,raw_result,[4],J),
	sendMsg(XID,async,0,unlock,[]).
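The PROCESSOR part above (partition_join_init, partition_join_increment and wrapup) is a fan-out/join: four requests go out on the task pool and the replies are counted on the conversation thread. A minimal Java analogue follows, assuming CompletableFuture stands in for sendMsg/rcvMsg; fanOutAndJoin is a hypothetical name, and the 'server' is reduced to doubling its input. Because every reply handler runs on one single-threaded executor, the counter needs no locking, which mirrors the argument the comments in process/2 make for $PJoin.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch of the fan-out/join pattern, not Prova's implementation.
public class PartitionJoin {
    // Fan 'parts' jobs out to a shared task pool and join their replies on a single
    // conversation thread; returns the sum of the replies once all of them have arrived.
    static int fanOutAndJoin(int parts) throws Exception {
        if (parts <= 0) {
            return 0; // nothing to join
        }
        ExecutorService conversation = Executors.newSingleThreadExecutor(); // plays the XID's conversation thread
        ExecutorService tasks = Executors.newFixedThreadPool(4);            // plays the 'task' pool
        int[] state = {0, 0}; // {replies joined, running sum}; only touched on the conversation thread
        CompletableFuture<Integer> wrapup = new CompletableFuture<>();
        try {
            for (int k = 1; k <= parts; k++) {
                final int part = k;
                CompletableFuture
                    .supplyAsync(() -> part * 2, tasks)   // the 'server': doubles its input
                    .thenAcceptAsync(reply -> {           // each reply is handled on the conversation thread
                        state[0]++;
                        state[1] += reply;
                        if (state[0] == parts) {          // like wrapup(XID,[I,J],4): fires on the last reply
                            wrapup.complete(state[1]);
                        }
                    }, conversation);
            }
            return wrapup.get();
        } finally {
            tasks.shutdown();
            conversation.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(fanOutAndJoin(4)); // prints 20 (2 + 4 + 6 + 8)
    }
}
```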