diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index bbe4009..11dbc3a 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -29,7 +29,7 @@ proc raftNodeCheckCommitIndex*[SmCommandType, SmStateType](node: RaftNode[SmComm while node.commitIndex < newcommitIndex: node.commitIndex.inc - raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex)) + raftNodeApplyLogEntry(node, node.commitIndex) proc raftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]): RaftMessageResponse[SmCommandType, SmStateType] = @@ -192,5 +192,5 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo if replicateCnt >= (node.peers.len div 2 + node.peers.len mod 2): node.commitIndex = raftNodeLogIndexGet(node) - raftNodeApplyLogEntry(node, raftNodeLogEntryGet(node, node.commitIndex)) # Apply to state machine + raftNodeApplyLogEntry(node, node.commitIndex) # Apply to state machine result = true \ No newline at end of file diff --git a/raft/log_ops.nim b/raft/log_ops.nim index 32cc2d9..c4ce137 100644 --- a/raft/log_ops.nim +++ b/raft/log_ops.nim @@ -25,7 +25,15 @@ proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandTy debug "Truncating log to index: ", truncateIndex=truncateIndex, ld=repr(node.log.logData) # node.log.logData = node.log.logData[:truncateIndex] -proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = +proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], entryIndex: RaftLogIndex) = mixin raftNodeSmApply - raftNodeSmApply(node.stateMachine, logEntry.data.get) \ No newline at end of file + let logEntry = raftNodeLogEntryGet(node, entryIndex) + + if entryIndex > node.lastApplied: + debug "Applying log entry: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry) + raftNodeSmApply(node.stateMachine, raftNodeLogEntryGet(node, entryIndex)) + node.lastApplied = entryIndex + + else: + debug "Log entry already applied: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry) \ No newline at end of file