From c0860a6b50f476553c2a1749e2e7f98078a068c8 Mon Sep 17 00:00:00 2001 From: Avraham Sakal Date: Wed, 17 May 2023 18:09:44 -0400 Subject: [PATCH] Peer working --- src/index.ts | 80 ++++++++++++++++++++++++++++++++------- src/tests/01-ping-pong.ts | 2 +- 2 files changed, 68 insertions(+), 14 deletions(-) diff --git a/src/index.ts b/src/index.ts index 89bf527..66adeee 100644 --- a/src/index.ts +++ b/src/index.ts @@ -10,7 +10,7 @@ export interface EventReactionCouplings_T { eventName: string; reactions: Array>; }; -export type Reaction_T = SideEffect_T | ContextMutation_T | Peering_T | Goto_T; +export type Reaction_T = SideEffect_T | ContextMutation_T | Peer_T | Goto_T; export interface SideEffect_T { type: 'SideEffect'; fn: SideEffectFunction_T; @@ -21,12 +21,12 @@ export interface ContextMutation_T { fn: ContextMutationFunction_T; }; export type ContextMutationFunction_T = (ctx:C,e:Event_T,self:Interpreter_T)=>C; -export interface Peering_T { - type: 'Peering'; +export interface Peer_T { + type: 'SetPeer' | 'SetPeers' | 'AddPeers'; name: string; peerCreationFunction: PeerCreationFunction_T }; -export type PeerCreationFunction_T = (ctx:C,e:Event_T,self:Interpreter_T) => Interpreter_T; +export type PeerCreationFunction_T = (ctx:C,e:Event_T,self:Interpreter_T) => Interpreter_T | Array>; export interface Goto_T { type: 'Goto'; targetStateName: string; @@ -38,14 +38,16 @@ export const On = function(eventName:string, ...reactions:Array export const SideEffect = function(fn:SideEffectFunction_T) : SideEffect_T{ return {type:'SideEffect', fn}; }; export const Goto = function(targetStateName:string) : Goto_T { return {type:'Goto', targetStateName} }; export const Context = function(fn:ContextMutationFunction_T) : ContextMutation_T { return {type:'ContextMutation', fn} }; -export const Peer = function(name:string, peerCreationFunction:PeerCreationFunction_T) : Peering_T{ return {type:'Peering', name, peerCreationFunction}; } +export const Peer = function(name:string, peerCreationFunction:PeerCreationFunction_T) : Peer_T{ return {type:'SetPeer', name, peerCreationFunction}; } +export const Peers = function(name:string, peerCreationFunction:PeerCreationFunction_T) : Peer_T{ return {type:'SetPeers', name, peerCreationFunction}; } +export const AddPeers = function(name:string, peerCreationFunction:PeerCreationFunction_T) : Peer_T{ return {type:'AddPeers', name, peerCreationFunction}; } export interface Interpreter_T { machine: Machine_T; state: string; context: C; - peers: Record>; + peers: Record | Array>>; peerSubscriptionIds: Map,string>; eventQueue:Array; subscriptionsToEvents: Record>; // called upon every event @@ -133,7 +135,7 @@ function processNextEvent(interpreter:Interpreter_T){ // save the current context, before it's mutated, so as to pass it to sideEffects below: const originalContext = interpreter.context; // must process contextMutations in-series: - contextMutations.forEach((contextMutation)=>{ + contextMutations.forEach((contextMutation:ContextMutation_T)=>{ interpreter.context = contextMutation.fn(interpreter.context, event, interpreter); }); // processing of `goto` must be last: @@ -146,7 +148,17 @@ function processNextEvent(interpreter:Interpreter_T){ } // now that "internal" stuff has been run, we can run "external" stuff: // process peerings (possibly in parallel): - peerings.forEach((peering)=>{ addPeer(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter)); }); + peerings.forEach((peering)=>{ + if(peering.type === 'SetPeer'){ + setPeer(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Interpreter_T); + } + else if(peering.type === 'SetPeers'){ + setPeers(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Array>); + } + else if(peering.type === 'AddPeers'){ + addPeers(interpreter, peering.name, peering.peerCreationFunction(interpreter.context, event, interpreter) as Array>); + } + }); // run subscription-to-events callbacks (can be in parallel), since an event just happened: Object.values(interpreter.subscriptionsToEvents).forEach((callbackFunction)=>{ callbackFunction(event, interpreter); }); // can process sideEffects in parallel (though we currently don't due to the overhead of doing so in Node.js): @@ -156,11 +168,11 @@ function processNextEvent(interpreter:Interpreter_T){ }); } } -function categorizeReactions(reactions:Array>) : {sideEffects:Array>, contextMutations:Array>, peerings:Array>, goto_:Goto_T|null}{ +function categorizeReactions(reactions:Array>) : {sideEffects:Array>, contextMutations:Array>, peerings:Array>, goto_:Goto_T|null}{ let sideEffects:Array> = [], contextMutations:Array> = [], - peerings:Array> = [], + peerings:Array> = [], goto_:Goto_T|null = null; reactions.forEach((reaction)=>{ if(reaction.type === 'SideEffect'){ @@ -169,7 +181,7 @@ function categorizeReactions(reactions:Array>) : {sideEffects:A else if(reaction.type === 'ContextMutation'){ contextMutations.push(reaction); } - else if(reaction.type === 'Peering'){ + else if(reaction.type === 'SetPeer'){ peerings.push(reaction); } else if(reaction.type === 'Goto'){ @@ -206,14 +218,56 @@ export function unsubscribe(interpreter:Interpreter_T, subscriptionId:stri delete interpreter.subscriptionsToEvents[subscriptionId.toString()]; } -export function addPeer(self:Interpreter_T, name:string, peer:Interpreter_T){ +export function setPeer(self:Interpreter_T, name:string, peer:Interpreter_T){ + // if it exists, unsubscribe: + if(self.peers.hasOwnProperty(name)){ + unsubscribe(self, self.peerSubscriptionIds.get(self.peers[name] as Interpreter_T)); + } self.peers[name] = peer; - subscribeToEvents(peer, (e, peer)=>{ + const subscriptionId = subscribeToEvents(peer, (e, peer)=>{ + // TODO: ensure there's no faulty logic in having this `if`: // this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies): if(self.isTransitioning === false){ send(self, [name+'.'+e[0], e[1]]); } }); + self.peerSubscriptionIds.set(peer, subscriptionId); +} +export function setPeers(self:Interpreter_T, name:string, peers:Array>){ + // if any exist, unsubscribe: + if(self.peers.hasOwnProperty(name)){ + (self.peers[name] as Array>).forEach(peer => { + unsubscribe(self, self.peerSubscriptionIds.get(peer)); + }); + } + self.peers[name] = peers; + peers.forEach((peer)=>{ + const subscriptionId = subscribeToEvents(peer, (e, peer)=>{ + // TODO: ensure there's no faulty logic in having this `if`: + // this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies): + if(self.isTransitioning === false){ + send(self, [name+'.'+e[0], e[1]]); + } + }); + self.peerSubscriptionIds.set(peer, subscriptionId); + }); +} +export function addPeers(self:Interpreter_T, name:string, peers:Array>){ + // if none exist, initialize with empty array: + if(!self.peers.hasOwnProperty(name)){ + self.peers[name] = []; + } + (self.peers[name] as Array>).concat(peers); + peers.forEach((peer)=>{ + const subscriptionId = subscribeToEvents(peer, (e, peer)=>{ + // TODO: ensure there's no faulty logic in having this `if`: + // this `if` prevents infinite loops due to mutually-subscribed peers (cyclical dependencies): + if(self.isTransitioning === false){ + send(self, [name+'.'+e[0], e[1]]); + } + }); + self.peerSubscriptionIds.set(peer, subscriptionId); + }); } /* diff --git a/src/tests/01-ping-pong.ts b/src/tests/01-ping-pong.ts index 38bc1f0..43d58c5 100644 --- a/src/tests/01-ping-pong.ts +++ b/src/tests/01-ping-pong.ts @@ -1,7 +1,7 @@ import { Machine, State, On, SideEffect, Goto, Interpreter, Interpreter_T, send, Event_T, Context, SideEffectFunction_T, Peer, PeerCreationFunction_T, ContextMutationFunction_T } from '../index'; const wait = (ms:number)=>new Promise((resolve)=>{ setTimeout(()=>{ resolve(1); }, ms); }); -const makeRequest : SideEffectFunction_T = (ctx,e,self)=>{ send(self.peers.server, ['received-request',self]); }; +const makeRequest : SideEffectFunction_T = (ctx,e,self)=>{ send((self.peers.server as Interpreter_T), ['received-request',self]); }; const sendResponse : SideEffectFunction_T = (ctx,e,self)=>{ send(ctx.client, ['received-response',self]); }; const startTimer : SideEffectFunction_T = async (ctx,e,self)=>{ await wait(1500); console.log(' timer actually finished'); send(self, ['timer-finished',null]); } const log = (namespace:string)=>(ctx, e, self)=>{ console.log(namespace, self.state, e[0]); };