Fix some problems with function imports etc. Started implementing the consensus algo.

This commit is contained in:
Raycho Mukelov 2023-09-03 01:59:35 +03:00
parent 949c594c6a
commit 31a3ae1078
3 changed files with 43 additions and 26 deletions

View File

@ -7,8 +7,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import protocol
import types
import types, protocol
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
@ -25,15 +24,8 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
discard
# Private Timers Create Ops
proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc RaftNodeScheduleElectionTimeOut*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeScheduleElectionTimeOut[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeScheduleRequestVoteTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc RaftNodeScheduleRequestVotesCleanUpTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard

View File

@ -16,6 +16,8 @@ import consensus_module
export types, protocol, consensus_module
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
# Raft Node Public API procedures / functions
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node
@ -41,22 +43,13 @@ proc RaftNodeLoad*[SmCommandType, SmStateType](
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] =
discard
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
if node.state != rnsFollower:
raiseAssert "Something's wrong - Follower state expected at start!"
debugEcho "StartNode: ", node.id
func RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
proc RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
result = node.id
func RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
proc RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
node.state
func RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
proc RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
node.currentTerm
func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
@ -107,8 +100,39 @@ proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
discard
proc RaftNodeLogIndexGet[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
discard
proc RaftNodeLogEntryGet[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] =
discard
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] =
discard
# Timers scheduling stuff etc.
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
var fut = sleepAsync(node.heartBeatTimeout)
fut.callback = proc () = RaftNodeSendHeartBeat(node)
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
node.heartBeatTimeoutTimer = sleepAsync(node.heartBeatTimeout)
node.heartBeatTimeoutTimer.callback = proc() =
node.state = rnsCandidate # Transition to candidate state and initiate new Election
RaftNodeStartElection(node)
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
for raftPeer in node.peers:
let msgHrtBt = RaftMessageAppendEntries(
senderId: node.id, receiverId: raftPeer.id,
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntry(node, RaftNodeLogIndexGet(node) - 1).term else: 0
)
asyncCheck node.msgSendCallback(msgHrtBt)
RaftNodeScheduleHeartBeat(node)
# Raft Node Control
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
if node.state != rnsFollower:
raiseAssert "Something's wrong - Follower state expected at start!"
RaftNodeScheduleHeartBeatTimeout(node)
debugEcho "StartNode: ", node.id

View File

@ -121,6 +121,7 @@ type
requestVotesTimer: Future[void]
heartBeatTimer: Future[void]
heartBeatTimeoutTimer: Future[void]
appendEntriesTimer: Future[void]
# Mtx definition(s) go here