Fix function names

This commit is contained in:
Raycho Mukelov 2023-10-13 07:24:35 +03:00
parent 6b9ab9a18b
commit ca099c6132
12 changed files with 204 additions and 134 deletions

View File

@ -11,36 +11,49 @@ import chronos
template awaitWithTimeout*[T](operation: Future[T],
deadline: Future[void],
onTimeout: untyped): T =
let f = operation
body: untyped): T =
let f {.inject.} = operation
await f or deadline
if not f.finished:
# If we don't wait for for the cancellation here, it's possible that
# the "next" operation will run concurrently to this one, messing up
# the order of operations (since await/async is not fair)
await cancelAndWait(f)
onTimeout
else:
f.read
body
template awaitWithTimeout*[T](operation: Future[T],
timeout: Duration,
onTimeout: untyped): T =
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
# template awaitWithTimeout*[T](operation: Future[T],
# deadline: Future[void],
# onTimeout: untyped): T =
# let f = operation
# await f or deadline
# if not f.finished:
# # If we don't wait for for the cancellation here, it's possible that
# # the "next" operation will run concurrently to this one, messing up
# # the order of operations (since await/async is not fair)
# await cancelAndWait(f)
# onTimeout
# else:
# f.read
template awaitWithTimeout*(operation: Future[void],
deadline: Future[void],
onTimeout: untyped) =
let f = operation
await f or deadline
if not f.finished:
# If we don't wait for for the cancellation here, it's possible that
# the "next" operation will run concurrently to this one, messing up
# the order of operations (since await/async is not fair)
await cancelAndWait(f)
onTimeout
# template awaitWithTimeout*[T](operation: Future[T],
# timeout: Duration,
# onTimeout: untyped): T =
# awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
template awaitWithTimeout*(operation: Future[void],
timeout: Duration,
onTimeout: untyped) =
awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
# template awaitWithTimeout*(operation: Future[void],
# deadline: Future[void],
# onTimeout: untyped) =
# let f = operation
# await f or deadline
# if not f.finished:
# # If we don't wait for for the cancellation here, it's possible that
# # the "next" operation will run concurrently to this one, messing up
# # the order of operations (since await/async is not fair)
# await cancelAndWait(f)
# onTimeout
# template awaitWithTimeout*(operation: Future[void],
# timeout: Duration,
# onTimeout: untyped) =
# awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)

View File

@ -13,7 +13,7 @@ import log_ops
import chronicles
import async_util
proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
proc raftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
result = false
var cnt = 0
for peer in node.peers:
@ -22,7 +22,7 @@ proc RaftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType,
if cnt >= (node.peers.len div 2 + node.peers.len mod 2):
result = true
proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
proc raftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
@ -31,16 +31,16 @@ proc RaftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmComma
return
if msg.senderTerm >= node.currentTerm:
RaftNodeCancelTimers(node)
raftNodeCancelTimers(node)
if node.state == rnsCandidate:
RaftNodeAbortElection(node)
raftNodeAbortElection(node)
result.success = true
node.currentTerm = msg.senderTerm
node.votedFor = DefaultUUID
node.currentLeaderId = msg.senderId
RaftNodeScheduleElectionTimeout(node)
raftNodeScheduleElectionTimeout(node)
proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
proc raftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoRequestVote, msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
withRLock(node.raftStateMutex):
@ -48,24 +48,24 @@ proc RaftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCom
return
if msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
if msg.lastLogTerm > RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term or
(msg.lastLogTerm == RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term and msg.lastLogIndex >= RaftNodeLogIndexGet(node)):
if msg.lastLogTerm > raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term or
(msg.lastLogTerm == raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term and msg.lastLogIndex >= raftNodeLogIndexGet(node)):
if node.electionTimeoutTimer != nil:
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
node.votedFor = msg.senderId
node.currentLeaderId = DefaultUUID
result.granted = true
RaftNodeScheduleElectionTimeout(node)
raftNodeScheduleElectionTimeout(node)
proc RaftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.state = rnsFollower
for fut in node.votesFuts:
waitFor cancelAndWait(fut)
proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
mixin RaftNodeScheduleElectionTimeout, RaftTimerCreate
RaftNodeScheduleElectionTimeout(node)
proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
mixin raftNodeScheduleElectionTimeout, raftTimerCreate
raftNodeScheduleElectionTimeout(node)
withRLock(node.raftStateMutex):
while node.votesFuts.len > 0:
@ -80,40 +80,87 @@ proc RaftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
node.votesFuts.add(node.msgSendCallback(
RaftMessage[SmCommandType, SmStateType](
op: rmoRequestVote, msgId: genUUID(), senderId: node.id,
receiverId: peer.id, lastLogTerm: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node)).term,
lastLogIndex: RaftNodeLogIndexGet(node), senderTerm: node.currentTerm)
receiverId: peer.id, lastLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term,
lastLogIndex: raftNodeLogIndexGet(node), senderTerm: node.currentTerm)
)
)
withRLock(node.raftStateMutex):
# Process votes (if any)
for voteFut in node.votesFuts:
try:
await voteFut or RaftTimerCreate(node.votingTimeout, proc()=discard)
if not voteFut.finished:
await cancelAndWait(voteFut)
else:
if not voteFut.cancelled:
let respVote = RaftMessageResponse[SmCommandType, SmStateType](voteFut.read)
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
awaitWithTimeout(voteFut, raftTimerCreate(node.votingTimeout, proc()=debug "Raft Node voting timeout", node_id=node.id)):
let respVote = RaftMessageResponse[SmCommandType, SmStateType](f.read)
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
for p in node.peers:
if p.id == respVote.senderId:
p.hasVoted = respVote.granted
except Exception as e:
discard
for p in node.peers:
if p.id == respVote.senderId:
p.hasVoted = respVote.granted
withRLock(node.raftStateMutex):
if node.state == rnsCandidate:
if RaftNodeQuorumMin(node):
if raftNodeQuorumMin(node):
await cancelAndWait(node.electionTimeoutTimer)
debug "Raft Node transition to leader", node_id=node.id
node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader
asyncSpawn RaftNodeSendHeartBeat(node)
asyncSpawn raftNodeSendHeartBeat(node)
proc RaftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
proc raftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
discard
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
withRLock(node.raftStateMutex):
if node.state == rnsStopped:
return
proc RaftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
discard
if msg.senderTerm >= node.currentTerm:
raftNodeCancelTimers(node)
if node.state == rnsCandidate:
raftNodeAbortElection(node)
node.currentTerm = msg.senderTerm
node.votedFor = DefaultUUID
node.state = rnsFollower
node.currentLeaderId = msg.senderId
raftNodeScheduleElectionTimeout(node)
if msg.senderTerm < node.currentTerm:
return
if msg.prevLogIndex > raftNodeLogIndexGet(node):
return
if msg.prevLogIndex == raftNodeLogIndexGet(node):
if msg.prevLogTerm != raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term:
return
if msg.prevLogIndex < raftNodeLogIndexGet(node):
if msg.prevLogTerm != raftNodeLogEntryGet(node, msg.prevLogIndex).term:
raftNodeLogTruncate(node, msg.prevLogIndex)
return
if msg.entries.len > 0:
for entry in msg.entries:
raftNodeLogAppend(node, entry)
if msg.commitIndex > node.commitIndex:
node.commitIndex = min(msg.commitIndex, raftNodeLogIndexGet(node))
raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex))
result.success = true
proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType) =
mixin RaftLogEntry, raftTimerCreate
withRLock(node.raftStateMutex):
var
logEntry: RaftLogEntry[SmCommandType](term: node.currentTerm, data: cmd, entryType: etData)
raftNodeLogAppend(node, logEntry)
for peer in node.peers:
var
msg: RaftMessage[SmCommandType, SmStateType] = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, msgId: genUUID(), senderId: node.id, receiverId: peer.id,
senderTerm: node.currentTerm, prevLogIndex: raftNodeLogIndexGet(node),
prevLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term,
commitIndex: node.commitIndex, entries: @[logEntry]
)
node.replicateFuts.add(node.msgSendCallback(msg))

View File

@ -10,15 +10,20 @@
import types
# Private Log Ops
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
proc raftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
len(node.log.logData)
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] =
proc raftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] =
if logIndex > 0:
result = node.log.logData[logIndex]
proc RaftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
discard
proc raftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
node.log.logData.add(logEntry)
proc RaftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
discard
proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) =
node.log.logData.truncate(truncateIndex)
proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
mixin raftNodeSmApply
raftNodeSmApply(node.stateMachine, logEntry.command)

View File

@ -23,7 +23,7 @@ export
chronicles
# Forward declarations
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
proc raftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
# Raft Node Public API
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
@ -32,7 +32,7 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
msgSendCallback: RaftMessageSendCallback;
electionTimeout: int=150;
heartBeatTimeout: int=150;
appendEntriesTimeout: int=50;
appendEntriesTimeout: int=30;
votingTimeout: int=20
): T =
var
@ -48,127 +48,131 @@ proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateTyp
votingTimeout: votingTimeout
)
RaftNodeSmInit(result.stateMachine)
raftNodeSmInit(result.stateMachine)
initRLock(result.raftStateMutex)
proc RaftNodeLoad*[SmCommandType, SmStateType](
proc raftNodeLoad*[SmCommandType, SmStateType](
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] =
discard
proc RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
proc raftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
withRLock(node.raftStateMutex):
result = node.id
proc RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
proc raftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
withRLock(node.raftStateMutex):
result = node.state
proc RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
proc raftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
withRLock(node.raftStateMutex):
result = node.currentTerm
func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
func raftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
withRLock(node.raftStateMutex):
result = node.peers
func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader
func raftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader
withRLock(node.raftStateMutex):
result = node.state == rnsLeader
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
proc raftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
var
rm = RaftMessage[SmCommandType, SmStateType](raftMessage)
case rm.op # Dispatch different Raft Message types based on the operation code
of rmoRequestVote:
result = RaftNodeHandleRequestVote(node, rm)
result = raftNodeHandleRequestVote(node, rm)
of rmoAppendLogEntry:
if rm.logEntries.isSome:
result = RaftNodeHandleAppendEntries(node, rm)
result = raftNodeHandleAppendEntries(node, rm)
else:
result = RaftNodeHandleHeartBeat(node, rm)
result = raftNodeHandleHeartBeat(node, rm)
else: discard
# Process Raft Node Client Requests
proc RaftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]):
proc raftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]):
Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
case req.op
of rncroExecSmCommand:
# TODO: implemenmt command handling
discard
of rncroRequestSmState:
if RaftNodeIsLeader(node):
return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: RaftNodeStateGet(node))
if raftNodeIsLeader(node):
return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: raftNodeStateGet(node))
else:
return RaftNodeClientResponse(nodeId: node.id, error: rncreNotLeader, currentLeaderId: node.currentLeaderId)
else:
raiseAssert "Unknown client request operation."
# Abstract State Machine Ops
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
func raftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
withRLock(node.raftStateMutex):
node.stateMachine.state
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
mixin RaftSmInit
RaftSmInit(stateMachine)
proc raftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
mixin raftSmInit
proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
mixin RaftSmApply
withRLock(node.raftStateMutex):
RaftSmApply(stateMachine, command)
raftSmInit(stateMachine)
proc raftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
mixin raftSmApply
withRLock(node.raftStateMutex):
raftSmApply(stateMachine, command)
# Private Abstract Timer creation
template RaftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
mixin RaftTimerCreateCustomImpl
RaftTimerCreateCustomImpl(timerInterval, timerCallback)
template raftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
mixin raftTimerCreateCustomImpl
raftTimerCreateCustomImpl(timerInterval, timerCallback)
# Timers scheduling stuff etc.
proc RaftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.heartBeatTimer = RaftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn RaftNodeSendHeartBeat(node))
node.heartBeatTimer = raftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn raftNodeSendHeartBeat(node))
proc RaftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
proc raftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
for raftPeer in node.peers:
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
senderTerm: RaftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: RaftNodeLogIndexGet(node) - 1, prevLogTerm: if RaftNodeLogIndexGet(node) > 0: RaftNodeLogEntryGet(node, RaftNodeLogIndexGet(node) - 1).term else: 0
senderTerm: raftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: raftNodeLogIndexGet(node) - 1, prevLogTerm: if raftNodeLogIndexGet(node) > 0: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term else: 0
)
discard node.msgSendCallback(msgHrtBt)
RaftNodeScheduleHeartBeat(node)
raftNodeScheduleHeartBeat(node)
proc RaftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.electionTimeoutTimer = RaftTimerCreate(node.electionTimeout + rand(node.electionTimeout), proc =
asyncSpawn RaftNodeStartElection(node)
node.electionTimeoutTimer = raftTimerCreate(node.electionTimeout + rand(node.electionTimeout), proc =
asyncSpawn raftNodeStartElection(node)
)
# Raft Node Control
proc RaftNodeCancelTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeCancelTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
if node.heartBeatTimer != nil:
asyncSpawn cancelAndWait(node.heartBeatTimer)
if node.electionTimeoutTimer != nil:
asyncSpawn cancelAndWait(node.electionTimeoutTimer )
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
# Try to stop gracefully
withRLock(node.raftStateMutex):
# Abort election if in election
if node.state == rnsCandidate:
RaftNodeAbortElection(node)s
raftNodeAbortElection(node)s
node.state = rnsStopped
# Cancel pending timers (if any)
RaftNodeCancelTimers(node)
raftNodeCancelTimers(node)
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
proc raftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
randomize()
withRLock(node.raftStateMutex):
node.state = rnsFollower
debug "Start Raft Node", node_id=node.id, state=node.state
RaftNodeScheduleElectionTimeout(node)
raftNodeScheduleElectionTimeout(node)

View File

@ -124,6 +124,7 @@ type
RaftNode*[SmCommandType, SmStateType] = ref object
# Timers
votesFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
replicateFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
electionTimeout*: int
heartBeatTimeout*: int

View File

@ -19,37 +19,37 @@ type
BasicRaftCluster* = ref object
nodes*: Table[RaftNodeId, BasicRaftNode]
proc BasicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
result = await cluster.nodes[msg.receiverId].RaftNodeMessageDeliver(msg)
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc BasicRaftClusterStart*(cluster: BasicRaftCluster) =
proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
for id, node in cluster.nodes:
RaftNodeStart(node)
raftNodeStart(node)
proc BasicRaftClusterGetLeaderId*(cluster: BasicRaftCluster): UUID =
proc basicRaftClusterGetLeaderId*(cluster: BasicRaftCluster): UUID =
result = DefaultUUID
for id, node in cluster.nodes:
if RaftNodeIsLeader(node):
return RaftNodeIdGet(node)
if raftNodeIsLeader(node):
return raftNodeIdGet(node)
proc BasicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): Future[RaftNodeClientResponse] {.async.} =
proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): Future[RaftNodeClientResponse] {.async.} =
case req.op:
of rncroRequestSmState:
var
nodeId = cluster.nodesIds[BasicRaftClusterGetLeaderId(cluster)]
nodeId = cluster.nodesIds[basicRaftClusterGetLeaderId(cluster)]
result = await cluster.nodes[nodeId].RaftNodeServeClientRequest(req)
result = await cluster.nodes[nodeId].raftNodeServeClientRequest(req)
of rncroExecSmCommand:
discard
proc BasicRaftClusterInit*(nodesIds: seq[RaftNodeId]): BasicRaftCluster =
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], electionTimeout=5, heartBeatTimeout=5): BasicRaftCluster =
new(result)
for nodeId in nodesIds:
var
peersIds = nodesIds
peersIds.del(peersIds.find(nodeId))
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, BasicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout=50, heartBeatTimeout=50)
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), electionTimeout, heartBeatTimeout)

View File

@ -26,11 +26,11 @@ type
RaftBasicSm* = RaftNodeStateMachine[SmCommand, SmState]
proc RaftSmInit*(stateMachine: var RaftBasicSm) =
proc raftSmInit*(stateMachine: var RaftBasicSm) =
new(stateMachine)
new(stateMachine.state)
proc RaftSmApply*(stateMachine: RaftBasicSm, command: SmCommand) =
proc raftSmApply*(stateMachine: RaftBasicSm, command: SmCommand) =
case command.cmd:
of scSet:
stateMachine.state[command.key] = command.val

View File

@ -11,7 +11,7 @@ import ../raft/raft_api
export raft_api
proc RaftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async.} =
proc raftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async.} =
var f = sleepAsync(milliseconds(timerInterval))
await f
if f.finished and not f.cancelled:

View File

@ -28,7 +28,7 @@ proc loadConfig(): RaftPeersConfContainer =
conf.add(RaftPeerConf(id: parseUUID(n["id"].getStr), host: n["host"].getStr, port: n["port"].getInt))
result = conf
proc RaftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
proc raftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
var
fifoRead = fmt"RAFTNODERECEIVEMSGPIPE{port}"
fifoWrite = fmt"RAFTNODESENDMSGRESPPIPE{port}"
@ -44,14 +44,14 @@ proc RaftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
debug "Received Req: ", req=repr(xx)
var
r = waitFor RaftNodeMessageDeliver(node, xx)
r = waitFor raftNodeMessageDeliver(node, xx)
resp = RaftMessageResponse[SmCommandType, SmStateType](r)
rs = MsgStream.init()
rs.pack(resp)
fwFD.write(rs.data)
proc TestRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc testRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
var
host: string
@ -94,10 +94,10 @@ proc main() =
port = conf[idx].port
peersIds.del(idx)
node = BasicRaftNode.new(nodeId, peersIds, TestRaftMessageSendCallbackCreate[SmCommand, SmState](conf))
node = BasicRaftNode.new(nodeId, peersIds, testRaftMessageSendCallbackCreate[SmCommand, SmState](conf))
RaftNodeStart(node)
spawn RaftPipesRead[SmCommand, SmState](node, port)
raftNodeStart(node)
spawn raftPipesRead[SmCommand, SmState](node, port)
runForever()
if isMainModule:

View File

@ -20,23 +20,23 @@ proc basicClusterElectionMain*() =
test "Basic Raft Cluster Init (5 nodes)":
for i in 0..4:
nodesIds[i] = genUUID()
cluster = BasicRaftClusterInit(nodesIds)
cluster = basicRaftClusterInit(nodesIds, 3, 3)
check cluster != nil
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
BasicRaftClusterStart(cluster)
basicRaftClusterStart(cluster)
let dur = seconds(2)
waitFor sleepAsync(dur)
let
leaderId = BasicRaftClusterGetLeaderId(cluster)
leaderId = basicRaftClusterGetLeaderId(cluster)
check leaderId != DefaultUUID
test "Check for leader every second for a 10 second interval":
let dur = seconds(1)
for i in 0..9:
for i in 0..117:
waitFor sleepAsync(dur)
let
leaderId = BasicRaftClusterGetLeaderId(cluster)
leaderId = basicRaftClusterGetLeaderId(cluster)
check leaderId != DefaultUUID
if isMainModule:

View File

@ -18,7 +18,7 @@ proc basicStateMachineMain*() =
suite "Test Basic State Machine Implementation ":
test "Test Init":
RaftSmInit(sm)
raftSmInit(sm)
check sm != nil and sm.state != nil and sm.state.len == 0
@ -41,7 +41,7 @@ proc basicStateMachineMain*() =
test "Apply commands from the Log and check result":
for c in smCommandsLog:
RaftSmApply(sm, c)
raftSmApply(sm, c)
check sm.state[] == {"b": "b", "c": "c", "e": "e", "f": "f", "g": "g", "h": "h"}.toTable

View File

@ -38,10 +38,10 @@ proc basicTimersMain*() =
test "Create 'slow' and 'fast' timers":
for i in 0..MAX_TIMERS:
slowTimers[i] = RaftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), RaftTimerCallbackCnt(slowCnt))
slowTimers[i] = raftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), RaftTimerCallbackCnt(slowCnt))
for i in 0..MAX_TIMERS:
fastTimers[i] = RaftTimerCreateCustomImpl(max(FAST_TIMERS_MIN, rand(FAST_TIMERS_MAX)), RaftDummyTimerCallback)
fastTimers[i] = raftTimerCreateCustomImpl(max(FAST_TIMERS_MIN, rand(FAST_TIMERS_MAX)), RaftDummyTimerCallback)
test "Wait for and cancel 'slow' timers":
waitFor sleepAsync(WAIT_FOR_SLOW_TIMERS)