Fix voting timeout handling

This commit is contained in:
Raycho Mukelov 2023-10-15 22:09:23 +03:00
parent 5bd50081da
commit be86b1d185
2 changed files with 20 additions and 11 deletions

View File

@ -61,7 +61,7 @@ proc raftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
withRLock(node.raftStateMutex): withRLock(node.raftStateMutex):
node.state = rnsFollower node.state = rnsFollower
for fut in node.votesFuts: for fut in node.votesFuts:
waitFor cancelAndWait(fut) asyncSpawn cancelAndWait(fut)
proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} = proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
mixin raftNodeScheduleElectionTimeout, raftTimerCreate mixin raftNodeScheduleElectionTimeout, raftTimerCreate
@ -86,10 +86,13 @@ proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
) )
withRLock(node.raftStateMutex): withRLock(node.raftStateMutex):
# Wait for votes or voting timeout
await allFutures(node.votesFuts) or raftTimerCreate(node.votingTimeout, proc()=discard)
# Process votes (if any) # Process votes (if any)
for voteFut in node.votesFuts: for voteFut in node.votesFuts:
awaitWithTimeout(voteFut, raftTimerCreate(node.votingTimeout, proc()=debug "Raft Node voting timeout", node_id=node.id)): if voteFut.finished and not voteFut.cancelled:
let respVote = RaftMessageResponse[SmCommandType, SmStateType](f.read) let respVote = RaftMessageResponse[SmCommandType, SmStateType](voteFut.read)
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
for p in node.peers: for p in node.peers:
@ -153,6 +156,7 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
withRLock(node.raftStateMutex): withRLock(node.raftStateMutex):
var var
logEntry: RaftLogEntry[SmCommandType](term: node.currentTerm, data: cmd, entryType: etData) logEntry: RaftLogEntry[SmCommandType](term: node.currentTerm, data: cmd, entryType: etData)
raftNodeLogAppend(node, logEntry) raftNodeLogAppend(node, logEntry)
for peer in node.peers: for peer in node.peers:
@ -163,4 +167,5 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
prevLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term, prevLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term,
commitIndex: node.commitIndex, entries: @[logEntry] commitIndex: node.commitIndex, entries: @[logEntry]
) )
node.replicateFuts.add(node.msgSendCallback(msg)) node.replicateFuts.add(node.msgSendCallback(msg))

4
raft/rlock.nim Normal file
View File

@ -0,0 +1,4 @@
import types
template withRlockAlt(l: RLock, body: untyped) =