This commit is contained in:
Raycho Mukelov 2023-10-20 04:20:44 +03:00
parent 4e129930ea
commit 1643a8e2c0
2 changed files with 12 additions and 4 deletions

View File

@ -29,7 +29,7 @@ proc raftNodeCheckCommitIndex*[SmCommandType, SmStateType](node: RaftNode[SmComm
while node.commitIndex < newcommitIndex: while node.commitIndex < newcommitIndex:
node.commitIndex.inc node.commitIndex.inc
raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex)) raftNodeApplyLogEntry(node, node.commitIndex)
proc raftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]): proc raftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] = RaftMessageResponse[SmCommandType, SmStateType] =
@ -192,5 +192,5 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
if replicateCnt >= (node.peers.len div 2 + node.peers.len mod 2): if replicateCnt >= (node.peers.len div 2 + node.peers.len mod 2):
node.commitIndex = raftNodeLogIndexGet(node) node.commitIndex = raftNodeLogIndexGet(node)
raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex)) # Apply to state machine raftNodeApplyLogEntry(node, node.commitIndex) # Apply to state machine
result = true result = true

View File

@ -25,7 +25,15 @@ proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandTy
debug "Truncating log to index: ", truncateIndex=truncateIndex, ld=repr(node.log.logData) debug "Truncating log to index: ", truncateIndex=truncateIndex, ld=repr(node.log.logData)
# node.log.logData = node.log.logData[:truncateIndex] # node.log.logData = node.log.logData[:truncateIndex]
proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], entryIndex: RaftLogIndex) =
mixin raftNodeSmApply mixin raftNodeSmApply
raftNodeSmApply(node.stateMachine, logEntry.data.get) let logEntry = raftNodeLogEntryGet(node, entryIndex)
if entryIndex > node.lastApplied:
debug "Applying log entry: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry)
raftNodeSmApply(node.stateMachine, raftNodeLogEntryGet(node, entryIndex))
node.lastApplied = entryIndex
else:
debug "Log entry already applied: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry)