From 31a3ae1078baa518a5a3e2203486d19d557a3a36 Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Sun, 3 Sep 2023 01:59:35 +0300 Subject: [PATCH] Fix some problems with function imports etc. Started implementing the consensus algo. --- raft/consensus_module.nim | 14 +++------- raft/raft_api.nim | 54 ++++++++++++++++++++++++++++----------- raft/types.nim | 1 + 3 files changed, 43 insertions(+), 26 deletions(-) diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index 8ffc929..70cd498 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -7,8 +7,7 @@ # This file may not be copied, modified, or distributed except according to # those terms. -import protocol -import types +import types, protocol proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard @@ -25,15 +24,8 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) = discard -# Private Timers Create Ops -proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = +proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - discard - -proc RaftNodeScheduleElectionTimeOut[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - discard - -proc RaftNodeScheduleRequestVoteTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = +proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard \ No newline at end of file diff --git a/raft/raft_api.nim b/raft/raft_api.nim index bb2151d..fbe7a02 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -16,6 +16,8 @@ import consensus_module export types, protocol, consensus_module proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) +proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) +proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) # Raft Node Public API procedures / functions proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node @@ -41,22 +43,13 @@ proc RaftNodeLoad*[SmCommandType, SmStateType]( msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] = discard -proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - discard - -proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = - if node.state != rnsFollower: - raiseAssert "Something's wrong - Follower state expected at start!" - - debugEcho "StartNode: ", node.id - -func RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID +proc RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID result = node.id -func RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State +proc RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State node.state -func RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term +proc RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term node.currentTerm func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers @@ -107,8 +100,39 @@ proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = discard -proc RaftNodeLogIndexGet[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = +proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = discard -proc RaftNodeLogEntryGet[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] = - discard \ No newline at end of file +proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] = + discard + +# Timers scheduling stuff etc. +proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + var fut = sleepAsync(node.heartBeatTimeout) + fut.callback = proc () = RaftNodeSendHeartBeat(node) + +proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout) + node.heartBeatTimeoutTimer.callback = proc() = + node.state = rnsCandidate # Transition to candidate state and initiate new Election + RaftNodeStartElection(node) + +proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + for raftPeer in node.peers: + let msgHrtBt = RaftMessageAppendEntries( + senderId: node.id, receiverId: raftPeer.id, + senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex, + prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntry(node, RaftNodeLogIndexGet(node) - 1).term else: 0 + ) + asyncCheck node.msgSendCallback(msgHrtBt) + RaftNodeScheduleHeartBeat(node) + +# Raft Node Control +proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + discard + +proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + if node.state != rnsFollower: + raiseAssert "Something's wrong - Follower state expected at start!" + RaftNodeScheduleHeartBeatTimeout(node) + debugEcho "StartNode: ", node.id \ No newline at end of file diff --git a/raft/types.nim b/raft/types.nim index e9f1440..8d44d74 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -121,6 +121,7 @@ type requestVotesTimer: Future[void] heartBeatTimer: Future[void] + heartBeatTimeoutTimer: Future[void] appendEntriesTimer: Future[void] # Mtx definition(s) go here