Change Lock to RLock
This commit is contained in:
parent
3380c83bde
commit
ca4041e4e6
|
@ -16,7 +16,7 @@ import chronicles
|
|||
|
||||
proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
|
||||
result = false
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
var cnt = 0
|
||||
for peer in node.peers:
|
||||
if peer.hasVoted:
|
||||
|
@ -25,11 +25,11 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
|||
result = true
|
||||
|
||||
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
withLock(node.raftStateMutex):
|
||||
debug "Raft Node started election. Node ID: ", node_id=node.id
|
||||
withRLock(node.raftStateMutex):
|
||||
node.currentTerm.inc
|
||||
node.state = rnsCandidate
|
||||
node.votedFor = node.id
|
||||
debug "Raft Node started election", node_id=node.id, state=node.state, voted_for=node.votedFor
|
||||
|
||||
for peer in node.peers:
|
||||
peer.hasVoted = false
|
||||
|
@ -45,35 +45,32 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
for voteFut in node.votesFuts:
|
||||
let r = await voteFut
|
||||
let respVote = RaftMessageRequestVoteResponse(r)
|
||||
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
|
||||
|
||||
debugEcho "r: ", repr(r)
|
||||
debug "voteFut.finished", voteFut_finished=voteFut.finished
|
||||
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
for p in node.peers:
|
||||
debug "voteFut: ", Response=repr(r)
|
||||
debug "senderId: ", sender_id=respVote.senderId
|
||||
debug "granted: ", granted=respVote.granted
|
||||
if p.id == respVote.senderId:
|
||||
p.hasVoted = respVote.granted
|
||||
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
while node.votesFuts.len > 0:
|
||||
discard node.votesFuts.pop
|
||||
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.state == rnsCandidate:
|
||||
if RaftNodeQuorumMin(node):
|
||||
debug "Raft Node transition to leader", node_id=node.id
|
||||
node.state = rnsLeader # Transition to leader and send Heart-Beat to establish this node as the cluster leader
|
||||
RaftNodeSendHeartBeat(node)
|
||||
asyncSpawn RaftNodeSendHeartBeat(node)
|
||||
else:
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm:
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
|
||||
if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
|
||||
node.votedFor = msg.senderId
|
||||
result.granted = true
|
||||
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
|
|
|
@ -25,7 +25,8 @@ export
|
|||
|
||||
# Forward declarations
|
||||
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.}
|
||||
proc RaftNodeScheduleElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.}
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.}
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
|
@ -48,7 +49,7 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
|
|||
)
|
||||
|
||||
RaftNodeSmInit(result.stateMachine)
|
||||
initLock(result.raftStateMutex)
|
||||
initRLock(result.raftStateMutex)
|
||||
|
||||
proc RaftNodeLoad*[SmCommandType, SmStateType](
|
||||
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
|
||||
|
@ -56,32 +57,36 @@ proc RaftNodeLoad*[SmCommandType, SmStateType](
|
|||
discard
|
||||
|
||||
proc RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = node.id
|
||||
|
||||
proc RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = node.state
|
||||
|
||||
proc RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = node.currentTerm
|
||||
|
||||
func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = node.peers
|
||||
|
||||
func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
result = node.state == rnsLeader
|
||||
|
||||
# Deliver Raft Message to the Raft Node and dispatch it
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
|
||||
result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
result.success = true
|
||||
RaftNodeCancelAllTimers(node)
|
||||
RaftNodeAbortElection(node)
|
||||
withRLock(node.raftStateMutex):
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeAbortElection(node)
|
||||
RaftNodeCancelAllTimers(node)
|
||||
result.success = true
|
||||
node.votedFor = DefaultUUID
|
||||
node.currentLeaderId = msg.senderId
|
||||
|
||||
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
case raftMessage.op
|
||||
|
@ -128,9 +133,10 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback):
|
|||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} =
|
||||
node.heartBeatTimer = RaftTimerCreate(150, proc() = RaftNodeSendHeartBeat(node))
|
||||
node.heartBeatTimer = RaftTimerCreate(150, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
|
||||
for raftPeer in node.peers:
|
||||
let msgHrtBt = RaftMessageAppendEntries[SmCommandType](
|
||||
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
|
||||
|
@ -138,38 +144,43 @@ proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0
|
||||
)
|
||||
discard node.msgSendCallback(msgHrtBt)
|
||||
debug "Sent Heart-Beat", sender=node.id, to=raftPeer.id
|
||||
asyncSpawn RaftNodeScheduleHeartBeat(node)
|
||||
|
||||
proc RaftNodeScheduleHeartBeatTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): Future[void] {.async.} =
|
||||
node.heartBeatTimeoutTimer = RaftTimerCreate(180, proc =
|
||||
withLock(node.raftStateMutex):
|
||||
node.heartBeatTimeoutTimer = RaftTimerCreate(200, proc =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsCandidate # Transition to candidate state and initiate new Election
|
||||
asyncSpawn RaftNodeStartElection(node)
|
||||
)
|
||||
|
||||
# Raft Node Control
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsFollower
|
||||
for fut in node.votesFuts:
|
||||
waitFor cancelAndWait(fut)
|
||||
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
|
||||
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withLock(node.raftStateMutex):
|
||||
waitFor cancelAndWait(node.requestVotesTimer)
|
||||
waitFor cancelAndWait(node.heartBeatTimer)
|
||||
waitFor cancelAndWait(node.heartBeatTimeoutTimer)
|
||||
waitFor cancelAndWait(node.appendEntriesTimer)
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.requestVotesTimer != nil:
|
||||
waitFor cancelAndWait(node.requestVotesTimer)
|
||||
if node.heartBeatTimer != nil:
|
||||
waitFor cancelAndWait(node.heartBeatTimer)
|
||||
if node.heartBeatTimeoutTimer != nil:
|
||||
waitFor cancelAndWait(node.heartBeatTimeoutTimer)
|
||||
if node.appendEntriesTimer != nil:
|
||||
waitFor cancelAndWait(node.appendEntriesTimer)
|
||||
|
||||
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
# Try to stop gracefully
|
||||
withLock(node.raftStateMutex):
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsStopped
|
||||
# Cancel pending timers (if any)
|
||||
RaftNodeCancelAllTimers(node)
|
||||
|
||||
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
debug "Start Raft Node with ID: ", nodeid=node.id
|
||||
node.state = rnsFollower
|
||||
debug "Start Raft Node", node_id=node.id, state=node.state
|
||||
asyncSpawn RaftNodeScheduleHeartBeatTimeout(node)
|
||||
|
|
|
@ -9,7 +9,7 @@
|
|||
|
||||
# Raft Node Public Types
|
||||
|
||||
import std/locks
|
||||
import std/rlocks
|
||||
import options
|
||||
import stew/results
|
||||
import uuids
|
||||
|
@ -18,7 +18,7 @@ import chronos
|
|||
export
|
||||
results,
|
||||
options,
|
||||
locks,
|
||||
rlocks,
|
||||
uuids,
|
||||
chronos
|
||||
|
||||
|
@ -136,7 +136,7 @@ type
|
|||
appendEntriesTimer*: Future[void]
|
||||
|
||||
# Mtx definition(s) go here
|
||||
raftStateMutex*: Lock
|
||||
raftStateMutex*: RLock
|
||||
|
||||
# Misc
|
||||
msgSendCallback*: RaftMessageSendCallback
|
||||
|
|
Loading…
Reference in New Issue