Get rid of useless forward declarations + more refactoring
This commit is contained in:
parent
615db86c52
commit
d11190bc5d
|
@ -7,8 +7,6 @@
|
|||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.hint[XDeclaredButNotUsed]: off.}
|
||||
|
||||
import types
|
||||
import protocol
|
||||
import log_ops
|
||||
|
@ -24,6 +22,36 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
|||
if cnt >= (node.peers.len div 2 + 1):
|
||||
result = true
|
||||
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
|
||||
result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
withRLock(node.raftStateMutex):
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeCancelAllTimers(node)
|
||||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)
|
||||
result.success = true
|
||||
node.currentTerm = msg.senderTerm
|
||||
node.votedFor = DefaultUUID
|
||||
node.currentLeaderId = msg.senderId
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
withRLock(node.raftStateMutex):
|
||||
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
|
||||
# if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
node.votedFor = msg.senderId
|
||||
result.granted = true
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsFollower
|
||||
for fut in node.votesFuts:
|
||||
waitFor cancelAndWait(fut)
|
||||
|
||||
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.currentTerm.inc
|
||||
|
@ -60,27 +88,11 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
|
|||
if RaftNodeQuorumMin(node):
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
debug "Raft Node transition to leader", node_id=node.id
|
||||
node.state = rnsLeader # Transition to leader and send Heart-Beat to establish this node as the cluster leader
|
||||
node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader
|
||||
asyncSpawn RaftNodeSendHeartBeat(node)
|
||||
|
||||
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse =
|
||||
withRLock(node.raftStateMutex):
|
||||
result = RaftMessageRequestVoteResponse(msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
|
||||
if node.state != rnsCandidate and node.state != rnsStopped and msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
|
||||
# if msg.lastLogIndex >= RaftNodeLogIndexGet(node) and msg.lastLogTerm >= RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term:
|
||||
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
|
||||
node.votedFor = msg.senderId
|
||||
result.granted = true
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
discard
|
||||
|
||||
proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
|
||||
discard
|
|
@ -14,4 +14,10 @@ proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandTy
|
|||
discard
|
||||
|
||||
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
|
||||
discard
|
||||
|
||||
proc RaftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
|
||||
discard
|
|
@ -7,8 +7,6 @@
|
|||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
{.hint[XDeclaredButNotUsed]: off.}
|
||||
|
||||
import types
|
||||
import protocol
|
||||
import consensus_module
|
||||
|
@ -26,10 +24,6 @@ export
|
|||
|
||||
# Forward declarations
|
||||
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.}
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType])
|
||||
|
||||
# Raft Node Public API
|
||||
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType]; # Create New Raft Node
|
||||
|
@ -77,20 +71,6 @@ func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
|
|||
result = node.state == rnsLeader
|
||||
|
||||
# Deliver Raft Message to the Raft Node and dispatch it
|
||||
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse[SmStateType] =
|
||||
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
|
||||
result = RaftMessageAppendEntriesResponse[SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
|
||||
withRLock(node.raftStateMutex):
|
||||
if msg.senderTerm >= node.currentTerm:
|
||||
RaftNodeCancelAllTimers(node)
|
||||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)
|
||||
result.success = true
|
||||
node.currentTerm = msg.senderTerm
|
||||
node.votedFor = DefaultUUID
|
||||
node.currentLeaderId = msg.senderId
|
||||
RaftNodeScheduleElectionTimeout(node)
|
||||
|
||||
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): Future[RaftMessageResponseBase] {.async, gcsafe.} =
|
||||
case raftMessage.op
|
||||
of rmoRequestVote: # Dispatch different Raft Message types based on the operation code
|
||||
|
@ -103,7 +83,7 @@ proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmComman
|
|||
result = RaftNodeHandleHeartBeat(node, appendMsg)
|
||||
else: discard
|
||||
|
||||
# Process RaftNodeClientRequests
|
||||
# Process Raft Node Client Requests
|
||||
proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
|
||||
case req.op
|
||||
of rncroExecSmCommand:
|
||||
|
@ -119,7 +99,8 @@ proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCo
|
|||
|
||||
# Abstract State Machine Ops
|
||||
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
|
||||
node.stateMachine.state
|
||||
withRLock(node.raftStateMutex):
|
||||
node.stateMachine.state
|
||||
|
||||
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
|
||||
mixin RaftSmInit
|
||||
|
@ -127,7 +108,8 @@ proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateM
|
|||
|
||||
proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
|
||||
mixin RaftSmApply
|
||||
RaftSmApply(stateMachine, command)
|
||||
withRLock(node.raftStateMutex):
|
||||
RaftSmApply(stateMachine, command)
|
||||
|
||||
# Private Abstract Timer creation
|
||||
template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
|
||||
|
@ -136,7 +118,7 @@ template RaftTimerCreate(timerInterval: int, timerCallback: RaftTimerCallback):
|
|||
|
||||
# Timers scheduling stuff etc.
|
||||
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
node.heartBeatTimer = RaftTimerCreate(150, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
node.heartBeatTimer = RaftTimerCreate(130, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
|
||||
|
||||
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
|
||||
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
|
||||
|
@ -157,13 +139,7 @@ proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode
|
|||
)
|
||||
|
||||
# Raft Node Control
|
||||
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
node.state = rnsFollower
|
||||
for fut in node.votesFuts:
|
||||
waitFor cancelAndWait(fut)
|
||||
|
||||
proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
proc RaftNodeCancelAllTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
withRLock(node.raftStateMutex):
|
||||
if node.heartBeatTimer != nil:
|
||||
waitFor cancelAndWait(node.heartBeatTimer)
|
||||
|
@ -175,6 +151,9 @@ proc RaftNodeCancelAllTimers[SmCommandType, SmStateType](node: RaftNode[SmComman
|
|||
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
|
||||
# Try to stop gracefully
|
||||
withRLock(node.raftStateMutex):
|
||||
# Abort election if in election
|
||||
if node.state == rnsCandidate:
|
||||
RaftNodeAbortElection(node)s
|
||||
node.state = rnsStopped
|
||||
# Cancel pending timers (if any)
|
||||
RaftNodeCancelAllTimers(node)
|
||||
|
|
|
@ -12,7 +12,7 @@ import ../raft/raft_api
|
|||
export raft_api
|
||||
|
||||
proc RaftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async, nimcall, gcsafe.} =
|
||||
var f = sleepAsync(timerInterval)
|
||||
var f = sleepAsync(milliseconds(timerInterval))
|
||||
await f
|
||||
if not f.cancelled:
|
||||
timerCallback()
|
|
@ -28,7 +28,7 @@ proc basicClusterMain*() =
|
|||
|
||||
test "Start Basic Raft Cluster And wait it to converge (Elect a Leader)":
|
||||
BasicRaftClusterStart(cluster)
|
||||
let dur = seconds(5)
|
||||
let dur = seconds(60)
|
||||
waitFor sleepAsync(dur)
|
||||
|
||||
test "Simulate Basic Raft Cluster Client SmCommands Execution / Log Replication":
|
||||
|
|
Loading…
Reference in New Issue