diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 78231c2..acd9b36 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -7,8 +7,6 @@ # This file may not be copied, modified, or distributed except according to # those terms. -{.hint[XDeclaredButNotUsed]: off.} - import types import protocol import log_ops @@ -24,6 +22,36 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, if cnt >= (node.peers.len div 2 + 1): result = true +proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = + debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm + result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false) + withRLock(node.raftStateMutex): + if msg.senderTerm >= node.currentTerm: + RaftNodeCancelAllTimers(node) + if node.state == rnsCandidate: + RaftNodeAbortElection(node) + result.success = true + node.currentTerm = msg.senderTerm + node.votedFor = DefaultUUID + node.currentLeaderId = msg.senderId + RaftNodeScheduleElectionTimeout(node) + +proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = + withRLock(node.raftStateMutex): + result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false) + if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID: + # if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term: + asyncSpawn cancelAndWait(node.electionTimeoutTimer) + node.votedFor = msg.senderId + result.granted = true + RaftNodeScheduleElectionTimeout(node) + +proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + withRLock(node.raftStateMutex): + node.state = rnsFollower + for fut in node.votesFuts: + waitFor cancelAndWait(fut) + proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = withRLock(node.raftStateMutex): node.currentTerm.inc @@ -60,27 +88,11 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand if RaftNodeQuorumMin(node): asyncSpawn cancelAndWait(node.electionTimeoutTimer) debug "Raft Node transition to leader", node_id=node.id - node.state = rnsLeader # Transition to leader and send Heart-Beat to establish this node as the cluster leader + node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader asyncSpawn RaftNodeSendHeartBeat(node) -proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = - withRLock(node.raftStateMutex): - result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false) - if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID: - # if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term: - asyncSpawn cancelAndWait(node.electionTimeoutTimer) - node.votedFor = msg.senderId - result.granted = true - RaftNodeScheduleElectionTimeout(node) - proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = discard proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) = - discard - -proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = - discard - -proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = discard \ No newline at end of file diff --git a/raft/log_ops.nim b/raft/log_ops.nim index 45aff78..933a369 100644 --- a/raft/log_ops.nim +++ b/raft/log_ops.nim @@ -14,4 +14,10 @@ proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandTy discard proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] = + discard + +proc RaftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = + discard + +proc RaftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = discard \ No newline at end of file diff --git a/raft/raft_api.nim b/raft/raft_api.nim index b49e735..253325d 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -7,8 +7,6 @@ # This file may not be copied, modified, or distributed except according to # those terms. -{.hint[XDeclaredButNotUsed]: off.} - import types import protocol import consensus_module @@ -26,10 +24,6 @@ export # Forward declarations proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) -proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} -proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) -proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) -proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) # Raft Node Public API proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node @@ -77,20 +71,6 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, result = node.state == rnsLeader # Deliver Raft Message to the Raft Node and dispatch it -proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] = - debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm - result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false) - withRLock(node.raftStateMutex): - if msg.senderTerm >= node.currentTerm: - RaftNodeCancelAllTimers(node) - if node.state == rnsCandidate: - RaftNodeAbortElection(node) - result.success = true - node.currentTerm = msg.senderTerm - node.votedFor = DefaultUUID - node.currentLeaderId = msg.senderId - RaftNodeScheduleElectionTimeout(node) - proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} = case raftMessage.op of rmoRequestVote: # Dispatch different Raft Message types based on the operation code @@ -103,7 +83,7 @@ proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmComman result = RaftNodeHandleHeartBeat(node, appendMsg) else: discard -# Process RaftNodeClientRequests +# Process Raft Node Client Requests proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} = case req.op of rncroExecSmCommand: @@ -119,7 +99,8 @@ proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCo # Abstract State Machine Ops func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType = - node.stateMachine.state + withRLock(node.raftStateMutex): + node.stateMachine.state proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) = mixin RaftSmInit @@ -127,7 +108,8 @@ proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateM proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) = mixin RaftSmApply - RaftSmApply(stateMachine, command) + withRLock(node.raftStateMutex): + RaftSmApply(stateMachine, command) # Private Abstract Timer creation template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] = @@ -136,7 +118,7 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback): # Timers scheduling stuff etc. proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - node.heartBeatTimer = RaftTimerCreate(150, proc() = asyncSpawn RaftNodeSendHeartBeat(node)) + node.heartBeatTimer = RaftTimerCreate(130, proc() = asyncSpawn RaftNodeSendHeartBeat(node)) proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = debug "Raft Node sending Heart-Beat to peers", node_id=node.id @@ -157,13 +139,7 @@ proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode ) # Raft Node Control -proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - withRLock(node.raftStateMutex): - node.state = rnsFollower - for fut in node.votesFuts: - waitFor cancelAndWait(fut) - -proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = +proc RaftNodeCancelAllTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = withRLock(node.raftStateMutex): if node.heartBeatTimer != nil: waitFor cancelAndWait(node.heartBeatTimer) @@ -175,6 +151,9 @@ proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmComman proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = # Try to stop gracefully withRLock(node.raftStateMutex): + # Abort election if in election + if node.state == rnsCandidate: + RaftNodeAbortElection(node)s node.state = rnsStopped # Cancel pending timers (if any) RaftNodeCancelAllTimers(node) diff --git a/tests/basic_timers.nim b/tests/basic_timers.nim index 367f649..5f29765 100644 --- a/tests/basic_timers.nim +++ b/tests/basic_timers.nim @@ -12,7 +12,7 @@ import ../raft/raft_api export raft_api proc RaftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} = - var f = sleepAsync(timerInterval) + var f = sleepAsync(milliseconds(timerInterval)) await f if not f.cancelled: timerCallback() \ No newline at end of file diff --git a/tests/test_basic_cluster.nim b/tests/test_basic_cluster.nim index a655ac0..3e42c18 100644 --- a/tests/test_basic_cluster.nim +++ b/tests/test_basic_cluster.nim @@ -28,7 +28,7 @@ proc basicClusterMain*() = test "Start Basic Raft Cluster And wait it to converge (Elect a Leader)": BasicRaftClusterStart(cluster) - let dur = seconds(5) + let dur = seconds(60) waitFor sleepAsync(dur) test "Simulate Basic Raft Cluster Client SmCommands Execution / Log Replication":