From d5a83ae19eccc143c2d33e63b3d160731c952cc4 Mon Sep 17 00:00:00 2001 From: Marto Date: Thu, 15 Feb 2024 18:27:42 +0200 Subject: [PATCH] Fix typos --- src/raft/consensus_state_machine.nim | 95 ++++++++++++++++---------- src/raft/types.nim | 1 + tests/test_consensus_state_machine.nim | 28 ++++---- 3 files changed, 77 insertions(+), 47 deletions(-) diff --git a/src/raft/consensus_state_machine.nim b/src/raft/consensus_state_machine.nim index 0bdf250..7fb37ba 100644 --- a/src/raft/consensus_state_machine.nim +++ b/src/raft/consensus_state_machine.nim @@ -18,9 +18,11 @@ import std/random type RaftRpcMessageType* = enum VoteRequest = 0, - VoteReplay = 1, + VoteReply = 1, AppendRequest = 2, - AppendReplay = 3 + AppendReply = 3, + InstallSnapshot = 4, + SnapshotReply = 5 RaftRpcCode* = enum Rejected = 0, @@ -49,19 +51,19 @@ type commitIndex*: RaftLogIndex entries*: seq[LogEntry] - RaftRpcAppendReplayRejected* = object + RaftRpcAppendReplyRejected* = object nonMatchingIndex: RaftLogIndex lastIdx: RaftLogIndex - RaftRpcAppendReplayAccepted* = object + RaftRpcAppendReplyAccepted* = object lastNewIndex: RaftLogIndex - RaftRpcAppendReplay* = object + RaftRpcAppendReply* = object commitIndex: RaftLogIndex term: RaftNodeTerm case result: RaftRpcCode: - of Accepted: accepted: RaftRpcAppendReplayAccepted - of Rejected: rejected: RaftRpcAppendReplayRejected + of Accepted: accepted: RaftRpcAppendReplyAccepted + of Rejected: rejected: RaftRpcAppendReplyRejected RaftRpcVoteRequest* = object currentTerm*: RaftNodeTerm @@ -69,9 +71,23 @@ type lastLogTerm*: RaftNodeTerm force*: bool - RaftRpcVoteReplay* = object + RaftRpcVoteReply* = object currentTerm*: RaftNodeTerm voteGranted*: bool + + RaftSnapshot* = object + index: RaftLogIndex + term: RaftNodeTerm + config: RaftConfig + snapshotId: RaftSnapshotId + + RaftInstallSnapshot* = object + term: RaftNodeTerm + snapshot: RaftSnapshot + + RaftSnapshotReply* = object + term: RaftNodeTerm + success: bool RaftRpcMessage* = object currentTerm*: RaftNodeTerm @@ -79,9 +95,11 @@ type receiver*: RaftNodeId case kind*: RaftRpcMessageType of VoteRequest: voteRequest*: RaftRpcVoteRequest - of VoteReplay: voteReplay*: RaftRpcVoteReplay + of VoteReply: voteReply*: RaftRpcVoteReply of AppendRequest: appendRequest*: RaftRpcAppendRequest - of AppendReplay: appendReplay*: RaftRpcAppendReplay + of AppendReply: appendReply*: RaftRpcAppendReply + of InstallSnapshot: installSnapshot*: RaftInstallSnapshot + of SnapshotReply: snapshotReply*: RaftSnapshotReply RaftStateMachineOutput* = object logEntries*: seq[LogEntry] @@ -170,17 +188,24 @@ func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) = sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request)) -func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) = - sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request)) +func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReply) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReply, appendReply: request)) func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) = sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request)) -func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) = - sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request)) +func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReply) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReply, voteReply: request)) + +func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftSnapshotReply) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.SnapshotReply, snapshotReply: request)) + +func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftInstallSnapshot) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.InstallSnapshot, installSnapshot: request)) + func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) = - sm.debug "Sent to" & $id & $request + sm.debug "Send to" & $id & $request if sm.state.isLeader: var follower = sm.findFollowerProggressById(id) if follower.isSome: @@ -351,18 +376,18 @@ func commit*(sm: var RaftStateMachine) = else: break -func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: RaftRpcAppendReplay) = +func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) = if not sm.state.isLeader: - sm.debug "You can't append append replay to the follower" + sm.debug "You can't append append reply to the follower" return var follower = sm.findFollowerProggressById(fromId) if not follower.isSome: sm.debug "Can't find the follower" return - follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex) - case replay.result: + follower.get().commitIndex = max(follower.get().commitIndex, reply.commitIndex) + case reply.result: of RaftRpcCode.Accepted: - let lastIndex = replay.accepted.lastNewIndex + let lastIndex = reply.accepted.lastNewIndex sm.debug "Accpeted" & $fromId & " " & $lastIndex follower.get().accepted(lastIndex) # TODO: add leader stepping down logic here @@ -370,9 +395,9 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra if not sm.state.isLeader: return of RaftRpcCode.Rejected: - if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0: + if reply.rejected.nonMatchingIndex == 0 and reply.rejected.lastIdx == 0: sm.replicateTo(follower.get()) - follower.get().nextIndex = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) + follower.get().nextIndex = min(reply.rejected.nonMatchingIndex, reply.rejected.lastIdx + 1) # if commit apply configuration that removes current follower # we should take it again var follower2 = sm.findFollowerProggressById(fromId) @@ -391,27 +416,27 @@ func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpc return let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm) if not match: - let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) - let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) sm.sendTo(fromId, responce) sm.debug "Reject to apply the entry" for entry in request.entries: sm.log.appendAsFollower(entry) sm.advanceCommitIdx(request.commitIndex) - let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex) - let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) + let accepted = RaftRpcAppendReplyAccepted(lastNewIndex: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) sm.sendTo(fromId, responce) func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) = let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): - let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true) + let responce = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: true) sm.sendTo(fromId, responce) else: - let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false) + let responce: RaftRpcVoteReply = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: false) sm.sendTo(fromId, responce) -func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) = +func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReply) = if not sm.state.isCandidate: sm.debug "Non candidate can't handle votes" return @@ -441,8 +466,8 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime elif msg.currentTerm < sm.term: if msg.kind == RaftRpcMessageType.AppendRequest: # Instruct leader to step down - let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex) - let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) sm.sendTo(msg.sender, responce) sm.warning "Ignore message with lower term" @@ -457,9 +482,9 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime if sm.state.isCandidate: if msg.kind == RaftRpcMessageType.VoteRequest: sm.requestVote(msg.sender, msg.voteRequest) - elif msg.kind == RaftRpcMessageType.VoteReplay: + elif msg.kind == RaftRpcMessageType.VoteReply: sm.debug "Apply vote" - sm.requestVoteReply(msg.sender, msg.voteReplay) + sm.requestVoteReply(msg.sender, msg.voteReply) else: sm.warning "Candidate ignore message" elif sm.state.isFollower: @@ -475,8 +500,8 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime elif sm.state.isLeader: if msg.kind == RaftRpcMessageType.AppendRequest: sm.warning "Ignore message leader append his entries directly" - elif msg.kind == RaftRpcMessageType.AppendReplay: - sm.appendEntryReplay(msg.sender, msg.appendReplay) + elif msg.kind == RaftRpcMessageType.AppendReply: + sm.appendEntryReply(msg.sender, msg.appendReply) elif msg.kind == RaftRpcMessageType.VoteRequest: sm.requestVote(msg.sender, msg.voteRequest) else: diff --git a/src/raft/types.nim b/src/raft/types.nim index 5c41460..f0d737d 100644 --- a/src/raft/types.nim +++ b/src/raft/types.nim @@ -29,5 +29,6 @@ type RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node RaftNodeTerm* = int # Raft Node Term Type RaftLogIndex* = int # Raft Node Log Index Type + RaftSnapshotId* = int RaftConfig* = object currentSet*: seq[RaftNodeId] \ No newline at end of file diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index 4b9b9f4..9533c0c 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -90,10 +90,14 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = else: if DebugLogLevel.Debug <= logLevel: echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet + for commit in output.committed: + echo $commit debugLogs.sort(cmpLogs) for msg in debugLogs: if msg.level <= logLevel: echo $msg + + @@ -275,8 +279,8 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) output = sm.poll() @@ -287,8 +291,8 @@ proc consensusstatemachineMain*() = # Older messages should be ignored block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: (output.term - 1), voteGranted: true) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) sm.advance(msg, timeNow) output = sm.poll() check output.stateChange == false @@ -329,8 +333,8 @@ proc consensusstatemachineMain*() = check output.votedFor.get() == mainNodeId timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) output = sm.poll() @@ -339,8 +343,8 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) output = sm.poll() @@ -366,8 +370,8 @@ proc consensusstatemachineMain*() = check output.votedFor.get() == mainNodeId timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) output = sm.poll() @@ -376,8 +380,8 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true) - let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) + let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true) + let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) output = sm.poll()