This commit is contained in:
Raycho Mukelov 2023-10-20 01:36:40 +03:00
parent 14d49d5737
commit dfaf67f9ef
3 changed files with 10 additions and 4 deletions

View File

@ -172,3 +172,7 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
)
node.replicateFuts.add(node.msgSendCallback(msg))
node.commitIndex.inc
raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex)) # Apply to state machine

View File

@ -18,6 +18,7 @@ type
BasicRaftCluster* = ref object
nodes*: Table[RaftNodeId, BasicRaftNode]
nodesLock*: RLock
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
@ -29,9 +30,10 @@ proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
proc basicRaftClusterGetLeaderId*(cluster: BasicRaftCluster): UUID =
result = DefaultUUID
for id, node in cluster.nodes:
if raftNodeIsLeader(node):
return raftNodeIdGet(node)
withRLock(cluster.nodesLock):
for id, node in cluster.nodes:
if raftNodeIsLeader(node):
return raftNodeIdGet(node)
proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): Future[RaftNodeClientResponse] {.async.} =
case req.op:

View File

@ -20,7 +20,7 @@ proc basicClusterElectionMain*() =
test "Basic Raft Cluster Init (5 nodes)":
for i in 0..4:
nodesIds[i] = genUUID()
cluster = basicRaftClusterInit(nodesIds, 5, 5, 20, 1)
cluster = basicRaftClusterInit(nodesIds, 150, 150, 20, 20)
check cluster != nil
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":