From c28243cb8a2a65fcc62760465435abe8f5c421f8 Mon Sep 17 00:00:00 2001 From: Marto Date: Thu, 4 Jan 2024 19:06:20 +0200 Subject: [PATCH] WIP: Inital consensus state machine work --- raft/consensus_state_machine.nim | 196 ++++++++++++------------- raft/types.nim | 7 +- tests/all_tests.nim | 3 +- tests/test_consensus_state_machine.nim | 34 ++++- 4 files changed, 128 insertions(+), 112 deletions(-) diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index 30bbc84..5e905fc 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -7,123 +7,121 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import std/tables -import std/rlocks import types -import chronicles +import std/[times] type - # Node events - EventType* = enum - VotingTimeout, - ElectionTimeout, - HeartbeatTimeout, - AppendEntriesTimeout - HeartbeatReceived, - HeartbeatSent, - AppendEntriesReceived, - AppendEntriesSent, - RequestVoteReceived, - RequestVoteSent, - ClientRequestReceived, - ClientRequestProcessed + RaftRpcMessageType* = enum + Vote = 0, + VoteReplay = 1, + Append = 2, + AppendReplay = 3 + - # Define callback to use with transitions. It is a function that takes a node as an argument and returns nothing. - # It is used to perform some action when a transition is triggered. For example, when a node becomes a leader, - # it should start sending heartbeats to other nodes. - ConsensusFsmTransitionActionType*[NodeType] = proc(node: NodeType) {.gcsafe.} + RaftRpcAppendRequest* = object + previousTerm: RaftNodeTerm + + RaftRpcAppendReplay* = object - # Define logical functions (conditions) computed from our NodeType etc. (Truth Table) - LogicalConditionValueType* = bool - LogicalCondition*[NodeTytpe, RaftMessageType] = - proc(node: NodeTytpe, msg: Option[RaftMessageType]): LogicalConditionValueType - LogicalConditionsLut*[RaftNodeState, EventType, NodeType, RaftMessageType] = - Table[(RaftNodeState, EventType), seq[LogicalCondition[NodeType, RaftMessageType]]] + RaftRpcVoteRequest* = object + + RaftRpcVoteReplay* = object + + LeaderState* = object - # Define Terminals as a tuple of a Event and a sequence of logical functions (conditions) and their respective values computed from NodeType, NodeTytpe and RaftMessageType - # (kind of Truth Table) - TerminalSymbol*[EventType, NodeType, RaftMessageType] = - (EventType, seq[LogicalConditionValueType]) + CandidateState* = object + votes: seq[RaftNodeId] - # Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal ) - # NonTerminal is a NodeState and Terminal is a TerminalSymbol - the tuple (EventType, seq[LogicalConditionValueType]) - StateTransitionsRulesLut*[RaftNodeState, EventType, NodeType, RaftMessageType] = Table[ - (RaftNodeState, TerminalSymbol[NodeType, EventType, RaftMessageType]), - (RaftNodeState, Option[ConsensusFsmTransitionActionType[NodeType]]) - ] + FollowerState* = object + leader: RaftNodeId - # FSM type definition - ConsensusFsm*[RaftNodeState, EventType, NodeType, RaftMessageType] = ref object - mtx: RLock - state: RaftNodeState - logicalFunctionsLut: LogicalConditionsLut[RaftNodeState, EventType, NodeType, RaftMessageType] - stateTransitionsLut: StateTransitionsRulesLut[RaftNodeState, EventType, NodeType, RaftMessageType] + RaftRpcMessage* = object + currentTerm: RaftNodeTerm + sender: RaftNodeId + case kind: RaftRpcMessageType + of Vote: voteRequest: RaftRpcVoteRequest + of VoteReplay: voteReplay: RaftRpcVoteReplay + of Append: appendRequest: RaftRpcAppendRequest + of AppendReplay: appendReplay: RaftRpcAppendReplay -# FSM type constructor -proc new*[RaftNodeState, EventType, NodeType, RaftMessageType]( - T: type ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], startSymbol: RaftNodeState = rnsFollower): T = + Command* = object + data: seq[byte] + LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) + term: RaftNodeTerm + index: RaftLogIndex + # TODO: Add configuration too + data: Command - result = T(mtx: RLock(), state: startSymbol) - initRLock(result.mtx) + RaftLog* = object + logEntries: seq[LogEntry] - debug "new: ", fsm=repr(result) + RaftStateMachineOutput* = object + logEntries: seq[LogEntry] + # Entries that should be applyed to the "User" State machine + committed: seq[LogEntry] + messages: seq[RaftRpcMessage] + debugLogs: seq[string] -proc addFsmTransition*[RaftNodeState, EventType, NodeType, RaftMessageType]( - fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], - fromState: RaftNodeState, - termSymb: TerminalSymbol[EventType, NodeType, RaftMessageType], - toState: RaftNodeState, - action: Option[ConsensusFsmTransitionActionType]) = + RaftStateMachine* = object + myId*: RaftNodeId + term: RaftNodeTerm + commitIndex: RaftLogIndex + log: RaftLog + output: RaftStateMachineOutput + electionTimeout: times.Duration + lastUpdate: Time + case state: RaftNodeState + of rnsFollower: follower : FollowerState + of rnsCandidate: candidate: CandidateState + of rnsLeader: leader: LeaderState - fsm.stateTransitionsLut[(fromState.state, termSymb)] = (toState, action) + RaftStateMachineConfig* = object -proc addFsmTransitionLogicalConditions*[RaftNodeState, EventType, NodeType, RaftMessageType]( - fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], - state: RaftNodeState, - event: EventType, - logicalConditions: seq[LogicalCondition[NodeType, RaftMessageType]]) = +func append(rf: var RaftLog, entry: LogEntry) = + rf.logEntries.add(entry) - fsm.logicalFunctionsLut[(state, event)] = logicalConditions +func debug*(sm: var RaftStateMachine, log: string) = + sm.output.debugLogs.add(log) + +func initRaftStateMachine*(config: RaftStateMachineConfig): RaftStateMachine = + var st = RaftStateMachine() + st.term = 0 + st.commitIndex = 0 + st.state = RaftNodeState.rnsFollower + return st -proc computeFsmLogicFunctionsPermutationValuе*[RaftNodeState, NodeType, EventType, RaftMessageType]( - fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], - node: NodeType, - termSymb: TerminalSymbol, - msg: Option[RaftMessageType]): TerminalSymbol = - let - e = termSymb[0] +func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage = + return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest()) - debug "computeFSMLogicFunctionsPermutationValue: ", eventType=e, " ", nonTermSymb=nts, " ", msg=msg +func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage,currentTime: Time) = + if sm.term > msg.currentTerm: + sm.debug "Current node is behind" + elif sm.term < msg.currentTerm: + sm.debug "The leader is behind" + else: + case sm.state: + of rnsCandidate: + sm.state = rnsFollower + sm.follower.leader = msg.sender + of rnsFollower: + sm.follower.leader = msg.sender + of rnsLeader: + sm.debug "For now it should be impossible to have msg from a non leader to a leader" - var - logicFunctionsConds = fsm.logicalFunctionsLut[(e, NodeType)] - logicFunctionsCondsValues = seq[LogicalConditionValueType] + case msg.kind: + of RaftRpcMessageType.Vote: + sm.debug "Handle vote request" + else: + sm.debug "Unhandle msg type" - debug "computeFSMLogicFunctionsPermutationValue: ", logicFunctionsConds=logicFunctionsConds +func addEntry*(sm: var RaftStateMachine, entry: LogEntry) = + if sm.state != RaftNodeState.rnsLeader: + sm.debug "Error: only the leader can handle new entries" + sm.log.append(entry) - for f in logicFunctionsConds: - logicFunctionsCondsValues.add f(node, msg) - - debug "computeFSMLogicFunctionsPermutationValue: ", logicFunctionsCondsValues=logicFunctionsCondsValues - - termSymb[1] = logicFunctionsConds - result = termSymb - -proc fsmAdvance*[RaftNodeState, EventType, NodeType, RaftMessageType]( - fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], - node: NodeType, - termSymb: TerminalSymbol[EventType, NodeType, RaftMessageType], - msg: Option[RaftMessageType]): RaftNodeState = - - withRLock(): - var - input = computeFsmLogicFunctionsPermutationValue(fsm, node, termSymb, msg) - let trans = fsm.stateTransitionsLut[(fsm.state, input)] - let action = trans[1] - - fsm.state = trans[0] - debug "ConsensusFsmAdvance", fsmState=fsm.state, " ", input=input, " ", action=repr(action) - if action.isSome: - action(node) - result = fsm.state +func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = + # Should initiate replication if we have new entries + let output = sm.output + sm.output = RaftStateMachineOutput() + return output \ No newline at end of file diff --git a/raft/types.nim b/raft/types.nim index d7a1896..01ead54 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -27,10 +27,9 @@ const type RaftNodeState* = enum - rnsError = 0, # Error state - rnsFollower = 1, # Follower state - rnsCandidate = 2 # Candidate state - rnsLeader = 3 # Leader state + rnsFollower = 0, # Follower state + rnsCandidate = 1 # Candidate state + rnsLeader = 2 # Leader state RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node RaftNodeTerm* = int # Raft Node Term Type diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 4b83dc5..59d13d9 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -14,4 +14,5 @@ import ../misc/test_macro cliBuilder: import ./test_basic_timers, ./test_basic_state_machine, - ./test_basic_cluster_election \ No newline at end of file + #./test_basic_cluster_election, + ./test_consensus_state_machine \ No newline at end of file diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index d42a342..f6e7796 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -9,16 +9,34 @@ import unittest2 import ../raft/types -import basic_state_machine import ../raft/consensus_state_machine +import std/[times] -var - csm: ConsensusFsm[RaftNodeState, EventType, RaftNode[SmCommand, SmState], RaftMessageBase[SmCommand, SmState]] +proc consensusstatemachineMain*() = + -suite "Create and test Consensus State Machine": + suite "Basic state machine tests": + test "create state machine": + let config = RaftStateMachineConfig() + let sm = initRaftStateMachine(config) + echo sm - test "Create Consensus State Machine": - csm = ConsensusFsm[RaftNodeState, EventType, RaftNode[SmCommand, SmState], RaftMessageBase[SmCommand, SmState]].new - check csm != nil + test "advance empty state machine": + var sm = RaftStateMachine() + var msg = sm.createVoteRequest() + sm.advance(msg ,getTime()) + echo sm.poll() + echo sm.poll() + echo getTime() - # test "Add state transitions / logic conditions": + test "two machines": + var sm = RaftStateMachine() + var sm2 = RaftStateMachine(myId: genUUID()) + var msg = sm2.createVoteRequest() + sm.advance(msg ,getTime()) + echo sm2 + echo getTime() + + +if isMainModule: + consensusstatemachineMain() \ No newline at end of file