diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index 81cdf36..9d08547 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -15,37 +15,33 @@ import types type # Node events EventType = enum - NodeStart, NodeStop, NodeTick, NodeStepDown, NodeStepDownToCandidate, NodeStepDownToFollower, NodeStepDownToLeader, - NodeStepDownToShutdown, VotingTimeout, ElectionTimeout, HeartbeatTimeout, HeartbeatReceived, HeartbeatSent, AppendEntriesReceived, - AppendEntriesSent, RequestVoteReceived, RequestVoteSent, RequestVoteGranted, RequestVoteDenied, ClientRequestReceived, - ClientRequestSent, ClientRequestProcessed, ClientRequestFailed, ClientRequestTimeout, ClientRequestRetry, ClientRequestRetryExhausted, - AddNewNode, RemoveNode, NodeShutdown, NodeShutdownComplete, NodeShutdownFailed, NodeShutdownTimeout, NodeShutdownRetry - + VotingTimeout, ElectionTimeout, HeartbeatTimeout, HeartbeatReceived, HeartbeatSent, AppendEntriesReceived, + AppendEntriesSent, RequestVoteReceived, RequestVoteSent, ClientRequestReceived, ClientRequestProcessed # Define callback to use with Terminals. Node states are updated/read in-place in the node object ConsensusFSMCallbackType*[NodeType] = proc(node: NodeType) {.gcsafe.} # Define Non-Terminals as a (unique) tuples of the internal state and a sequence of callbacks - NonTerminalSymbol*[NodeType] = (NodeType, seq[ConsensusFSMCallbackType[NodeType]]) + NonTerminalSymbol*[NodeState] = NodeState # Define logical functions (conditions) computed from our NodeType etc. (Truth Table) LogicalFunctionConditionValueType* = bool LogicalFunctionCondition*[EventType, NodeTytpe, RaftMessageBase] = proc(e: EventType, n: NodeTytpe, msg: Option[RaftMessageBase]): bool - LogicalFunctionConditionsLUT*[EventType, NodeType, RaftMessageBase] = Table[(EventType, NodeType), seq[LogicalFunctionCondition[EventType, NodeType, Option[RaftMessageBase]]]] + LogicalFunctionConditionsLUT*[NodeState, EventType, NodeType, RaftMessageBase] = Table[(NodeState, EventType), seq[LogicalFunctionCondition[EventType, NodeType, Option[RaftMessageBase]]]] # Define Terminals as a tuple of a Event and a sequence of logical functions (conditions) and their respective values computed from NodeType, NodeTytpe and RaftMessageBase # (kind of Truth Table) TerminalSymbol*[EventType, NodeType, RaftMessageBase] = (EventType, seq[LogicalFunctionConditionValueType]) # Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal ) - StateTransitionsRulesLUT*[NodeType, EventType, RaftMessageBase] = Table[ - (NonTerminalSymbol[NodeType], TerminalSymbol[NodeType, EventType, RaftMessageBase]), + StateTransitionsRulesLUT*[NodeState, EventType, NodeType, RaftMessageBase] = Table[ + (NonTerminalSymbol[NodeState], TerminalSymbol[NodeType, EventType, RaftMessageBase]), NonTerminalSymbol[NodeType]] # FSM type definition - ConsensusFSM*[NodeType, EventType, BaseRaftMessage] = ref object + ConsensusFSM*[NodeState, EventType, NodeType, RaftMessageBase] = ref object mtx: RLock state: NonTerminalSymbol[NodeType] - stateTransitionsLUT: StateTransitionsRulesLUT[NodeType, EventType, RaftMessageBase] + stateTransitionsLUT: StateTransitionsRulesLUT[NodeState, EventType, NodeType, RaftMessageBase] logicalFunctionsLut: LogicalFunctionConditionsLUT[EventType, NodeType, RaftMessageBase] # FSM type constructor diff --git a/raft/fsm.cpp b/raft/fsm.cpp deleted file mode 100644 index 43272cf..0000000 --- a/raft/fsm.cpp +++ /dev/null @@ -1,38 +0,0 @@ -// no need to retry for timeout, network sends packets all the time and finally ends with an error time out, and next restart it. - fsm.addStateAction( bsrnchlConnecting, ConnectingSocketChannel::startConnecting, false ); - fsm.add( bsrnchlConnecting, levsig_stop, bsrnchlStopped, ConnectingSocketChannel::closeSocket ); // close it - fsm.add( bsrnchlConnecting, levsig_start, bsrnchlConnecting, fsm.Empty ); - fsm.add( bsrnchlConnecting, levsig_connected, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnecting, levsig_accepted, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnecting, levsig_failed, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlConnecting, levsig_disconnected, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlConnecting, levsig_timeout, bsrnchlConnecting, ConnectingSocketChannel::startRestartConnecting ); - - fsm.addStateAction( bsrnchlFailed, ConnectingSocketChannel::closeSocket, false ); - fsm.add( bsrnchlFailed, levsig_stop, bsrnchlStopped, fsm.Empty ); // close it - fsm.add( bsrnchlFailed, levsig_start, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlFailed, levsig_connected, bsrnchlConnected, fsm.Empty ); // this is not expected - fsm.add( bsrnchlFailed, levsig_accepted, bsrnchlConnected, fsm.Empty ); // this is not expected - fsm.add( bsrnchlFailed, levsig_failed, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlFailed, levsig_disconnected, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlFailed, levsig_timeout, bsrnchlConnecting, fsm.Empty ); - - fsm.addStateAction( bsrnchlConnected, ConnectingSocketChannel::startProtocolCommunication, true ); - fsm.add( bsrnchlConnected, levsig_stop, bsrnchlStoppingRequest, fsm.Empty ); // close it - fsm.add( bsrnchlConnected, levsig_start, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_connected, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_accepted, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_failed, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_disconnected, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_timeout, bsrnchlConnected, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_handshakeDone, bsrnchlEstablishedSuccess, fsm.Empty ); - fsm.add( bsrnchlConnected, levsig_handshakeFailed, bsrnchlFailed, fsm.Empty ); - - fsm.add( bsrnchlEstablishedSuccess, levsig_stop, bsrnchlStoppingRequest, fsm.Empty ); // close it - fsm.add( bsrnchlEstablishedSuccess, levsig_start, bsrnchlEstablishedSuccess, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_connected, bsrnchlEstablishedSuccess, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_accepted, bsrnchlEstablishedSuccess, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_failed, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_disconnected, bsrnchlFailed, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_timeout, bsrnchlEstablishedSuccess, fsm.Empty ); - fsm.add( bsrnchlEstablishedSuccess, levsig_handshakeDone, bsrnchlEstablishedSuccess, fsm.Empty ); \ No newline at end of file diff --git a/tests/basic_cluster.nim b/tests/basic_cluster.nim index 399bc36..a5d29d6 100644 --- a/tests/basic_cluster.nim +++ b/tests/basic_cluster.nim @@ -21,11 +21,15 @@ type nodesLock*: RLock networkDelayJitter*: int -proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] = +proc basicRaftClusterRaftMessageSendCallbackCreateWithNetDelay[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] = proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = await raftTimerCreate(rand(cluster.networkDelayJitter), proc()=discard) # Simulate network delay result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg) +proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] = + proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = + result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg) + proc basicRaftClusterStart*(cluster: BasicRaftCluster) = for id, node in cluster.nodes: raftNodeStart(node)