Add voting and commit logic
This commit is contained in:
parent
a763d940da
commit
cd00bd097f
|
@ -17,7 +17,21 @@ type
|
||||||
VoteReplay = 1,
|
VoteReplay = 1,
|
||||||
Append = 2,
|
Append = 2,
|
||||||
AppendReplay = 3
|
AppendReplay = 3
|
||||||
|
|
||||||
|
RaftRpcCode* = enum
|
||||||
|
Rejected = 0,
|
||||||
|
Accepted = 1
|
||||||
|
|
||||||
|
RaftElectionResult* = enum
|
||||||
|
Unknown = 0,
|
||||||
|
Won = 1,
|
||||||
|
Lost = 2
|
||||||
|
|
||||||
|
RaftLogEntryType* = enum
|
||||||
|
rletCommand = 0,
|
||||||
|
rletConfig = 1,
|
||||||
|
rletEmpty = 2
|
||||||
|
|
||||||
|
|
||||||
RaftRpcAppendRequest* = object
|
RaftRpcAppendRequest* = object
|
||||||
previousTerm: RaftNodeTerm
|
previousTerm: RaftNodeTerm
|
||||||
|
@ -25,17 +39,48 @@ type
|
||||||
commitIndex: RaftLogIndex
|
commitIndex: RaftLogIndex
|
||||||
entry: LogEntry
|
entry: LogEntry
|
||||||
|
|
||||||
|
RaftRpcAppendReplayRejected* = object
|
||||||
|
nonMatchingIndex: RaftLogIndex
|
||||||
|
lastIdx: RaftLogIndex
|
||||||
|
|
||||||
|
RaftRpcAppendReplayAccepted* = object
|
||||||
|
lastNewIndex: RaftLogIndex
|
||||||
|
|
||||||
RaftRpcAppendReplay* = object
|
RaftRpcAppendReplay* = object
|
||||||
|
commitIndex: RaftLogIndex
|
||||||
|
term: RaftNodeTerm
|
||||||
|
case result: RaftRpcCode:
|
||||||
|
of Accepted: accepted: RaftRpcAppendReplayAccepted
|
||||||
|
of Rejected: rejected: RaftRpcAppendReplayRejected
|
||||||
|
|
||||||
|
|
||||||
RaftRpcVoteRequest* = object
|
RaftRpcVoteRequest* = object
|
||||||
|
currentTerm: RaftNodeTerm
|
||||||
|
lastLogIndex: RaftLogIndex
|
||||||
|
lastLogTerm: RaftNodeTerm
|
||||||
|
isPrevote: bool
|
||||||
|
force: bool
|
||||||
|
|
||||||
RaftRpcVoteReplay* = object
|
RaftRpcVoteReplay* = object
|
||||||
|
currentTerm: RaftNodeTerm
|
||||||
|
voteGranted: bool
|
||||||
|
isPrevote: bool
|
||||||
|
|
||||||
LeaderState* = object
|
LeaderState* = object
|
||||||
followersProgress: seq[RaftFollowerProgressTracker]
|
followersProgress: seq[RaftFollowerProgressTracker]
|
||||||
|
|
||||||
|
RaftElectionTracker* = object
|
||||||
|
all: seq[RaftNodeId]
|
||||||
|
responded: seq[RaftNodeId]
|
||||||
|
granted: int
|
||||||
|
|
||||||
|
RaftVotes* = object
|
||||||
|
voters: seq[RaftNodeId]
|
||||||
|
current: RaftElectionTracker
|
||||||
|
|
||||||
CandidateState* = object
|
CandidateState* = object
|
||||||
votes: seq[RaftNodeId]
|
votes: RaftVotes
|
||||||
|
isPrevote: bool
|
||||||
|
|
||||||
FollowerState* = object
|
FollowerState* = object
|
||||||
leader: RaftNodeId
|
leader: RaftNodeId
|
||||||
|
@ -52,11 +97,16 @@ type
|
||||||
|
|
||||||
Command* = object
|
Command* = object
|
||||||
data: seq[byte]
|
data: seq[byte]
|
||||||
|
Config* = object
|
||||||
|
|
||||||
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
|
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
|
||||||
term: RaftNodeTerm
|
term: RaftNodeTerm
|
||||||
index: RaftLogIndex
|
index: RaftLogIndex
|
||||||
# TODO: Add configuration too
|
# TODO: Add configuration too
|
||||||
data: Command
|
case kind: RaftLogEntryType:
|
||||||
|
of rletCommand: command: Command
|
||||||
|
of rletConfig: config: Config
|
||||||
|
of rletEmpty: empty: bool
|
||||||
|
|
||||||
RaftLog* = object
|
RaftLog* = object
|
||||||
logEntries: seq[LogEntry]
|
logEntries: seq[LogEntry]
|
||||||
|
@ -67,15 +117,20 @@ type
|
||||||
committed: seq[LogEntry]
|
committed: seq[LogEntry]
|
||||||
messages: seq[RaftRpcMessage]
|
messages: seq[RaftRpcMessage]
|
||||||
debugLogs: seq[string]
|
debugLogs: seq[string]
|
||||||
|
stateChange: bool
|
||||||
|
|
||||||
RaftStateMachine* = object
|
RaftStateMachine* = object
|
||||||
myId*: RaftNodeId
|
myId*: RaftNodeId
|
||||||
term: RaftNodeTerm
|
term: RaftNodeTerm
|
||||||
commitIndex: RaftLogIndex
|
commitIndex: RaftLogIndex
|
||||||
|
toCommit: RaftLogIndex
|
||||||
log: RaftLog
|
log: RaftLog
|
||||||
output: RaftStateMachineOutput
|
output: RaftStateMachineOutput
|
||||||
electionTimeout: times.Duration
|
electionTimeout: times.Duration
|
||||||
lastUpdate: Time
|
lastUpdate: Time
|
||||||
|
votedFor: RaftNodeId
|
||||||
|
currentLeader: RaftNodeId
|
||||||
|
pingLeader: bool
|
||||||
case state: RaftNodeState
|
case state: RaftNodeState
|
||||||
of rnsFollower: follower : FollowerState
|
of rnsFollower: follower : FollowerState
|
||||||
of rnsCandidate: candidate: CandidateState
|
of rnsCandidate: candidate: CandidateState
|
||||||
|
@ -86,10 +141,59 @@ type
|
||||||
RaftFollowerProgressTracker* = object
|
RaftFollowerProgressTracker* = object
|
||||||
id: RaftNodeId
|
id: RaftNodeId
|
||||||
nextIndex: RaftLogIndex
|
nextIndex: RaftLogIndex
|
||||||
|
# Index of the highest log entry known to be replicated to this server.
|
||||||
matchIndex: RaftLogIndex
|
matchIndex: RaftLogIndex
|
||||||
commitIndex: RaftLogIndex
|
commitIndex: RaftLogIndex
|
||||||
replayedIndex: RaftLogIndex
|
replayedIndex: RaftLogIndex
|
||||||
|
|
||||||
|
|
||||||
|
func contains(a: seq[RaftNodeId], id: RaftNodeId): bool =
|
||||||
|
var found = false
|
||||||
|
for n in a:
|
||||||
|
if n == id:
|
||||||
|
found = true
|
||||||
|
break
|
||||||
|
return found
|
||||||
|
|
||||||
|
func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker =
|
||||||
|
var r = RaftElectionTracker()
|
||||||
|
r.all = nodes
|
||||||
|
r.granted = 0
|
||||||
|
return r
|
||||||
|
|
||||||
|
func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool =
|
||||||
|
if not ret.all.contains nodeId:
|
||||||
|
return false
|
||||||
|
|
||||||
|
if not ret.responded.contains nodeId:
|
||||||
|
ret.responded.add(nodeId)
|
||||||
|
ret.granted += 1
|
||||||
|
|
||||||
|
return true
|
||||||
|
|
||||||
|
func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult =
|
||||||
|
let quorym = int(len(ret.all) / 2) + 1
|
||||||
|
if ret.granted > quorym:
|
||||||
|
return RaftElectionResult.Won
|
||||||
|
let unkown = len(ret.all) - len(ret.responded)
|
||||||
|
if ret.granted + unkown > quorym:
|
||||||
|
return RaftElectionResult.Unknown
|
||||||
|
else:
|
||||||
|
return RaftElectionResult.Lost
|
||||||
|
|
||||||
|
func initVotes*(nodes: seq[RaftNodeId]): RaftVotes =
|
||||||
|
var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes))
|
||||||
|
return r
|
||||||
|
|
||||||
|
func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool =
|
||||||
|
# TODO: Add support for configuration
|
||||||
|
return rv.current.registerVote(nodeId, granted)
|
||||||
|
|
||||||
|
func tallyVote*(rv: var RaftVotes): RaftElectionResult =
|
||||||
|
# TODO: Add support for configuration
|
||||||
|
return rv.current.tallyVote()
|
||||||
|
|
||||||
|
|
||||||
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex): RaftFollowerProgressTracker =
|
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex): RaftFollowerProgressTracker =
|
||||||
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0)
|
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0)
|
||||||
|
|
||||||
|
@ -117,10 +221,16 @@ func lastIndex*(rf: RaftLog): RaftNodeTerm =
|
||||||
let idx = len(rf.logEntries)
|
let idx = len(rf.logEntries)
|
||||||
return rf.logEntries[idx - 1].index
|
return rf.logEntries[idx - 1].index
|
||||||
|
|
||||||
|
func nextIndex*(rf: RaftLog): int =
|
||||||
|
return rf.lastIndex + 1
|
||||||
|
|
||||||
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
|
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
|
||||||
# TODO: We should add support for configurations and snapshots
|
# TODO: We should add support for configurations and snapshots
|
||||||
rf.logEntries.delete(index..<len(rf.logEntries))
|
rf.logEntries.delete(index..<len(rf.logEntries))
|
||||||
|
|
||||||
|
func isUpToDate(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
|
||||||
|
return term > rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex)
|
||||||
|
|
||||||
|
|
||||||
func getEntryByIndex(rf: RaftLog, index: RaftLogIndex): LogEntry =
|
func getEntryByIndex(rf: RaftLog, index: RaftLogIndex): LogEntry =
|
||||||
return rf.logEntries[index]
|
return rf.logEntries[index]
|
||||||
|
@ -138,25 +248,27 @@ func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
|
||||||
rf.logEntries.add(entry)
|
rf.logEntries.add(entry)
|
||||||
|
|
||||||
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
|
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
|
||||||
rf.appendAsLeader(LogEntry(term: term, index: index, data: data))
|
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data))
|
||||||
|
|
||||||
|
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) =
|
||||||
|
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true))
|
||||||
|
|
||||||
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
|
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
|
||||||
rf.appendAsFollower(LogEntry(term: term, index: index, data: data))
|
rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data))
|
||||||
|
|
||||||
|
|
||||||
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
|
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) =
|
||||||
if len(rf.logEntries) == 0:
|
if len(rf.logEntries) == 0:
|
||||||
return true
|
return (true, 0)
|
||||||
# TODO: We should add support for snapshots
|
# TODO: We should add support for snapshots
|
||||||
if index > len(rf.logEntries):
|
if index > len(rf.logEntries):
|
||||||
# The follower doesn't have all etries
|
# The follower doesn't have all etries
|
||||||
return false
|
return (false, 0)
|
||||||
|
|
||||||
if rf.logEntries[index].term == term:
|
if rf.logEntries[index].term == term:
|
||||||
return true
|
return (true, 0)
|
||||||
else:
|
else:
|
||||||
return false
|
return (false, rf.logEntries[index].term)
|
||||||
|
|
||||||
func termForIndex*(rf: RaftLog, index: RaftLogIndex): RaftNodeTerm =
|
func termForIndex*(rf: RaftLog, index: RaftLogIndex): RaftNodeTerm =
|
||||||
# TODO: We should add support for snapshots
|
# TODO: We should add support for snapshots
|
||||||
|
@ -173,8 +285,16 @@ func initRaftStateMachine*(config: RaftStateMachineConfig): RaftStateMachine =
|
||||||
return st
|
return st
|
||||||
|
|
||||||
func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
|
func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
|
||||||
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: Append, appendRequest: request))
|
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request))
|
||||||
|
|
||||||
|
func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) =
|
||||||
|
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request))
|
||||||
|
|
||||||
|
func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
|
||||||
|
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Vote, voteRequest: request))
|
||||||
|
|
||||||
|
func sendTo*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) =
|
||||||
|
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request))
|
||||||
|
|
||||||
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
|
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
|
||||||
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
|
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
|
||||||
|
@ -220,11 +340,20 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage,currentTime: Time) =
|
||||||
else:
|
else:
|
||||||
sm.debug "Unhandle msg type"
|
sm.debug "Unhandle msg type"
|
||||||
|
|
||||||
|
|
||||||
func addEntry*(sm: var RaftStateMachine, entry: LogEntry) =
|
func addEntry*(sm: var RaftStateMachine, entry: LogEntry) =
|
||||||
if sm.state != RaftNodeState.rnsLeader:
|
if sm.state != RaftNodeState.rnsLeader:
|
||||||
sm.debug "Error: only the leader can handle new entries"
|
sm.debug "Error: only the leader can handle new entries"
|
||||||
sm.log.appendAsLeader(entry)
|
sm.log.appendAsLeader(entry)
|
||||||
|
|
||||||
|
func addEntry*(sm: var RaftStateMachine, command: Command) =
|
||||||
|
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletCommand, command: command))
|
||||||
|
|
||||||
|
func addEntry*(sm: var RaftStateMachine, config: Config) =
|
||||||
|
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletConfig, config: config))
|
||||||
|
|
||||||
|
func addEmptyEntry*(sm: var RaftStateMachine) =
|
||||||
|
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletEmpty, empty: true))
|
||||||
|
|
||||||
func tick* (sm: var RaftStateMachine) =
|
func tick* (sm: var RaftStateMachine) =
|
||||||
sm.debug "TODO:implement tick"
|
sm.debug "TODO:implement tick"
|
||||||
|
@ -240,6 +369,139 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
|
||||||
|
|
||||||
|
|
||||||
func commit*(sm: var RaftStateMachine) =
|
func commit*(sm: var RaftStateMachine) =
|
||||||
if sm.state != RaftNodeState.rnsLeader
|
if sm.state != RaftNodeState.rnsLeader:
|
||||||
return
|
return
|
||||||
|
var new_index = sm.commitIndex
|
||||||
|
var next_index = sm.commitIndex + 1
|
||||||
|
while next_index < sm.log.lastIndex:
|
||||||
|
var replicationCnt = 0
|
||||||
|
for p in sm.leader.followersProgress:
|
||||||
|
if p.matchIndex > new_index:
|
||||||
|
replicationCnt += 1
|
||||||
|
if replicationCnt > len(sm.leader.followersProgress):
|
||||||
|
sm.output.committed.add(sm.log.getEntryByIndex(next_index))
|
||||||
|
sm.commitIndex += next_index;
|
||||||
|
next_index += 1
|
||||||
|
|
||||||
|
func find_follower_proggress_by_id(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
|
||||||
|
for follower in sm.leader.followersProgress:
|
||||||
|
if follower.id == id:
|
||||||
|
return some(follower)
|
||||||
|
return none(RaftFollowerProgressTracker)
|
||||||
|
|
||||||
|
|
||||||
|
func append_entry_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: RaftRpcAppendReplay) =
|
||||||
|
if sm.state != RaftNodeState.rnsLeader:
|
||||||
|
sm.debug "You can't append append replay to the follower"
|
||||||
|
return
|
||||||
|
var follower = sm.find_follower_proggress_by_id(from_id)
|
||||||
|
if not follower.isSome:
|
||||||
|
sm.debug "Can't find the follower"
|
||||||
|
return
|
||||||
|
follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex)
|
||||||
|
case replay.result:
|
||||||
|
of RaftRpcCode.Accepted:
|
||||||
|
let lestIndex = replay.accepted.lastNewIndex
|
||||||
|
follower.get().accepted(lestIndex)
|
||||||
|
# TODO: add leader stepping down logic here
|
||||||
|
sm.commit()
|
||||||
|
if sm.state != RaftNodeState.rnsLeader:
|
||||||
|
return
|
||||||
|
of RaftRpcCode.Rejected:
|
||||||
|
if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0:
|
||||||
|
sm.replicateTo(follower.get())
|
||||||
|
follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
|
||||||
|
# if commit apply configuration that removes current follower
|
||||||
|
# we should take it again
|
||||||
|
var follower2 = sm.find_follower_proggress_by_id(from_id)
|
||||||
|
if follower2.isSome:
|
||||||
|
sm.replicate_to(follower2.get())
|
||||||
|
|
||||||
|
func advance_commit_idx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
|
||||||
|
let new_idx = min(leaderIdx, sm.log.lastIndex)
|
||||||
|
if new_idx > sm.commitIndex:
|
||||||
|
sm.commitIndex = new_idx
|
||||||
|
# TODO: signal the output for the update
|
||||||
|
|
||||||
|
|
||||||
|
func append_entry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcAppendRequest) =
|
||||||
|
if sm.state != RaftNodeState.rnsFollower:
|
||||||
|
sm.debug "You can't append append request to the non follower"
|
||||||
|
return
|
||||||
|
let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm)
|
||||||
|
if not match:
|
||||||
|
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
|
||||||
|
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
|
||||||
|
sm.send_to(sm.myId, responce)
|
||||||
|
sm.debug "Reject to apply the entry"
|
||||||
|
# reject to apply
|
||||||
|
#send_to()
|
||||||
|
sm.log.appendAsFollower(request.entry)
|
||||||
|
sm.advance_commit_idx(request.commitIndex)
|
||||||
|
let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex)
|
||||||
|
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
|
||||||
|
sm.send_to(sm.myId, responce)
|
||||||
|
|
||||||
|
|
||||||
|
func request_vote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteRequest, ) =
|
||||||
|
let canVote = sm.votedFor == from_id or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term)
|
||||||
|
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
|
||||||
|
if not request.is_prevote:
|
||||||
|
# TODO: Update election time
|
||||||
|
sm.votedFor = from_id
|
||||||
|
|
||||||
|
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote)
|
||||||
|
sm.send_to(sm.myId, responce)
|
||||||
|
else:
|
||||||
|
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote)
|
||||||
|
sm.send_to(sm.myId, responce)
|
||||||
|
|
||||||
|
func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) =
|
||||||
|
if sm.myId == leaderId:
|
||||||
|
sm.debug "Can't be follower of itself"
|
||||||
|
sm.output.stateChange = sm.state != RaftNodeState.rnsFollower
|
||||||
|
sm.state = RaftNodeState.rnsFollower
|
||||||
|
sm.follower = FollowerState(leader: leaderId)
|
||||||
|
if leaderId != RaftnodeId():
|
||||||
|
sm.pingLeader = false
|
||||||
|
# TODO: Update last election time
|
||||||
|
|
||||||
|
func becomeLeader*(sm: var RaftStateMachine) =
|
||||||
|
if sm.state == RaftNodeState.rnsLeader:
|
||||||
|
sm.debug "The leader can't become leader second time"
|
||||||
|
return
|
||||||
|
|
||||||
|
sm.output.stateChange = true
|
||||||
|
sm.state = RaftnodeState.rnsLeader
|
||||||
|
sm.leader = LeaderState()
|
||||||
|
|
||||||
|
sm.pingLeader = false
|
||||||
|
#TODO: Update last election time
|
||||||
|
|
||||||
|
#TODO: setup the tracket
|
||||||
|
|
||||||
|
sm.addEmptyEntry()
|
||||||
|
return
|
||||||
|
func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
|
||||||
|
#TODO: implement
|
||||||
|
return
|
||||||
|
|
||||||
|
func request_vote_reply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) =
|
||||||
|
if sm.state != RaftNodeState.rnsCandidate:
|
||||||
|
sm.debug "Non candidate can't handle votes"
|
||||||
|
return
|
||||||
|
discard sm.candidate.votes.registerVote(from_id, request.voteGranted)
|
||||||
|
|
||||||
|
case sm.candidate.votes.tallyVote:
|
||||||
|
of RaftElectionResult.Unknown:
|
||||||
|
return
|
||||||
|
of RaftElectionResult.Won:
|
||||||
|
sm.debug "Win election"
|
||||||
|
if (sm.candidate.isPrevote):
|
||||||
|
sm.becomeCandidate(false)
|
||||||
|
else:
|
||||||
|
sm.becomeLeader()
|
||||||
|
of RaftElectionResult.Lost:
|
||||||
|
sm.debug "Lost election"
|
||||||
|
sm.becomeFollower(RaftNodeId())
|
||||||
|
# TODO: become foller
|
||||||
|
|
Loading…
Reference in New Issue