Simulate Network delay in the basic cluster tests
This commit is contained in:
parent
892d48db16
commit
98f0683654
|
@ -181,6 +181,10 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
|
||||||
|
|
||||||
raftNodeLogAppend(node, logEntry)
|
raftNodeLogAppend(node, logEntry)
|
||||||
|
|
||||||
|
for fut in node.replicateFuts:
|
||||||
|
discard fut
|
||||||
|
node.replicateFuts.clear
|
||||||
|
|
||||||
for peer in node.peers:
|
for peer in node.peers:
|
||||||
var
|
var
|
||||||
msg: RaftMessage[SmCommandType, SmStateType] = RaftMessage[SmCommandType, SmStateType](
|
msg: RaftMessage[SmCommandType, SmStateType] = RaftMessage[SmCommandType, SmStateType](
|
||||||
|
|
|
@ -20,7 +20,8 @@ export
|
||||||
protocol,
|
protocol,
|
||||||
consensus_module,
|
consensus_module,
|
||||||
log_ops,
|
log_ops,
|
||||||
chronicles
|
chronicles,
|
||||||
|
random
|
||||||
|
|
||||||
# Forward declarations
|
# Forward declarations
|
||||||
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
|
||||||
|
|
|
@ -19,9 +19,11 @@ type
|
||||||
BasicRaftCluster* = ref object
|
BasicRaftCluster* = ref object
|
||||||
nodes*: Table[RaftNodeId, BasicRaftNode]
|
nodes*: Table[RaftNodeId, BasicRaftNode]
|
||||||
nodesLock*: RLock
|
nodesLock*: RLock
|
||||||
|
networkDelay*: int
|
||||||
|
|
||||||
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
|
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
|
||||||
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
|
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
|
||||||
|
await raftTimerCreate(rand(cluster.networkDelay), proc()=discard) # Simulate network delay
|
||||||
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
|
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
|
||||||
|
|
||||||
proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
|
proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
|
||||||
|
@ -46,7 +48,7 @@ proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClie
|
||||||
of rncroExecSmCommand:
|
of rncroExecSmCommand:
|
||||||
discard
|
discard
|
||||||
|
|
||||||
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], electionTimeout: int=150, heartBeatTimeout: int=150, appendEntriesRespTimeout: int=20, votingRespTimeout: int=20,
|
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], networkDelay: int=25, electionTimeout: int=150, heartBeatTimeout: int=150, appendEntriesRespTimeout: int=20, votingRespTimeout: int=20,
|
||||||
heartBeatRespTimeout: int=10): BasicRaftCluster =
|
heartBeatRespTimeout: int=10): BasicRaftCluster =
|
||||||
new(result)
|
new(result)
|
||||||
for nodeId in nodesIds:
|
for nodeId in nodesIds:
|
||||||
|
@ -54,6 +56,7 @@ proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], electionTimeout: int=150,
|
||||||
peersIds = nodesIds
|
peersIds = nodesIds
|
||||||
|
|
||||||
peersIds.del(peersIds.find(nodeId))
|
peersIds.del(peersIds.find(nodeId))
|
||||||
|
result.networkDelay = networkDelay
|
||||||
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds,
|
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds,
|
||||||
basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result),
|
basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result),
|
||||||
electionTimeout, heartBeatTimeout, appendEntriesRespTimeout, votingRespTimeout, heartBeatRespTimeout)
|
electionTimeout, heartBeatTimeout, appendEntriesRespTimeout, votingRespTimeout, heartBeatRespTimeout)
|
||||||
|
|
|
@ -20,7 +20,7 @@ proc basicClusterElectionMain*() =
|
||||||
test "Basic Raft Cluster Init (5 nodes)":
|
test "Basic Raft Cluster Init (5 nodes)":
|
||||||
for i in 0..4:
|
for i in 0..4:
|
||||||
nodesIds[i] = genUUID()
|
nodesIds[i] = genUUID()
|
||||||
cluster = basicRaftClusterInit(nodesIds, 150, 150, 20, 20, 10)
|
cluster = basicRaftClusterInit(nodesIds, 25, 150, 150, 20, 20, 10)
|
||||||
check cluster != nil
|
check cluster != nil
|
||||||
|
|
||||||
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
|
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
|
||||||
|
|
Loading…
Reference in New Issue