This commit is contained in:
Raycho Mukelov 2023-10-30 11:58:48 +02:00
parent c71b2210ed
commit 903f4d9260
3 changed files with 13 additions and 51 deletions

View File

@ -15,37 +15,33 @@ import types
type
# Node events
EventType = enum
NodeStart, NodeStop, NodeTick, NodeStepDown, NodeStepDownToCandidate, NodeStepDownToFollower, NodeStepDownToLeader,
NodeStepDownToShutdown, VotingTimeout, ElectionTimeout, HeartbeatTimeout, HeartbeatReceived, HeartbeatSent, AppendEntriesReceived,
AppendEntriesSent, RequestVoteReceived, RequestVoteSent, RequestVoteGranted, RequestVoteDenied, ClientRequestReceived,
ClientRequestSent, ClientRequestProcessed, ClientRequestFailed, ClientRequestTimeout, ClientRequestRetry, ClientRequestRetryExhausted,
AddNewNode, RemoveNode, NodeShutdown, NodeShutdownComplete, NodeShutdownFailed, NodeShutdownTimeout, NodeShutdownRetry
VotingTimeout, ElectionTimeout, HeartbeatTimeout, HeartbeatReceived, HeartbeatSent, AppendEntriesReceived,
AppendEntriesSent, RequestVoteReceived, RequestVoteSent, ClientRequestReceived, ClientRequestProcessed
# Define callback to use with Terminals. Node states are updated/read in-place in the node object
ConsensusFSMCallbackType*[NodeType] = proc(node: NodeType) {.gcsafe.}
# Define Non-Terminals as a (unique) tuples of the internal state and a sequence of callbacks
NonTerminalSymbol*[NodeType] = (NodeType, seq[ConsensusFSMCallbackType[NodeType]])
NonTerminalSymbol*[NodeState] = NodeState
# Define logical functions (conditions) computed from our NodeType etc. (Truth Table)
LogicalFunctionConditionValueType* = bool
LogicalFunctionCondition*[EventType, NodeTytpe, RaftMessageBase] = proc(e: EventType, n: NodeTytpe, msg: Option[RaftMessageBase]): bool
LogicalFunctionConditionsLUT*[EventType, NodeType, RaftMessageBase] = Table[(EventType, NodeType), seq[LogicalFunctionCondition[EventType, NodeType, Option[RaftMessageBase]]]]
LogicalFunctionConditionsLUT*[NodeState, EventType, NodeType, RaftMessageBase] = Table[(NodeState, EventType), seq[LogicalFunctionCondition[EventType, NodeType, Option[RaftMessageBase]]]]
# Define Terminals as a tuple of a Event and a sequence of logical functions (conditions) and their respective values computed from NodeType, NodeTytpe and RaftMessageBase
# (kind of Truth Table)
TerminalSymbol*[EventType, NodeType, RaftMessageBase] = (EventType, seq[LogicalFunctionConditionValueType])
# Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal )
StateTransitionsRulesLUT*[NodeType, EventType, RaftMessageBase] = Table[
(NonTerminalSymbol[NodeType], TerminalSymbol[NodeType, EventType, RaftMessageBase]),
StateTransitionsRulesLUT*[NodeState, EventType, NodeType, RaftMessageBase] = Table[
(NonTerminalSymbol[NodeState], TerminalSymbol[NodeType, EventType, RaftMessageBase]),
NonTerminalSymbol[NodeType]]
# FSM type definition
ConsensusFSM*[NodeType, EventType, BaseRaftMessage] = ref object
ConsensusFSM*[NodeState, EventType, NodeType, RaftMessageBase] = ref object
mtx: RLock
state: NonTerminalSymbol[NodeType]
stateTransitionsLUT: StateTransitionsRulesLUT[NodeType, EventType, RaftMessageBase]
stateTransitionsLUT: StateTransitionsRulesLUT[NodeState, EventType, NodeType, RaftMessageBase]
logicalFunctionsLut: LogicalFunctionConditionsLUT[EventType, NodeType, RaftMessageBase]
# FSM type constructor

View File

@ -1,38 +0,0 @@
// no need to retry for timeout, network sends packets all the time and finally ends with an error time out, and next restart it.
fsm.addStateAction( bsrnchlConnecting, ConnectingSocketChannel::startConnecting, false );
fsm.add( bsrnchlConnecting, levsig_stop, bsrnchlStopped, ConnectingSocketChannel::closeSocket ); // close it
fsm.add( bsrnchlConnecting, levsig_start, bsrnchlConnecting, fsm.Empty );
fsm.add( bsrnchlConnecting, levsig_connected, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnecting, levsig_accepted, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnecting, levsig_failed, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlConnecting, levsig_disconnected, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlConnecting, levsig_timeout, bsrnchlConnecting, ConnectingSocketChannel::startRestartConnecting );
fsm.addStateAction( bsrnchlFailed, ConnectingSocketChannel::closeSocket, false );
fsm.add( bsrnchlFailed, levsig_stop, bsrnchlStopped, fsm.Empty ); // close it
fsm.add( bsrnchlFailed, levsig_start, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlFailed, levsig_connected, bsrnchlConnected, fsm.Empty ); // this is not expected
fsm.add( bsrnchlFailed, levsig_accepted, bsrnchlConnected, fsm.Empty ); // this is not expected
fsm.add( bsrnchlFailed, levsig_failed, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlFailed, levsig_disconnected, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlFailed, levsig_timeout, bsrnchlConnecting, fsm.Empty );
fsm.addStateAction( bsrnchlConnected, ConnectingSocketChannel::startProtocolCommunication, true );
fsm.add( bsrnchlConnected, levsig_stop, bsrnchlStoppingRequest, fsm.Empty ); // close it
fsm.add( bsrnchlConnected, levsig_start, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_connected, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_accepted, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_failed, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_disconnected, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_timeout, bsrnchlConnected, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_handshakeDone, bsrnchlEstablishedSuccess, fsm.Empty );
fsm.add( bsrnchlConnected, levsig_handshakeFailed, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_stop, bsrnchlStoppingRequest, fsm.Empty ); // close it
fsm.add( bsrnchlEstablishedSuccess, levsig_start, bsrnchlEstablishedSuccess, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_connected, bsrnchlEstablishedSuccess, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_accepted, bsrnchlEstablishedSuccess, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_failed, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_disconnected, bsrnchlFailed, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_timeout, bsrnchlEstablishedSuccess, fsm.Empty );
fsm.add( bsrnchlEstablishedSuccess, levsig_handshakeDone, bsrnchlEstablishedSuccess, fsm.Empty );

View File

@ -21,11 +21,15 @@ type
nodesLock*: RLock
networkDelayJitter*: int
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc basicRaftClusterRaftMessageSendCallbackCreateWithNetDelay[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
await raftTimerCreate(rand(cluster.networkDelayJitter), proc()=discard) # Simulate network delay
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
for id, node in cluster.nodes:
raftNodeStart(node)