From cd00bd097f9934e94b166302b047eceb45ae1890 Mon Sep 17 00:00:00 2001 From: Marto Date: Fri, 12 Jan 2024 18:34:09 +0200 Subject: [PATCH] Add voting and commit logic --- raft/consensus_state_machine.nim | 292 +++++++++++++++++++++++++++++-- 1 file changed, 277 insertions(+), 15 deletions(-) diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index 3df58f8..148d7e5 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -17,7 +17,21 @@ type VoteReplay = 1, Append = 2, AppendReplay = 3 - + + RaftRpcCode* = enum + Rejected = 0, + Accepted = 1 + + RaftElectionResult* = enum + Unknown = 0, + Won = 1, + Lost = 2 + + RaftLogEntryType* = enum + rletCommand = 0, + rletConfig = 1, + rletEmpty = 2 + RaftRpcAppendRequest* = object previousTerm: RaftNodeTerm @@ -25,17 +39,48 @@ type commitIndex: RaftLogIndex entry: LogEntry + RaftRpcAppendReplayRejected* = object + nonMatchingIndex: RaftLogIndex + lastIdx: RaftLogIndex + + RaftRpcAppendReplayAccepted* = object + lastNewIndex: RaftLogIndex + RaftRpcAppendReplay* = object + commitIndex: RaftLogIndex + term: RaftNodeTerm + case result: RaftRpcCode: + of Accepted: accepted: RaftRpcAppendReplayAccepted + of Rejected: rejected: RaftRpcAppendReplayRejected + RaftRpcVoteRequest* = object - + currentTerm: RaftNodeTerm + lastLogIndex: RaftLogIndex + lastLogTerm: RaftNodeTerm + isPrevote: bool + force: bool + RaftRpcVoteReplay* = object - + currentTerm: RaftNodeTerm + voteGranted: bool + isPrevote: bool + LeaderState* = object followersProgress: seq[RaftFollowerProgressTracker] + RaftElectionTracker* = object + all: seq[RaftNodeId] + responded: seq[RaftNodeId] + granted: int + + RaftVotes* = object + voters: seq[RaftNodeId] + current: RaftElectionTracker + CandidateState* = object - votes: seq[RaftNodeId] + votes: RaftVotes + isPrevote: bool FollowerState* = object leader: RaftNodeId @@ -52,11 +97,16 @@ type Command* = object data: seq[byte] + Config* = object + LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) term: RaftNodeTerm index: RaftLogIndex # TODO: Add configuration too - data: Command + case kind: RaftLogEntryType: + of rletCommand: command: Command + of rletConfig: config: Config + of rletEmpty: empty: bool RaftLog* = object logEntries: seq[LogEntry] @@ -67,15 +117,20 @@ type committed: seq[LogEntry] messages: seq[RaftRpcMessage] debugLogs: seq[string] + stateChange: bool RaftStateMachine* = object myId*: RaftNodeId term: RaftNodeTerm commitIndex: RaftLogIndex + toCommit: RaftLogIndex log: RaftLog output: RaftStateMachineOutput electionTimeout: times.Duration lastUpdate: Time + votedFor: RaftNodeId + currentLeader: RaftNodeId + pingLeader: bool case state: RaftNodeState of rnsFollower: follower : FollowerState of rnsCandidate: candidate: CandidateState @@ -86,10 +141,59 @@ type RaftFollowerProgressTracker* = object id: RaftNodeId nextIndex: RaftLogIndex + # Index of the highest log entry known to be replicated to this server. matchIndex: RaftLogIndex commitIndex: RaftLogIndex replayedIndex: RaftLogIndex + +func contains(a: seq[RaftNodeId], id: RaftNodeId): bool = + var found = false + for n in a: + if n == id: + found = true + break + return found + +func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker = + var r = RaftElectionTracker() + r.all = nodes + r.granted = 0 + return r + +func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool = + if not ret.all.contains nodeId: + return false + + if not ret.responded.contains nodeId: + ret.responded.add(nodeId) + ret.granted += 1 + + return true + +func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult = + let quorym = int(len(ret.all) / 2) + 1 + if ret.granted > quorym: + return RaftElectionResult.Won + let unkown = len(ret.all) - len(ret.responded) + if ret.granted + unkown > quorym: + return RaftElectionResult.Unknown + else: + return RaftElectionResult.Lost + +func initVotes*(nodes: seq[RaftNodeId]): RaftVotes = + var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes)) + return r + +func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool = + # TODO: Add support for configuration + return rv.current.registerVote(nodeId, granted) + +func tallyVote*(rv: var RaftVotes): RaftElectionResult = + # TODO: Add support for configuration + return rv.current.tallyVote() + + func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex): RaftFollowerProgressTracker = return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0) @@ -117,10 +221,16 @@ func lastIndex*(rf: RaftLog): RaftNodeTerm = let idx = len(rf.logEntries) return rf.logEntries[idx - 1].index +func nextIndex*(rf: RaftLog): int = + return rf.lastIndex + 1 + func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) = # TODO: We should add support for configurations and snapshots rf.logEntries.delete(index.. rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex) + func getEntryByIndex(rf: RaftLog, index: RaftLogIndex): LogEntry = return rf.logEntries[index] @@ -138,25 +248,27 @@ func appendAsFollower*(rf: var RaftLog, entry: LogEntry) = rf.logEntries.add(entry) func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) = - rf.appendAsLeader(LogEntry(term: term, index: index, data: data)) + rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data)) +func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) = + rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true)) func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) = - rf.appendAsFollower(LogEntry(term: term, index: index, data: data)) + rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data)) -func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool = +func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) = if len(rf.logEntries) == 0: - return true + return (true, 0) # TODO: We should add support for snapshots if index > len(rf.logEntries): # The follower doesn't have all etries - return false + return (false, 0) if rf.logEntries[index].term == term: - return true + return (true, 0) else: - return false + return (false, rf.logEntries[index].term) func termForIndex*(rf: RaftLog, index: RaftLogIndex): RaftNodeTerm = # TODO: We should add support for snapshots @@ -173,8 +285,16 @@ func initRaftStateMachine*(config: RaftStateMachineConfig): RaftStateMachine = return st func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) = - sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: Append, appendRequest: request)) + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request)) +func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request)) + +func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Vote, voteRequest: request)) + +func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) = + sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request)) func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage = return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest()) @@ -220,11 +340,20 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage,currentTime: Time) = else: sm.debug "Unhandle msg type" + func addEntry*(sm: var RaftStateMachine, entry: LogEntry) = if sm.state != RaftNodeState.rnsLeader: sm.debug "Error: only the leader can handle new entries" sm.log.appendAsLeader(entry) +func addEntry*(sm: var RaftStateMachine, command: Command) = + sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletCommand, command: command)) + +func addEntry*(sm: var RaftStateMachine, config: Config) = + sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletConfig, config: config)) + +func addEmptyEntry*(sm: var RaftStateMachine) = + sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletEmpty, empty: true)) func tick* (sm: var RaftStateMachine) = sm.debug "TODO:implement tick" @@ -240,6 +369,139 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = func commit*(sm: var RaftStateMachine) = - if sm.state != RaftNodeState.rnsLeader + if sm.state != RaftNodeState.rnsLeader: return + var new_index = sm.commitIndex + var next_index = sm.commitIndex + 1 + while next_index < sm.log.lastIndex: + var replicationCnt = 0 + for p in sm.leader.followersProgress: + if p.matchIndex > new_index: + replicationCnt += 1 + if replicationCnt > len(sm.leader.followersProgress): + sm.output.committed.add(sm.log.getEntryByIndex(next_index)) + sm.commitIndex += next_index; + next_index += 1 + +func find_follower_proggress_by_id(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = + for follower in sm.leader.followersProgress: + if follower.id == id: + return some(follower) + return none(RaftFollowerProgressTracker) + + +func append_entry_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: RaftRpcAppendReplay) = + if sm.state != RaftNodeState.rnsLeader: + sm.debug "You can't append append replay to the follower" + return + var follower = sm.find_follower_proggress_by_id(from_id) + if not follower.isSome: + sm.debug "Can't find the follower" + return + follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex) + case replay.result: + of RaftRpcCode.Accepted: + let lestIndex = replay.accepted.lastNewIndex + follower.get().accepted(lestIndex) + # TODO: add leader stepping down logic here + sm.commit() + if sm.state != RaftNodeState.rnsLeader: + return + of RaftRpcCode.Rejected: + if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0: + sm.replicateTo(follower.get()) + follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) + # if commit apply configuration that removes current follower + # we should take it again + var follower2 = sm.find_follower_proggress_by_id(from_id) + if follower2.isSome: + sm.replicate_to(follower2.get()) + +func advance_commit_idx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) = + let new_idx = min(leaderIdx, sm.log.lastIndex) + if new_idx > sm.commitIndex: + sm.commitIndex = new_idx + # TODO: signal the output for the update + + +func append_entry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcAppendRequest) = + if sm.state != RaftNodeState.rnsFollower: + sm.debug "You can't append append request to the non follower" + return + let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm) + if not match: + let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + sm.send_to(sm.myId, responce) + sm.debug "Reject to apply the entry" + # reject to apply + #send_to() + sm.log.appendAsFollower(request.entry) + sm.advance_commit_idx(request.commitIndex) + let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex) + let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) + sm.send_to(sm.myId, responce) + +func request_vote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteRequest, ) = + let canVote = sm.votedFor == from_id or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term) + if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): + if not request.is_prevote: + # TODO: Update election time + sm.votedFor = from_id + + let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote) + sm.send_to(sm.myId, responce) + else: + let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote) + sm.send_to(sm.myId, responce) + +func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) = + if sm.myId == leaderId: + sm.debug "Can't be follower of itself" + sm.output.stateChange = sm.state != RaftNodeState.rnsFollower + sm.state = RaftNodeState.rnsFollower + sm.follower = FollowerState(leader: leaderId) + if leaderId != RaftnodeId(): + sm.pingLeader = false + # TODO: Update last election time + +func becomeLeader*(sm: var RaftStateMachine) = + if sm.state == RaftNodeState.rnsLeader: + sm.debug "The leader can't become leader second time" + return + + sm.output.stateChange = true + sm.state = RaftnodeState.rnsLeader + sm.leader = LeaderState() + + sm.pingLeader = false + #TODO: Update last election time + + #TODO: setup the tracket + + sm.addEmptyEntry() + return +func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = + #TODO: implement + return + +func request_vote_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) = + if sm.state != RaftNodeState.rnsCandidate: + sm.debug "Non candidate can't handle votes" + return + discard sm.candidate.votes.registerVote(from_id, request.voteGranted) + + case sm.candidate.votes.tallyVote: + of RaftElectionResult.Unknown: + return + of RaftElectionResult.Won: + sm.debug "Win election" + if (sm.candidate.isPrevote): + sm.becomeCandidate(false) + else: + sm.becomeLeader() + of RaftElectionResult.Lost: + sm.debug "Lost election" + sm.becomeFollower(RaftNodeId()) + # TODO: become foller