This commit is contained in:
Raycho Mukelov 2023-10-28 20:13:37 +03:00
parent 7509976cdd
commit 27458fe0e1
5 changed files with 20 additions and 11 deletions

View File

@ -126,6 +126,9 @@ proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommand
if p.id == respVote.senderId: if p.id == respVote.senderId:
p.hasVoted = respVote.granted p.hasVoted = respVote.granted
else:
await cancelAndWait(voteFut)
withRLock(node.raftStateMutex): withRLock(node.raftStateMutex):
if node.state == rnsCandidate: if node.state == rnsCandidate:
if raftNodeQuorumMin(node): if raftNodeQuorumMin(node):

View File

@ -18,7 +18,7 @@ type
# Define loose conditions computed from our NodeType # Define loose conditions computed from our NodeType
Condition*[NodeType] = proc(node: NodeType): bool Condition*[NodeType] = proc(node: NodeType): bool
# Define Terminals as a tuple of a Event and (Hash) Table of sequences of (loose) conditions and their respective values computed from NodeType (Truth Table) # Define Terminals as a tuple of a Event and (Hash) Table of sequences of (loose) conditions and their respective values computed from NodeType (Truth Table)
TerminalSymbol*[NodeType, EventType] = (Table[EventType, (seq[Condition[NodeType]], seq[bool])]) TerminalSymbol*[NodeType, EventType] = (EventType, (seq[Condition[NodeType]], seq[bool]))
# Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal ) # Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal )
StateTransitionsRulesLUT*[NodeType, EventType, NodeStates] = Table[ StateTransitionsRulesLUT*[NodeType, EventType, NodeStates] = Table[
(NonTerminalSymbol[NodeType, NodeStates], TerminalSymbol[NodeType, EventType]), (NonTerminalSymbol[NodeType, NodeStates], TerminalSymbol[NodeType, EventType]),
@ -43,10 +43,12 @@ proc new*[NodeType, EventType, NodeStates](T: type ConsensusFSM[NodeType, EventT
proc computeFSMInputRobustLogic[NodeType, EventType](node: NodeType, event: EventType, rawInput: TerminalSymbol[NodeType, EventType]): proc computeFSMInputRobustLogic[NodeType, EventType](node: NodeType, event: EventType, rawInput: TerminalSymbol[NodeType, EventType]):
TerminalSymbol[NodeType, EventType] = TerminalSymbol[NodeType, EventType] =
var var
robustLogicEventTerminal = rawInput[event] robustLogicEventTerminal = rawInput[1]
for f, v in robustLogicEventTerminal:
v = f(node) let f = robustLogicEventTerminal[0]
rawInput[event] = robustLogicEventTerminal
robustLogicEventTerminal[1] = f(node)
rawInput[1] = robustLogicEventTerminal
result = rawInput result = rawInput
proc consensusFSMAdvance[NodeType, EventType, NodeStates](fsm: ConsensusFSM[NodeType, EventType, NodeStates], node: NodeType, event: EventType, proc consensusFSMAdvance[NodeType, EventType, NodeStates](fsm: ConsensusFSM[NodeType, EventType, NodeStates], node: NodeType, event: EventType,
@ -54,5 +56,6 @@ proc consensusFSMAdvance[NodeType, EventType, NodeStates](fsm: ConsensusFSM[Node
withRLock(): withRLock():
var var
input = computeFSMInputRobustLogic(node, event, rawInput) input = computeFSMInputRobustLogic(node, event, rawInput)
fsm.state = fsm.stateTransitionsLUT[fsm.state, input]
fsm.state = fsm.stateTransitionsLUT[(fsm.state, input)]
result = fsm.state result = fsm.state

View File

@ -167,6 +167,9 @@ proc raftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommand
if resp.success: if resp.success:
successCnt.inc successCnt.inc
else:
await cancelAndWait(fut)
if successCnt >= (node.peers.len div 2 + node.peers.len mod 2): if successCnt >= (node.peers.len div 2 + node.peers.len mod 2):
node.hrtBtSuccess = true node.hrtBtSuccess = true

View File

@ -19,11 +19,11 @@ type
BasicRaftCluster* = ref object BasicRaftCluster* = ref object
nodes*: Table[RaftNodeId, BasicRaftNode] nodes*: Table[RaftNodeId, BasicRaftNode]
nodesLock*: RLock nodesLock*: RLock
networkDelay*: int networkDelayJitter*: int
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] = proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} = proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
await raftTimerCreate(rand(cluster.networkDelay), proc()=discard) # Simulate network delay await raftTimerCreate(rand(cluster.networkDelayJitter), proc()=discard) # Simulate network delay
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg) result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc basicRaftClusterStart*(cluster: BasicRaftCluster) = proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
@ -48,7 +48,7 @@ proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClie
of rncroExecSmCommand: of rncroExecSmCommand:
discard discard
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], networkDelay: int=10, electionTimeout: int=150, heartBeatTimeout: int=150, appendEntriesRespTimeout: int=20, votingRespTimeout: int=20, proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], networkDelayJitter: int=10, electionTimeout: int=150, heartBeatTimeout: int=75, appendEntriesRespTimeout: int=20, votingRespTimeout: int=10,
heartBeatRespTimeout: int=10): BasicRaftCluster = heartBeatRespTimeout: int=10): BasicRaftCluster =
new(result) new(result)
for nodeId in nodesIds: for nodeId in nodesIds:
@ -56,7 +56,7 @@ proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], networkDelay: int=10, elec
peersIds = nodesIds peersIds = nodesIds
peersIds.del(peersIds.find(nodeId)) peersIds.del(peersIds.find(nodeId))
result.networkDelay = networkDelay result.networkDelayJitter = networkDelayJitter
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds, result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds,
basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result), basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result),
electionTimeout, heartBeatTimeout, appendEntriesRespTimeout, votingRespTimeout, heartBeatRespTimeout) electionTimeout, heartBeatTimeout, appendEntriesRespTimeout, votingRespTimeout, heartBeatRespTimeout)

View File

@ -20,7 +20,7 @@ proc basicClusterElectionMain*() =
test "Basic Raft Cluster Init (5 nodes)": test "Basic Raft Cluster Init (5 nodes)":
for i in 0..4: for i in 0..4:
nodesIds[i] = genUUID() nodesIds[i] = genUUID()
cluster = basicRaftClusterInit(nodesIds, 15, 150, 150, 20, 20, 10) cluster = basicRaftClusterInit(nodesIds, 15, 150, 75, 10, 10, 10)
check cluster != nil check cluster != nil
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)": test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":