fixes
This commit is contained in:
parent
117d59ea2f
commit
ba1fdafdd4
|
@ -19,7 +19,7 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
|||
for peer in node.peers:
|
||||
if peer.hasVoted:
|
||||
cnt.inc
|
||||
if cnt >= (node.peers.len div 2 + 1):
|
||||
if cnt >= (node.peers.len div 2 + node.peers.len mod 2):
|
||||
result = true
|
||||
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
|
||||
|
@ -31,7 +31,7 @@ proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmComma
|
|||
return
|
||||
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeCancelAllTimers(node)
|
||||
RaftNodeCancelTimers(node)
|
||||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)
|
||||
result.success = true
|
||||
|
@ -53,6 +53,7 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom
|
|||
if node.electionTimeoutTimer != nil:
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
node.votedFor = msg.senderId
|
||||
node.currentLeaderId = DefaultUUID
|
||||
result.granted = true
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
|
@ -108,7 +109,7 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
await cancelAndWait(node.electionTimeoutTimer)
|
||||
debug "Raft Node transition to leader", node_id=node.id
|
||||
node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader
|
||||
RaftNodeSendHeartBeat(node)
|
||||
asyncSpawn RaftNodeSendHeartBeat(node)
|
||||
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
|
||||
RaftMessageResponse[SmCommandType, SmStateType] =
|
||||
|
|
|
@ -33,7 +33,7 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
|
|||
electionTimeout: int=150;
|
||||
heartBeatTimeout: int=150;
|
||||
appendEntriesTimeout: int=50;
|
||||
votingTimeout: int=50
|
||||
votingTimeout: int=20
|
||||
): T =
|
||||
var
|
||||
peers: RaftNodePeers
|
||||
|
@ -128,9 +128,10 @@ template RaftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback):
|
|||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = RaftNodeSendHeartBeat(node))
|
||||
withRLock(node.raftStateMutex):
|
||||
node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
|
||||
for raftPeer in node.peers:
|
||||
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
|
||||
|
@ -138,24 +139,22 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
|
||||
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0
|
||||
)
|
||||
let r = node.msgSendCallback(msgHrtBt)
|
||||
discard r
|
||||
discard node.msgSendCallback(msgHrtBt)
|
||||
RaftNodeScheduleHeartBeat(node)
|
||||
|
||||
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.electionTimeoutTimer = RaftTimerCreate(node.electionTimeout + rand(node.electionTimeout), proc =
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
)
|
||||
withRLock(node.raftStateMutex):
|
||||
node.electionTimeoutTimer = RaftTimerCreate(node.electionTimeout + rand(node.electionTimeout), proc =
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
)
|
||||
|
||||
# Raft Node Control
|
||||
proc RaftNodeCancelAllTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
proc RaftNodeCancelTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.heartBeatTimer != nil:
|
||||
asyncSpawn cancelAndWait(node.heartBeatTimer)
|
||||
if node.electionTimeoutTimer != nil:
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer )
|
||||
if node.appendEntriesTimer != nil:
|
||||
asyncSpawn cancelAndWait(node.appendEntriesTimer)
|
||||
|
||||
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
# Try to stop gracefully
|
||||
|
@ -164,10 +163,12 @@ proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmS
|
|||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)s
|
||||
node.state = rnsStopped
|
||||
# Cancel pending timers (if any)
|
||||
RaftNodeCancelAllTimers(node)
|
||||
# Cancel pending timers (if any)
|
||||
RaftNodeCancelTimers(node)
|
||||
|
||||
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.state = rnsFollower
|
||||
debug "Start Raft Node", node_id=node.id, state=node.state
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
randomize()
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsFollower
|
||||
debug "Start Raft Node", node_id=node.id, state=node.state
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
|
|
@ -45,6 +45,7 @@ type
|
|||
# (initialized to 0, increases monotonically)
|
||||
hasVoted*: bool # Indicates if this peer have voted for this Raft Node During Election
|
||||
canVote*: bool # Indicates if this peer can vote
|
||||
appendEntriesTimer*: Future[void]
|
||||
|
||||
RaftNodePeers* = seq[RaftNodePeer] # List of Raft Node Peers
|
||||
|
||||
|
@ -131,7 +132,6 @@ type
|
|||
|
||||
heartBeatTimer*: Future[void]
|
||||
electionTimeoutTimer*: Future[void]
|
||||
appendEntriesTimer*: Future[void]
|
||||
|
||||
# Mtx definition(s) go here
|
||||
raftStateMutex*: RLock
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
import basic_timers
|
||||
import basic_state_machine
|
||||
import std/tables
|
||||
import std/random
|
||||
|
||||
export raft_api
|
||||
|
||||
|
@ -46,12 +45,11 @@ proc BasicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClie
|
|||
discard
|
||||
|
||||
proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
|
||||
randomize()
|
||||
new(result)
|
||||
for nodeId in nodesIds:
|
||||
var
|
||||
peersIds = nodesIds
|
||||
|
||||
peersIds.del(peersIds.find(nodeId))
|
||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout=150, heartBeatTimeout=150)
|
||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout=50, heartBeatTimeout=50)
|
||||
|
||||
|
|
|
@ -11,7 +11,7 @@ import ../raft/raft_api
|
|||
|
||||
export raft_api
|
||||
|
||||
proc RaftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} =
|
||||
proc RaftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async.} =
|
||||
var f = sleepAsync(milliseconds(timerInterval))
|
||||
await f
|
||||
if f.finished and not f.cancelled:
|
||||
|
|
Loading…
Reference in New Issue