diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index b2e4c60..884dc53 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -20,9 +20,9 @@ randomize() type RaftRpcMessageType* = enum - Vote = 0, + VoteRequest = 0, VoteReplay = 1, - Append = 2, + AppendRequest = 2, AppendReplay = 3 RaftRpcCode* = enum @@ -54,20 +54,17 @@ type currentTerm*: RaftNodeTerm lastLogIndex*: RaftLogIndex lastLogTerm*: RaftNodeTerm - isPrevote*: bool force*: bool RaftRpcVoteReplay* = object currentTerm*: RaftNodeTerm voteGranted*: bool - isPrevote*: bool LeaderState* = object tracker: RaftTracker CandidateState* = object votes: RaftVotes - isPrevote: bool FollowerState* = object leader: RaftNodeId @@ -77,9 +74,9 @@ type sender*: RaftNodeId receiver*: RaftNodeId case kind*: RaftRpcMessageType - of Vote: voteRequest*: RaftRpcVoteRequest + of VoteRequest: voteRequest*: RaftRpcVoteRequest of VoteReplay: voteReplay*: RaftRpcVoteReplay - of Append: appendRequest*: RaftRpcAppendRequest + of AppendRequest: appendRequest*: RaftRpcAppendRequest of AppendReplay: appendReplay*: RaftRpcAppendReplay RaftStateMachineOutput* = object @@ -176,13 +173,13 @@ func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option return sm.leader.tracker.find(id) func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) = - sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request)) + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request)) func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) = sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request)) func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) = - sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Vote, voteRequest: request)) + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request)) func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) = sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request)) @@ -199,7 +196,7 @@ func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) sm.sendToImpl(id, request) func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage = - return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest()) + return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: VoteRequest, voteRequest: RaftRpcVoteRequest()) func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) = if follower.nextIndex > sm.log.lastIndex: @@ -268,7 +265,7 @@ func becomeLeader*(sm: var RaftStateMachine) = #TODO: Update last election time return -func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = +func becomeCandidate*(sm: var RaftStateMachine) = #TODO: implement if not sm.state.isCandidate: sm.output.stateChange = true @@ -285,14 +282,10 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = sm.votedFor = nodeId continue - let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, isPrevote: isPrevote, force: false) + let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, force: false) sm.sendTo(nodeId, request) sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId if sm.candidate.votes.tallyVote == RaftElectionResult.Won: - - if isPrevote: - sm.becomeCandidate(false) - else: sm.becomeLeader() return @@ -310,7 +303,7 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) = sm.lastElectionTime = now if not sm.state.isLeader: - sm.debug "tick_leader can be called only on the leader" + sm.debug "tickLeader can be called only on the leader" return for followerIndex in 0.. sm.randomizedElectionTime: sm.debug "Become candidate" - sm.becomeCandidate(false) + sm.becomeCandidate() func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = @@ -351,17 +344,17 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = func commit*(sm: var RaftStateMachine) = if not sm.state.isLeader: return - var new_index = sm.commitIndex - var next_index = sm.commitIndex + 1 - while next_index < sm.log.lastIndex: + var newIndex = sm.commitIndex + var nextIndex = sm.commitIndex + 1 + while nextIndex < sm.log.lastIndex: var replicationCnt = 0 for p in sm.leader.tracker.progress: - if p.matchIndex > new_index: + if p.matchIndex > newIndex: replicationCnt += 1 if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): - sm.output.committed.add(sm.log.getEntryByIndex(next_index)) - sm.commitIndex += next_index; - next_index += 1 + sm.output.committed.add(sm.log.getEntryByIndex(nextIndex)) + sm.commitIndex += nextIndex; + nextIndex += 1 else: break @@ -386,7 +379,7 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra of RaftRpcCode.Rejected: if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0: sm.replicateTo(follower.get()) - follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) + follower.get().nextIndex = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) # if commit apply configuration that removes current follower # we should take it again var follower2 = sm.findFollowerProggressById(fromId) @@ -394,9 +387,9 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra sm.replicateTo(follower2.get()) func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) = - let new_idx = min(leaderIdx, sm.log.lastIndex) - if new_idx > sm.commitIndex: - sm.commitIndex = new_idx + let newIdx = min(leaderIdx, sm.log.lastIndex) + if newIdx > sm.commitIndex: + sm.commitIndex = newIdx # TODO: signal the output for the update @@ -418,52 +411,44 @@ func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpc sm.sendTo(fromId, responce) func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) = - let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term) + let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): - if not request.is_prevote: - # TODO: Update election time - sm.votedFor = fromId - - let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote) + let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true) sm.sendTo(fromId, responce) else: - let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote) + let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false) sm.sendTo(fromId, responce) -func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) = +func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) = if not sm.state.isCandidate: sm.debug "Non candidate can't handle votes" return - discard sm.candidate.votes.registerVote(from_id, request.voteGranted) + discard sm.candidate.votes.registerVote(fromId, request.voteGranted) case sm.candidate.votes.tallyVote: of RaftElectionResult.Unknown: return of RaftElectionResult.Won: sm.debug "Win election" - if (sm.candidate.isPrevote): - sm.becomeCandidate(false) - else: - sm.becomeLeader() + sm.becomeLeader() of RaftElectionResult.Lost: sm.debug "Lost election" sm.becomeFollower(RaftNodeId()) - # TODO: become foller func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) = #sm.debug $msg if msg.currentTerm > sm.term: sm.debug "Current node is behind" var leaderId = RaftnodeId() - if msg.kind == RaftRpcMessageType.Append: + if msg.kind == RaftRpcMessageType.AppendRequest: leaderId = msg.sender sm.becomeFollower(leaderId) # TODO: implement pre vote sm.term = msg.currentTerm sm.votedFor = RaftnodeId() elif msg.currentTerm < sm.term: - if msg.kind == RaftRpcMessageType.Append: + if msg.kind == RaftRpcMessageType.AppendRequest: # Instruct leader to step down let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex) let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) @@ -472,14 +457,14 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime sm.debug "Ignore message with lower term" else: # TODO: add also snapshot - if msg.kind == RaftRpcMessageType.Append: + if msg.kind == RaftRpcMessageType.AppendRequest: if sm.state.isCandidate: sm.becomeFollower(msg.sender) elif sm.state.isFollower: sm.follower.leader = msg.sender # TODO: fix time if sm.state.isCandidate: - if msg.kind == RaftRpcMessageType.Vote: + if msg.kind == RaftRpcMessageType.VoteRequest: sm.requestVote(msg.sender, msg.voteRequest) elif msg.kind == RaftRpcMessageType.VoteReplay: sm.debug "Apply vote" @@ -489,19 +474,19 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime elif sm.state.isFollower: if msg.sender == sm.follower.leader: sm.lastElectionTime = now - if msg.kind == RaftRpcMessageType.Append: + if msg.kind == RaftRpcMessageType.AppendRequest: sm.appendEntry(msg.sender, msg.appendRequest) - elif msg.kind == RaftRpcMessageType.Vote: + elif msg.kind == RaftRpcMessageType.VoteRequest: sm.requestVote(msg.sender, msg.voteRequest) else: sm.debug "Follower ignore message" & $msg # TODO: imelement the rest of the state transitions elif sm.state.isLeader: - if msg.kind == RaftRpcMessageType.Append: + if msg.kind == RaftRpcMessageType.AppendRequest: sm.debug "Ignore message leader append his entries directly" elif msg.kind == RaftRpcMessageType.AppendReplay: sm.appendEntryReplay(msg.sender, msg.appendReplay) - elif msg.kind == RaftRpcMessageType.Vote: + elif msg.kind == RaftRpcMessageType.VoteRequest: sm.requestVote(msg.sender, msg.voteRequest) else: sm.debug "Leader ignore message" diff --git a/raft/log.nim b/raft/log.nim index b6ec5b9..097399d 100644 --- a/raft/log.nim +++ b/raft/log.nim @@ -67,7 +67,6 @@ func appendAsFollower*(rf: var RaftLog, entry: LogEntry) = if entry.index <= currentIdx: # TODO: The indexing hold only if we keep all entries in memory # we should change it when we add support for snapshots - if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term: rf.truncateUncomitted(entry.index) rf.logEntries.add(entry) diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index a81d519..dac5938 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -238,7 +238,7 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true) let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) @@ -250,7 +250,7 @@ proc consensusstatemachineMain*() = # Older messages should be ignored block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true) let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) sm.advance(msg, timeNow) output = sm.poll() @@ -269,7 +269,7 @@ proc consensusstatemachineMain*() = output = sm.poll() let entry = LogEntry(term: (output.term + 1), index: 101, kind: RaftLogEntryType.rletEmpty, empty: true) let appendRequest = RaftRpcAppendRequest(previousTerm: (output.term + 1), previousLogIndex: 100, commitIndex: 99, entries: @[entry]) - let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.Append, appendRequest: appendRequest) + let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.AppendRequest, appendRequest: appendRequest) sm.advance(msg, timeNow) output = sm.poll() check output.stateChange == true @@ -292,7 +292,7 @@ proc consensusstatemachineMain*() = check output.votedFor.get() == mainNodeId timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) @@ -302,7 +302,7 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) @@ -329,7 +329,7 @@ proc consensusstatemachineMain*() = check output.votedFor.get() == mainNodeId timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false) let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow) @@ -339,7 +339,7 @@ proc consensusstatemachineMain*() = timeNow += 1.milliseconds block: - let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true, isPrevote: false) + let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true) let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay) check sm.state.isCandidate sm.advance(msg, timeNow)