This commit is contained in:
Raycho Mukelov 2023-10-20 04:41:10 +03:00
parent 1643a8e2c0
commit 755f358dfa
3 changed files with 12 additions and 8 deletions

View File

@ -162,6 +162,7 @@ proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCo
withRLock(node.raftStateMutex):
var
logEntry: RaftLogEntry[SmCommandType](term: node.currentTerm, data: cmd, entryType: etData)
raftNodeLogAppend(node, logEntry)
for peer in node.peers:

View File

@ -32,7 +32,7 @@ proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommand
if entryIndex > node.lastApplied:
debug "Applying log entry: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry)
raftNodeSmApply(node.stateMachine, raftNodeLogEntryGet(node, entryIndex))
raftNodeSmApply(node.stateMachine, raftNodeLogEntryGet(node, entryIndex).data.get)
node.lastApplied = entryIndex
else:

View File

@ -144,13 +144,16 @@ proc raftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCom
proc raftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
for raftPeer in node.peers:
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
senderTerm: raftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: raftNodeLogIndexGet(node) - 1, prevLogTerm: if raftNodeLogIndexGet(node) > 0: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term else: 0
)
discard node.msgSendCallback(msgHrtBt)
withRLock(node.raftStateMutex):
for raftPeer in node.peers:
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
senderTerm: raftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: raftNodeLogIndexGet(node) - 1, prevLogTerm: if raftNodeLogIndexGet(node) > 0: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term else: 0
)
discard node.msgSendCallback(msgHrtBt)
raftNodeScheduleHeartBeat(node)
proc raftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =