diff --git a/raft/consensus_state_machine.nim b/raft/consensus_state_machine.nim index e568919..2971654 100644 --- a/raft/consensus_state_machine.nim +++ b/raft/consensus_state_machine.nim @@ -152,6 +152,7 @@ type randomizedElectionTime: times.Duration heartbeatTime: times.Duration timeNow: times.DateTime + startTime: times.DateTime electionTimeout: times.Duration state*: RaftStateMachineState @@ -274,6 +275,7 @@ func tallyVote*(rv: var RaftVotes): RaftElectionResult = func initRaftLog*(firstIndex: RaftLogIndex): RaftLog = var log = RaftLog() + assert firstIndex > 0 log.firstIndex = firstIndex return log @@ -295,7 +297,9 @@ func nextIndex*(rf: RaftLog): int = func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) = # TODO: We should add support for configurations and snapshots - rf.logEntries.delete(index.. rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex) @@ -307,11 +311,13 @@ func appendAsLeader(rf: var RaftLog, entry: LogEntry) = rf.logEntries.add(entry) func appendAsFollower*(rf: var RaftLog, entry: LogEntry) = - let currentIdx = rf.lastIndex() + assert entry.index > 0 + let currentIdx = rf.lastIndex if entry.index <= currentIdx: # TODO: The indexing hold only if we keep all entries in memory # we should change it when we add support for snapshots - if rf.logEntries.len > 0 and entry.term != rf.getEntryByIndex(entry.index).term: + + if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term: rf.truncateUncomitted(entry.index) rf.logEntries.add(entry) @@ -333,10 +339,11 @@ func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, Ra # The follower doesn't have all etries return (false, 0) - if rf.logEntries[index].term == term: + let i = index - rf.firstIndex + if rf.logEntries[i].term == term: return (true, 0) else: - return (false, rf.logEntries[index].term) + return (false, rf.logEntries[i].term) func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] = # TODO: snapshot support @@ -345,11 +352,11 @@ func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] = return none(RaftNodeTerm) func debug*(sm: var RaftStateMachine, log: string) = - sm.output.debugLogs.add(log) + sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state.state & "]: " & log) proc resetElectionTimeout*(sm: var RaftStateMachine) = # TODO actually pick random time - sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(100)) + sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(200)) proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime): RaftStateMachine = var sm = RaftStateMachine() @@ -360,6 +367,7 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL sm.config = config sm.lastElectionTime = now sm.timeNow = now + sm.startTime = now sm.myId = id sm.electionTimeout = times.initDuration(milliseconds = 100) sm.heartbeatTime = times.initDuration(milliseconds = 50) @@ -398,7 +406,7 @@ func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage = return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest()) func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) = - if follower.nextIndex > sm.log.lastIndex(): + if follower.nextIndex > sm.log.lastIndex: return var previousTerm = sm.log.termForIndex(follower.nextIndex - 1) @@ -409,7 +417,15 @@ func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTr commitIndex: sm.commitIndex, entries: @[sm.log.getEntryByIndex(follower.nextIndex)]) follower.nextIndex += 1 - + sm.sendTo(follower.id, request) + else: + # TODO: we add support for snapshots + let request = RaftRpcAppendRequest( + previousTerm: 0, + previousLogIndex: 1, + commitIndex: sm.commitIndex, + entries: @[sm.log.getEntryByIndex(follower.nextIndex)]) + follower.nextIndex += 1 sm.sendTo(follower.id, request) func replicate*(sm: var RaftStateMachine) = @@ -448,13 +464,10 @@ func becomeLeader*(sm: var RaftStateMachine) = sm.output.stateChange = true sm.state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState()) + sm.addEntry(Empty()) sm.leader.tracker = initTracker(sm.config, sm.log.lastIndex, sm.timeNow) sm.pingLeader = false - #TODO: Update last election time - - #TODO: setup the tracket - - sm.addEntry(Empty()) + #TODO: Update last election time return func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = @@ -487,7 +500,7 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) = return func hearthbeat(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) = - + sm.debug "hearthbear" & $follower.nextIndex var previousTerm = sm.log.termForIndex(follower.nextIndex - 1) if previousTerm.isSome: let request = RaftRpcAppendRequest( @@ -503,22 +516,28 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) = # if sm.lastElectionTime - sm.timeNow > sm.electionTimeout: # sm.becomeFollower(RaftnodeId()) # return + + sm.lastElectionTime = now if not sm.state.isLeader: sm.debug "tick_leader can be called only on the leader" return + sm.debug "Tick leader" for followerIndex in 0.. sm.heartbeatTime: + sm.debug "heartbeat" sm.hearthbeat(follower) # TODO: implement step down logic func tick*(sm: var RaftStateMachine, now: times.DateTime) = - sm.debug "Time since last update: " & $(now - sm.timeNow) + sm.debug "Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms" sm.timeNow = now if sm.state.isLeader: sm.tickLeader(now); @@ -618,10 +637,10 @@ func requestVote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRp sm.votedFor = from_id let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote) - sm.sendTo(sm.myId, responce) + sm.sendTo(from_id, responce) else: let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote) - sm.sendTo(sm.myId, responce) + sm.sendTo(from_id, responce) func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) = @@ -645,6 +664,7 @@ func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: R # TODO: become foller func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) = + #sm.debug $msg if msg.currentTerm > sm.term: sm.debug "Current node is behind" var leaderId = RaftnodeId() @@ -663,6 +683,7 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime sm.debug "Ignore message with lower term" else: + # TODO: add also snapshot if msg.kind == RaftRpcMessageType.Append: if sm.state.isCandidate: @@ -670,7 +691,7 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime elif sm.state.isFollower: sm.follower.leader = msg.sender # TODO: fix time - #lastElectionTime = now + if sm.state.isCandidate: if msg.kind == RaftRpcMessageType.Vote: diff --git a/tests/test_consensus_state_machine.nim b/tests/test_consensus_state_machine.nim index 7accead..ba1deb6 100644 --- a/tests/test_consensus_state_machine.nim +++ b/tests/test_consensus_state_machine.nim @@ -17,7 +17,6 @@ import tables type TestCluster* = object nodes: Table[RaftnodeId, RaftStateMachine] - messages: Table[RaftnodeId, seq[RaftRpcMessage]] var test_ids_3 = @[ RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")), @@ -38,34 +37,31 @@ func createConfigFromIds(ids: seq[RaftnodeId]): RaftConfig = proc createCluster(ids: seq[RaftnodeId], now: times.DateTime) : TestCluster = var config = createConfigFromIds(ids) var cluster = TestCluster() - cluster.messages = initTable[RaftnodeId, seq[RaftRpcMessage]]() cluster.nodes = initTable[RaftnodeId, RaftStateMachine]() for i in 0..