Peer working

main
Avraham Sakal 2 years ago
parent 636ba279c8
commit c0860a6b50

@ -10,7 +10,7 @@ export interface EventReactionCouplings_T<C> {
eventName: string; eventName: string;
reactions: Array<Reaction_T<C>>; reactions: Array<Reaction_T<C>>;
}; };
export type Reaction_T<C> = SideEffect_T<C> | ContextMutation_T<C> | Peering_T<C,unknown> | Goto_T; export type Reaction_T<C> = SideEffect_T<C> | ContextMutation_T<C> | Peer_T<C,unknown> | Goto_T;
export interface SideEffect_T<C> { export interface SideEffect_T<C> {
type: 'SideEffect'; type: 'SideEffect';
fn: SideEffectFunction_T<C>; fn: SideEffectFunction_T<C>;
@ -21,12 +21,12 @@ export interface ContextMutation_T<C> {
fn: ContextMutationFunction_T<C>; fn: ContextMutationFunction_T<C>;
}; };
export type ContextMutationFunction_T<C> = (ctx:C,e:Event_T,self:Interpreter_T<C>)=>C; export type ContextMutationFunction_T<C> = (ctx:C,e:Event_T,self:Interpreter_T<C>)=>C;
export interface Peering_T<C,C_Peer> { export interface Peer_T<C,C_Peer> {
type: 'Peering'; type: 'SetPeer' | 'SetPeers' | 'AddPeers';
name: string; name: string;
peerCreationFunction: PeerCreationFunction_T<C,C_Peer> peerCreationFunction: PeerCreationFunction_T<C,C_Peer>
}; };
export type PeerCreationFunction_T<C,C_Peer> = (ctx:C,e:Event_T,self:Interpreter_T<C>) => Interpreter_T<C_Peer>; export type PeerCreationFunction_T<C,C_Peer> = (ctx:C,e:Event_T,self:Interpreter_T<C>) => Interpreter_T<C_Peer> | Array<Interpreter_T<C_Peer>>;
export interface Goto_T { export interface Goto_T {
type: 'Goto'; type: 'Goto';
targetStateName: string; targetStateName: string;
@ -38,14 +38,16 @@ export const On = function<C>(eventName:string, ...reactions:Array<Reaction_T<C>
export const SideEffect = function<C>(fn:SideEffectFunction_T<C>) : SideEffect_T<C>{ return {type:'SideEffect', fn}; }; export const SideEffect = function<C>(fn:SideEffectFunction_T<C>) : SideEffect_T<C>{ return {type:'SideEffect', fn}; };
export const Goto = function(targetStateName:string) : Goto_T { return {type:'Goto', targetStateName} }; export const Goto = function(targetStateName:string) : Goto_T { return {type:'Goto', targetStateName} };
export const Context = function<C>(fn:ContextMutationFunction_T<C>) : ContextMutation_T<C> { return {type:'ContextMutation', fn} }; export const Context = function<C>(fn:ContextMutationFunction_T<C>) : ContextMutation_T<C> { return {type:'ContextMutation', fn} };
export const Peer = function<C,C_Peer>(name:string, peerCreationFunction:PeerCreationFunction_T<C,C_Peer>) : Peering_T<C,C_Peer>{ return {type:'Peering', name, peerCreationFunction}; } export const Peer = function<C,C_Peer>(name:string, peerCreationFunction:PeerCreationFunction_T<C,C_Peer>) : Peer_T<C,C_Peer>{ return {type:'SetPeer', name, peerCreationFunction}; }
export const Peers = function<C,C_Peer>(name:string, peerCreationFunction:PeerCreationFunction_T<C,C_Peer>) : Peer_T<C,C_Peer>{ return {type:'SetPeers', name, peerCreationFunction}; }
export const AddPeers = function<C,C_Peer>(name:string, peerCreationFunction:PeerCreationFunction_T<C,C_Peer>) : Peer_T<C,C_Peer>{ return {type:'AddPeers', name, peerCreationFunction}; }
export interface Interpreter_T<C> { export interface Interpreter_T<C> {
machine: Machine_T<C>; machine: Machine_T<C>;
state: string; state: string;
context: C; context: C;
peers: Record<string, Interpreter_T<unknown>>; peers: Record<string, Interpreter_T<unknown> | Array<Interpreter_T<unknown>>>;
peerSubscriptionIds: Map<Interpreter_T<unknown>,string>; peerSubscriptionIds: Map<Interpreter_T<unknown>,string>;
eventQueue:Array<Event_T>; eventQueue:Array<Event_T>;
subscriptionsToEvents: Record<string, EventsSubscriptionCallbackFunction_T<C>>; // called upon every event subscriptionsToEvents: Record<string, EventsSubscriptionCallbackFunction_T<C>>; // called upon every event
@ -133,7 +135,7 @@ function processNextEvent<C>(interpreter:Interpreter_T<C>){
// save the current context, before it's mutated, so as to pass it to sideEffects below: // save the current context, before it's mutated, so as to pass it to sideEffects below:
const originalContext = interpreter.context; const originalContext = interpreter.context;
// must process contextMutations in-series: // must process contextMutations in-series:
contextMutations.forEach((contextMutation)=>{ contextMutations.forEach((contextMutation:ContextMutation_T<C>)=>{
interpreter.context = contextMutation.fn(interpreter.context, event, interpreter); interpreter.context = contextMutation.fn(interpreter.context, event, interpreter);
}); });
// processing of `goto` must be last: // processing of `goto` must be last:
@ -146,7 +148,17 @@ function processNextEvent<C>(interpreter:Interpreter_T<C>){
} }
// now that "internal" stuff has been run, we can run "external" stuff: // now that "internal" stuff has been run, we can run "external" stuff:
// process peerings (possibly in parallel): // process peerings (possibly in parallel):
peerings.forEach((peering)=>{ addPeer(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter)); }); peerings.forEach((peering)=>{
if(peering.type === 'SetPeer'){
setPeer(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Interpreter_T<unknown>);
}
else if(peering.type === 'SetPeers'){
setPeers(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Array<Interpreter_T<unknown>>);
}
else if(peering.type === 'AddPeers'){
addPeers(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Array<Interpreter_T<unknown>>);
}
});
// run subscription-to-events callbacks (can be in parallel), since an event just happened: // run subscription-to-events callbacks (can be in parallel), since an event just happened:
Object.values(interpreter.subscriptionsToEvents).forEach((callbackFunction)=>{ callbackFunction(event, interpreter); }); Object.values(interpreter.subscriptionsToEvents).forEach((callbackFunction)=>{ callbackFunction(event, interpreter); });
// can process sideEffects in parallel (though we currently don't due to the overhead of doing so in Node.js): // can process sideEffects in parallel (though we currently don't due to the overhead of doing so in Node.js):
@ -156,11 +168,11 @@ function processNextEvent<C>(interpreter:Interpreter_T<C>){
}); });
} }
} }
function categorizeReactions<C>(reactions:Array<Reaction_T<C>>) : {sideEffects:Array<SideEffect_T<C>>, contextMutations:Array<ContextMutation_T<C>>, peerings:Array<Peering_T<C,unknown>>, goto_:Goto_T|null}{ function categorizeReactions<C>(reactions:Array<Reaction_T<C>>) : {sideEffects:Array<SideEffect_T<C>>, contextMutations:Array<ContextMutation_T<C>>, peerings:Array<Peer_T<C,unknown>>, goto_:Goto_T|null}{
let let
sideEffects:Array<SideEffect_T<C>> = [], sideEffects:Array<SideEffect_T<C>> = [],
contextMutations:Array<ContextMutation_T<C>> = [], contextMutations:Array<ContextMutation_T<C>> = [],
peerings:Array<Peering_T<C,unknown>> = [], peerings:Array<Peer_T<C,unknown>> = [],
goto_:Goto_T|null = null; goto_:Goto_T|null = null;
reactions.forEach((reaction)=>{ reactions.forEach((reaction)=>{
if(reaction.type === 'SideEffect'){ if(reaction.type === 'SideEffect'){
@ -169,7 +181,7 @@ function categorizeReactions<C>(reactions:Array<Reaction_T<C>>) : {sideEffects:A
else if(reaction.type === 'ContextMutation'){ else if(reaction.type === 'ContextMutation'){
contextMutations.push(reaction); contextMutations.push(reaction);
} }
else if(reaction.type === 'Peering'){ else if(reaction.type === 'SetPeer'){
peerings.push(reaction); peerings.push(reaction);
} }
else if(reaction.type === 'Goto'){ else if(reaction.type === 'Goto'){
@ -206,14 +218,56 @@ export function unsubscribe<C>(interpreter:Interpreter_T<C>, subscriptionId:stri
delete interpreter.subscriptionsToEvents[subscriptionId.toString()]; delete interpreter.subscriptionsToEvents[subscriptionId.toString()];
} }
export function addPeer<C, C_Peer>(self:Interpreter_T<C>, name:string, peer:Interpreter_T<C_Peer>){ export function setPeer<C, C_Peer>(self:Interpreter_T<C>, name:string, peer:Interpreter_T<C_Peer>){
// if it exists, unsubscribe:
if(self.peers.hasOwnProperty(name)){
unsubscribe(self, self.peerSubscriptionIds.get(self.peers[name] as Interpreter_T<C_Peer>));
}
self.peers[name] = peer; self.peers[name] = peer;
subscribeToEvents(peer, (e, peer)=>{ const subscriptionId = subscribeToEvents(peer, (e, peer)=>{
// TODO: ensure there's no faulty logic in having this `if`:
// this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies):
if(self.isTransitioning === false){
send(self, [name+'.'+e[0], e[1]]);
}
});
self.peerSubscriptionIds.set(peer, subscriptionId);
}
export function setPeers<C, C_Peer>(self:Interpreter_T<C>, name:string, peers:Array<Interpreter_T<C_Peer>>){
// if any exist, unsubscribe:
if(self.peers.hasOwnProperty(name)){
(self.peers[name] as Array<Interpreter_T<C_Peer>>).forEach(peer => {
unsubscribe(self, self.peerSubscriptionIds.get(peer));
});
}
self.peers[name] = peers;
peers.forEach((peer)=>{
const subscriptionId = subscribeToEvents(peer, (e, peer)=>{
// TODO: ensure there's no faulty logic in having this `if`:
// this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies): // this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies):
if(self.isTransitioning === false){ if(self.isTransitioning === false){
send(self, [name+'.'+e[0], e[1]]); send(self, [name+'.'+e[0], e[1]]);
} }
}); });
self.peerSubscriptionIds.set(peer, subscriptionId);
});
}
export function addPeers<C, C_Peer>(self:Interpreter_T<C>, name:string, peers:Array<Interpreter_T<C_Peer>>){
// if none exist, initialize with empty array:
if(!self.peers.hasOwnProperty(name)){
self.peers[name] = [];
}
(self.peers[name] as Array<Interpreter_T<C_Peer>>).concat(peers);
peers.forEach((peer)=>{
const subscriptionId = subscribeToEvents(peer, (e, peer)=>{
// TODO: ensure there's no faulty logic in having this `if`:
// this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies):
if(self.isTransitioning === false){
send(self, [name+'.'+e[0], e[1]]);
}
});
self.peerSubscriptionIds.set(peer, subscriptionId);
});
} }
/* /*

@ -1,7 +1,7 @@
import { Machine, State, On, SideEffect, Goto, Interpreter, Interpreter_T, send, Event_T, Context, SideEffectFunction_T, Peer, PeerCreationFunction_T, ContextMutationFunction_T } from '../index'; import { Machine, State, On, SideEffect, Goto, Interpreter, Interpreter_T, send, Event_T, Context, SideEffectFunction_T, Peer, PeerCreationFunction_T, ContextMutationFunction_T } from '../index';
const wait = (ms:number)=>new Promise((resolve)=>{ setTimeout(()=>{ resolve(1); }, ms); }); const wait = (ms:number)=>new Promise((resolve)=>{ setTimeout(()=>{ resolve(1); }, ms); });
const makeRequest : SideEffectFunction_T<Cc> = (ctx,e,self)=>{ send(self.peers.server, ['received-request',self]); }; const makeRequest : SideEffectFunction_T<Cc> = (ctx,e,self)=>{ send((self.peers.server as Interpreter_T<Cs>), ['received-request',self]); };
const sendResponse : SideEffectFunction_T<Cs> = (ctx,e,self)=>{ send(ctx.client, ['received-response',self]); }; const sendResponse : SideEffectFunction_T<Cs> = (ctx,e,self)=>{ send(ctx.client, ['received-response',self]); };
const startTimer : SideEffectFunction_T<Cs> = async (ctx,e,self)=>{ await wait(1500); console.log(' timer actually finished'); send(self, ['timer-finished',null]); } const startTimer : SideEffectFunction_T<Cs> = async (ctx,e,self)=>{ await wait(1500); console.log(' timer actually finished'); send(self, ['timer-finished',null]); }
const log = (namespace:string)=>(ctx, e, self)=>{ console.log(namespace, self.state, e[0]); }; const log = (namespace:string)=>(ctx, e, self)=>{ console.log(namespace, self.state, e[0]); };

Loading…
Cancel
Save