From 9bfde8cde8e0bc36aae4c88bf085fe05ff30ef3c Mon Sep 17 00:00:00 2001 From: Marto Date: Thu, 15 Feb 2024 18:59:44 +0200 Subject: [PATCH] Fix log 'commit' logic --- src/raft/consensus_state_machine.nim | 30 +++++++++++++++++++++++--- tests/test_consensus_state_machine.nim | 9 ++------ 2 files changed, 29 insertions(+), 10 deletions(-) diff --git a/src/raft/consensus_state_machine.nim b/src/raft/consensus_state_machine.nim index 7fb37ba..1ac3adb 100644 --- a/src/raft/consensus_state_machine.nim +++ b/src/raft/consensus_state_machine.nim @@ -111,6 +111,12 @@ type votedFor*: Option[RaftNodeId] stateChange*: bool + RaftLastPollState* = object + term*: RaftNodeTerm + votedFor*: RaftNodeId + commitIndex: RaftLogIndex + + RaftStateMachine* = object myId*: RaftNodeId term*: RaftNodeTerm @@ -132,8 +138,20 @@ type electionTimeout: times.Duration randomGenerator: Rand + observedState: RaftLastPollState state*: RaftStateMachineState + +func observe(ps: var RaftLastPollState, sm: RaftStateMachine) = + ps.term = sm.term + ps.votedFor = sm.votedFor + ps.commitIndex = sm.commitIndex + +func eq(ps: RaftLastPollState, sm: RaftStateMachine): bool = + return ps.term == sm.term and + ps.votedFor == sm.votedFor and + ps.commitIndex == sm.commitIndex + func leader*(sm: var RaftStateMachine): var LeaderState = return sm.state.leader @@ -180,6 +198,7 @@ func initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL sm.heartbeatTime = times.initDuration(milliseconds = 50) sm.randomGenerator = randomGenerator sm.resetElectionTimeout() + sm.observedState.observe(sm) return sm func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = @@ -352,14 +371,19 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = if sm.state.isLeader: sm.replicate() sm.output.term = sm.term + if sm.observedState.commitIndex < sm.commitIndex: + for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1): + sm.output.committed.add(sm.log.getEntryByIndex(i)) + if sm.votedFor != RaftnodeId(): sm.output.votedFor = some(sm.votedFor) + sm.observedState.observe(sm) let output = sm.output sm.output = RaftStateMachineOutput() return output -func commit*(sm: var RaftStateMachine) = +func commit(sm: var RaftStateMachine) = if not sm.state.isLeader: return var newIndex = sm.commitIndex @@ -370,9 +394,9 @@ func commit*(sm: var RaftStateMachine) = if p.matchIndex > newIndex: replicationCnt += 1 if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): - sm.output.committed.add(sm.log.getEntryByIndex(nextIndex)) - sm.commitIndex += nextIndex; + sm.commitIndex = nextIndex; nextIndex += 1 + newIndex += 1 else: break diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index 9533c0c..b846d25 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -69,11 +69,9 @@ func allowMsgRouting(tc: var TestCluster, id: RaftnodeId) = proc cmpLogs(x, y: DebugLogEntry): int = cmp(x.time, y.time) - func `$`*(de: DebugLogEntry): string = return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg - proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) = var debugLogs : seq[DebugLogEntry] for id, node in tc.nodes: @@ -91,15 +89,12 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = if DebugLogLevel.Debug <= logLevel: echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet for commit in output.committed: - echo $commit + if DebugLogLevel.Debug <= logLevel: + echo "[" & (($node.myId)[0..7]) & "...] Commit:" & $commit debugLogs.sort(cmpLogs) for msg in debugLogs: if msg.level <= logLevel: echo $msg - - - - func getLeader(tc: TestCluster): Option[RaftStateMachine] = var leader = none(RaftStateMachine)