diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index 148d7e5..2b8fddc 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -119,6 +119,10 @@ type debugLogs: seq[string] stateChange: bool + RaftConfig* = object + currentSet*: seq[RaftNodeId] + + RaftStateMachine* = object myId*: RaftNodeId term: RaftNodeTerm @@ -126,17 +130,22 @@ type toCommit: RaftLogIndex log: RaftLog output: RaftStateMachineOutput - electionTimeout: times.Duration lastUpdate: Time votedFor: RaftNodeId currentLeader: RaftNodeId pingLeader: bool + config: RaftConfig + + lastElectionTime: times.Duration + randomizedElectionTime: times.Duration + timeNow: times.Duration + electionTimeout: times.Duration + case state: RaftNodeState of rnsFollower: follower : FollowerState of rnsCandidate: candidate: CandidateState of rnsLeader: leader: LeaderState - RaftStateMachineConfig* = object RaftFollowerProgressTracker* = object id: RaftNodeId @@ -277,12 +286,20 @@ func termForIndex*(rf: RaftLog, index: RaftLogIndex): RaftNodeTerm = func debug*(sm: var RaftStateMachine, log: string) = sm.output.debugLogs.add(log) -func initRaftStateMachine*(config: RaftStateMachineConfig): RaftStateMachine = - var st = RaftStateMachine() - st.term = 0 - st.commitIndex = 0 - st.state = RaftNodeState.rnsFollower - return st +func resetElectionTimeout*(sm: var RaftStateMachine) = + # TODO actually pick random time + sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 42) + +func initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig): RaftStateMachine = + var sm = RaftStateMachine() + sm.term = currentTerm + sm.log = log + sm.commitIndex = commitIndex + sm.state = RaftNodeState.rnsFollower + sm.config = config + + sm.resetElectionTimeout() + return sm func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) = sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request)) @@ -355,107 +372,6 @@ func addEntry*(sm: var RaftStateMachine, config: Config) = func addEmptyEntry*(sm: var RaftStateMachine) = sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletEmpty, empty: true)) -func tick* (sm: var RaftStateMachine) = - sm.debug "TODO:implement tick" - # TODO: here we should keep track of the replications progress - -func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = - # Should initiate replication if we have new entries - if sm.state == RaftNodeState.rnsLeader: - sm.replicate() - let output = sm.output - sm.output = RaftStateMachineOutput() - return output - - -func commit*(sm: var RaftStateMachine) = - if sm.state != RaftNodeState.rnsLeader: - return - var new_index = sm.commitIndex - var next_index = sm.commitIndex + 1 - while next_index < sm.log.lastIndex: - var replicationCnt = 0 - for p in sm.leader.followersProgress: - if p.matchIndex > new_index: - replicationCnt += 1 - if replicationCnt > len(sm.leader.followersProgress): - sm.output.committed.add(sm.log.getEntryByIndex(next_index)) - sm.commitIndex += next_index; - next_index += 1 - -func find_follower_proggress_by_id(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = - for follower in sm.leader.followersProgress: - if follower.id == id: - return some(follower) - return none(RaftFollowerProgressTracker) - - -func append_entry_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: RaftRpcAppendReplay) = - if sm.state != RaftNodeState.rnsLeader: - sm.debug "You can't append append replay to the follower" - return - var follower = sm.find_follower_proggress_by_id(from_id) - if not follower.isSome: - sm.debug "Can't find the follower" - return - follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex) - case replay.result: - of RaftRpcCode.Accepted: - let lestIndex = replay.accepted.lastNewIndex - follower.get().accepted(lestIndex) - # TODO: add leader stepping down logic here - sm.commit() - if sm.state != RaftNodeState.rnsLeader: - return - of RaftRpcCode.Rejected: - if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0: - sm.replicateTo(follower.get()) - follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) - # if commit apply configuration that removes current follower - # we should take it again - var follower2 = sm.find_follower_proggress_by_id(from_id) - if follower2.isSome: - sm.replicate_to(follower2.get()) - -func advance_commit_idx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) = - let new_idx = min(leaderIdx, sm.log.lastIndex) - if new_idx > sm.commitIndex: - sm.commitIndex = new_idx - # TODO: signal the output for the update - - -func append_entry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcAppendRequest) = - if sm.state != RaftNodeState.rnsFollower: - sm.debug "You can't append append request to the non follower" - return - let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm) - if not match: - let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) - let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) - sm.send_to(sm.myId, responce) - sm.debug "Reject to apply the entry" - # reject to apply - #send_to() - sm.log.appendAsFollower(request.entry) - sm.advance_commit_idx(request.commitIndex) - let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex) - let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) - sm.send_to(sm.myId, responce) - - -func request_vote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteRequest, ) = - let canVote = sm.votedFor == from_id or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term) - if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): - if not request.is_prevote: - # TODO: Update election time - sm.votedFor = from_id - - let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote) - sm.send_to(sm.myId, responce) - else: - let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote) - sm.send_to(sm.myId, responce) - func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) = if sm.myId == leaderId: sm.debug "Can't be follower of itself" @@ -486,7 +402,128 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = #TODO: implement return -func request_vote_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) = +func tickLeader*(sm: var RaftStateMachine, now: times.Duration) = + sm.timeNow = now + if sm.lastElectionTime - sm.timeNow > sm.electionTimeout: + sm.becomeFollower(RaftnodeId()) + return + if sm.state != RaftNodeState.rnsLeader: + sm.debug "tick_leader can be called only on the leader" + return + for followerIndex in 0..sm.leader.followersProgress.len: + var follower = sm.leader.followersProgress[followerIndex] + if sm.myId != follower.id: + if follower.matchIndex < sm.log.lastIndex or follower.commitIndex < sm.commitIndex: + sm.replicateTo(follower) + # TODO: implement step down logic + +func tick*(sm: var RaftStateMachine, now: times.Duration) = + sm.timeNow = now + if sm.state != RaftNodeState.rnsLeader: + sm.tickLeader(now); + elif sm.lastElectionTime - sm.timeNow > sm.randomizedElectionTime: + sm.becomeCandidate(false) + + +func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = + # Should initiate replication if we have new entries + if sm.state == RaftNodeState.rnsLeader: + sm.replicate() + let output = sm.output + sm.output = RaftStateMachineOutput() + return output + + +func commit*(sm: var RaftStateMachine) = + if sm.state != RaftNodeState.rnsLeader: + return + var new_index = sm.commitIndex + var next_index = sm.commitIndex + 1 + while next_index < sm.log.lastIndex: + var replicationCnt = 0 + for p in sm.leader.followersProgress: + if p.matchIndex > new_index: + replicationCnt += 1 + if replicationCnt > len(sm.leader.followersProgress): + sm.output.committed.add(sm.log.getEntryByIndex(next_index)) + sm.commitIndex += next_index; + next_index += 1 + +func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = + for follower in sm.leader.followersProgress: + if follower.id == id: + return some(follower) + return none(RaftFollowerProgressTracker) + + +func appendEntryReply*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: RaftRpcAppendReplay) = + if sm.state != RaftNodeState.rnsLeader: + sm.debug "You can't append append replay to the follower" + return + var follower = sm.findFollowerProggressById(from_id) + if not follower.isSome: + sm.debug "Can't find the follower" + return + follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex) + case replay.result: + of RaftRpcCode.Accepted: + let lestIndex = replay.accepted.lastNewIndex + follower.get().accepted(lestIndex) + # TODO: add leader stepping down logic here + sm.commit() + if sm.state != RaftNodeState.rnsLeader: + return + of RaftRpcCode.Rejected: + if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0: + sm.replicateTo(follower.get()) + follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1) + # if commit apply configuration that removes current follower + # we should take it again + var follower2 = sm.findFollowerProggressById(from_id) + if follower2.isSome: + sm.replicateTo(follower2.get()) + +func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) = + let new_idx = min(leaderIdx, sm.log.lastIndex) + if new_idx > sm.commitIndex: + sm.commitIndex = new_idx + # TODO: signal the output for the update + + +func appendEntry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcAppendRequest) = + if sm.state != RaftNodeState.rnsFollower: + sm.debug "You can't append append request to the non follower" + return + let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm) + if not match: + let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + sm.sendTo(sm.myId, responce) + sm.debug "Reject to apply the entry" + # reject to apply + #sentTo() + sm.log.appendAsFollower(request.entry) + sm.advanceCommitIdx(request.commitIndex) + let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex) + let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) + sm.sendTo(sm.myId, responce) + + +func requestVote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteRequest, ) = + let canVote = sm.votedFor == from_id or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term) + if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): + if not request.is_prevote: + # TODO: Update election time + sm.votedFor = from_id + + let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote) + sm.sendTo(sm.myId, responce) + else: + let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote) + sm.sendTo(sm.myId, responce) + + +func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) = if sm.state != RaftNodeState.rnsCandidate: sm.debug "Non candidate can't handle votes" return diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index 305d6a5..7f6ebcf 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -11,14 +11,23 @@ import unittest2 import ../raft/types import ../raft/consensus_state_machine import std/[times, sequtils] +import uuids + +func configWith3Nodes(): RaftConfig = + var config = RaftConfig() + config.currentSet.add(RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb"))) + config.currentSet.add(RaftnodeId(parseUUID("2a98fc33-6559-44c0-b130-fc3e9df80a69"))) + config.currentSet.add(RaftnodeId(parseUUID("9156756d-697f-4ffa-9b82-0c86720344bd"))) + return config proc consensusstatemachineMain*() = suite "Basic state machine tests": test "create state machine": - let config = RaftStateMachineConfig() - let sm = initRaftStateMachine(config) + var config = configWith3Nodes() + var log = initRaftLog() + let sm = initRaftStateMachine(config.currentSet[0], 0, log, 0, config) echo sm test "advance empty state machine": @@ -70,5 +79,6 @@ proc consensusstatemachineMain*() = check log.entriesCount == 1 + if isMainModule: consensusstatemachineMain() \ No newline at end of file