Implement functioning elections / voting / heart-beats
This commit is contained in:
parent
ca4041e4e6
commit
615db86c52
|
@ -56,22 +56,22 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
while node.votesFuts.len > 0:
|
||||
discard node.votesFuts.pop
|
||||
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.state == rnsCandidate:
|
||||
if RaftNodeQuorumMin(node):
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
debug "Raft Node transition to leader", node_id=node.id
|
||||
node.state = rnsLeader # Transition to leader and send Heart-Beat to establish this node as the cluster leader
|
||||
asyncSpawn RaftNodeSendHeartBeat(node)
|
||||
else:
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
withRLock(node.raftStateMutex):
|
||||
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
|
||||
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
|
||||
node.votedFor = msg.senderId
|
||||
result.granted = true
|
||||
# if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
node.votedFor = msg.senderId
|
||||
result.granted = true
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
discard
|
||||
|
|
|
@ -15,6 +15,7 @@ import consensus_module
|
|||
import log_ops
|
||||
import ../db/kvstore_mdbx
|
||||
import chronicles
|
||||
import std/random
|
||||
|
||||
export
|
||||
types,
|
||||
|
@ -26,9 +27,8 @@ export
|
|||
# Forward declarations
|
||||
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.}
|
||||
proc RaftNodeScheduleElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.}
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.}
|
||||
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
|
||||
# Raft Node Public API
|
||||
|
@ -82,11 +82,14 @@ proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmComma
|
|||
result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
withRLock(node.raftStateMutex):
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeAbortElection(node)
|
||||
RaftNodeCancelAllTimers(node)
|
||||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)
|
||||
result.success = true
|
||||
node.currentTerm = msg.senderTerm
|
||||
node.votedFor = DefaultUUID
|
||||
node.currentLeaderId = msg.senderId
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
case raftMessage.op
|
||||
|
@ -132,7 +135,7 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback):
|
|||
RaftTimerCreateCustomImpl(timerInterval, timerCallback)
|
||||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} =
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.heartBeatTimer = RaftTimerCreate(150, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
|
@ -143,14 +146,13 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
|
||||
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0
|
||||
)
|
||||
discard node.msgSendCallback(msgHrtBt)
|
||||
let r = await node.msgSendCallback(msgHrtBt)
|
||||
discard r
|
||||
debug "Sent Heart-Beat", sender=node.id, to=raftPeer.id
|
||||
asyncSpawn RaftNodeScheduleHeartBeat(node)
|
||||
RaftNodeScheduleHeartBeat(node)
|
||||
|
||||
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} =
|
||||
node.heartBeatTimeoutTimer = RaftTimerCreate(200, proc =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsCandidate # Transition to candidate state and initiate new Election
|
||||
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.electionTimeoutTimer = RaftTimerCreate(150 + rand(150), proc =
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
)
|
||||
|
||||
|
@ -160,16 +162,13 @@ proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
node.state = rnsFollower
|
||||
for fut in node.votesFuts:
|
||||
waitFor cancelAndWait(fut)
|
||||
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
|
||||
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.requestVotesTimer != nil:
|
||||
waitFor cancelAndWait(node.requestVotesTimer)
|
||||
if node.heartBeatTimer != nil:
|
||||
waitFor cancelAndWait(node.heartBeatTimer)
|
||||
if node.heartBeatTimeoutTimer != nil:
|
||||
waitFor cancelAndWait(node.heartBeatTimeoutTimer)
|
||||
if node.electionTimeoutTimer != nil:
|
||||
waitFor cancelAndWait(node.electionTimeoutTimer )
|
||||
if node.appendEntriesTimer != nil:
|
||||
waitFor cancelAndWait(node.appendEntriesTimer)
|
||||
|
||||
|
@ -183,4 +182,4 @@ proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmS
|
|||
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.state = rnsFollower
|
||||
debug "Start Raft Node", node_id=node.id, state=node.state
|
||||
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
|
|
@ -130,9 +130,8 @@ type
|
|||
heartBeatTimeout*: int
|
||||
appendEntriesTimeout*: int
|
||||
|
||||
requestVotesTimer*: Future[void]
|
||||
heartBeatTimer*: Future[void]
|
||||
heartBeatTimeoutTimer*: Future[void]
|
||||
electionTimeoutTimer*: Future[void]
|
||||
appendEntriesTimer*: Future[void]
|
||||
|
||||
# Mtx definition(s) go here
|
||||
|
|
|
@ -9,7 +9,6 @@
|
|||
|
||||
import unittest2
|
||||
import basic_cluster
|
||||
import std/times
|
||||
|
||||
proc basicClusterMain*() =
|
||||
var
|
||||
|
@ -23,17 +22,14 @@ proc basicClusterMain*() =
|
|||
nodesIds[i] = genUUID()
|
||||
|
||||
cluster = BasicRaftClusterInit(nodesIds)
|
||||
# check size(cluster.nodes) == 5
|
||||
|
||||
test "Generate Random Client SmCommands Queue":
|
||||
discard
|
||||
|
||||
test "Start Basic Raft Cluster And wait it to converge (Elect a Leader)":
|
||||
BasicRaftClusterStart(cluster)
|
||||
var
|
||||
dur: times.Duration
|
||||
dur = initDuration(seconds = 5, milliseconds = 100)
|
||||
waitFor sleepAsync(500)
|
||||
let dur = seconds(5)
|
||||
waitFor sleepAsync(dur)
|
||||
|
||||
test "Simulate Basic Raft Cluster Client SmCommands Execution / Log Replication":
|
||||
discard
|
||||
|
|
Loading…
Reference in New Issue