Fix log 'commit' logic

This commit is contained in:
Marto 2024-02-15 18:59:44 +02:00
parent d5a83ae19e
commit 9bfde8cde8
2 changed files with 29 additions and 10 deletions

View File

@ -111,6 +111,12 @@ type
votedFor*: Option[RaftNodeId] votedFor*: Option[RaftNodeId]
stateChange*: bool stateChange*: bool
RaftLastPollState* = object
term*: RaftNodeTerm
votedFor*: RaftNodeId
commitIndex: RaftLogIndex
RaftStateMachine* = object RaftStateMachine* = object
myId*: RaftNodeId myId*: RaftNodeId
term*: RaftNodeTerm term*: RaftNodeTerm
@ -132,8 +138,20 @@ type
electionTimeout: times.Duration electionTimeout: times.Duration
randomGenerator: Rand randomGenerator: Rand
observedState: RaftLastPollState
state*: RaftStateMachineState state*: RaftStateMachineState
func observe(ps: var RaftLastPollState, sm: RaftStateMachine) =
ps.term = sm.term
ps.votedFor = sm.votedFor
ps.commitIndex = sm.commitIndex
func eq(ps: RaftLastPollState, sm: RaftStateMachine): bool =
return ps.term == sm.term and
ps.votedFor == sm.votedFor and
ps.commitIndex == sm.commitIndex
func leader*(sm: var RaftStateMachine): var LeaderState = func leader*(sm: var RaftStateMachine): var LeaderState =
return sm.state.leader return sm.state.leader
@ -180,6 +198,7 @@ func initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
sm.heartbeatTime = times.initDuration(milliseconds = 50) sm.heartbeatTime = times.initDuration(milliseconds = 50)
sm.randomGenerator = randomGenerator sm.randomGenerator = randomGenerator
sm.resetElectionTimeout() sm.resetElectionTimeout()
sm.observedState.observe(sm)
return sm return sm
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
@ -352,14 +371,19 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
if sm.state.isLeader: if sm.state.isLeader:
sm.replicate() sm.replicate()
sm.output.term = sm.term sm.output.term = sm.term
if sm.observedState.commitIndex < sm.commitIndex:
for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1):
sm.output.committed.add(sm.log.getEntryByIndex(i))
if sm.votedFor != RaftnodeId(): if sm.votedFor != RaftnodeId():
sm.output.votedFor = some(sm.votedFor) sm.output.votedFor = some(sm.votedFor)
sm.observedState.observe(sm)
let output = sm.output let output = sm.output
sm.output = RaftStateMachineOutput() sm.output = RaftStateMachineOutput()
return output return output
func commit*(sm: var RaftStateMachine) = func commit(sm: var RaftStateMachine) =
if not sm.state.isLeader: if not sm.state.isLeader:
return return
var newIndex = sm.commitIndex var newIndex = sm.commitIndex
@ -370,9 +394,9 @@ func commit*(sm: var RaftStateMachine) =
if p.matchIndex > newIndex: if p.matchIndex > newIndex:
replicationCnt += 1 replicationCnt += 1
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
sm.output.committed.add(sm.log.getEntryByIndex(nextIndex)) sm.commitIndex = nextIndex;
sm.commitIndex += nextIndex;
nextIndex += 1 nextIndex += 1
newIndex += 1
else: else:
break break

View File

@ -69,11 +69,9 @@ func allowMsgRouting(tc: var TestCluster, id: RaftnodeId) =
proc cmpLogs(x, y: DebugLogEntry): int = proc cmpLogs(x, y: DebugLogEntry): int =
cmp(x.time, y.time) cmp(x.time, y.time)
func `$`*(de: DebugLogEntry): string = func `$`*(de: DebugLogEntry): string =
return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg
proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) = proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) =
var debugLogs : seq[DebugLogEntry] var debugLogs : seq[DebugLogEntry]
for id, node in tc.nodes: for id, node in tc.nodes:
@ -91,15 +89,12 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel =
if DebugLogLevel.Debug <= logLevel: if DebugLogLevel.Debug <= logLevel:
echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet
for commit in output.committed: for commit in output.committed:
echo $commit if DebugLogLevel.Debug <= logLevel:
echo "[" & (($node.myId)[0..7]) & "...] Commit:" & $commit
debugLogs.sort(cmpLogs) debugLogs.sort(cmpLogs)
for msg in debugLogs: for msg in debugLogs:
if msg.level <= logLevel: if msg.level <= logLevel:
echo $msg echo $msg
func getLeader(tc: TestCluster): Option[RaftStateMachine] = func getLeader(tc: TestCluster): Option[RaftStateMachine] =
var leader = none(RaftStateMachine) var leader = none(RaftStateMachine)