Refactoring

This commit is contained in:
Marto 2024-02-07 15:54:25 +02:00
parent 2ecc344aa1
commit 47e1e36294
3 changed files with 43 additions and 59 deletions

View File

@ -20,9 +20,9 @@ randomize()
type
RaftRpcMessageType* = enum
Vote = 0,
VoteRequest = 0,
VoteReplay = 1,
Append = 2,
AppendRequest = 2,
AppendReplay = 3
RaftRpcCode* = enum
@ -54,20 +54,17 @@ type
currentTerm*: RaftNodeTerm
lastLogIndex*: RaftLogIndex
lastLogTerm*: RaftNodeTerm
isPrevote*: bool
force*: bool
RaftRpcVoteReplay* = object
currentTerm*: RaftNodeTerm
voteGranted*: bool
isPrevote*: bool
LeaderState* = object
tracker: RaftTracker
CandidateState* = object
votes: RaftVotes
isPrevote: bool
FollowerState* = object
leader: RaftNodeId
@ -77,9 +74,9 @@ type
sender*: RaftNodeId
receiver*: RaftNodeId
case kind*: RaftRpcMessageType
of Vote: voteRequest*: RaftRpcVoteRequest
of VoteRequest: voteRequest*: RaftRpcVoteRequest
of VoteReplay: voteReplay*: RaftRpcVoteReplay
of Append: appendRequest*: RaftRpcAppendRequest
of AppendRequest: appendRequest*: RaftRpcAppendRequest
of AppendReplay: appendReplay*: RaftRpcAppendReplay
RaftStateMachineOutput* = object
@ -176,13 +173,13 @@ func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option
return sm.leader.tracker.find(id)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request))
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Vote, voteRequest: request))
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request))
@ -199,7 +196,7 @@ func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType)
sm.sendToImpl(id, request)
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: VoteRequest, voteRequest: RaftRpcVoteRequest())
func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) =
if follower.nextIndex > sm.log.lastIndex:
@ -268,7 +265,7 @@ func becomeLeader*(sm: var RaftStateMachine) =
#TODO: Update last election time
return
func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
func becomeCandidate*(sm: var RaftStateMachine) =
#TODO: implement
if not sm.state.isCandidate:
sm.output.stateChange = true
@ -285,14 +282,10 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
sm.votedFor = nodeId
continue
let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, isPrevote: isPrevote, force: false)
let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, force: false)
sm.sendTo(nodeId, request)
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
if sm.candidate.votes.tallyVote == RaftElectionResult.Won:
if isPrevote:
sm.becomeCandidate(false)
else:
sm.becomeLeader()
return
@ -310,7 +303,7 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
sm.lastElectionTime = now
if not sm.state.isLeader:
sm.debug "tick_leader can be called only on the leader"
sm.debug "tickLeader can be called only on the leader"
return
for followerIndex in 0..<sm.leader.tracker.progress.len:
var follower = sm.leader.tracker.progress[followerIndex]
@ -332,7 +325,7 @@ func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.tickLeader(now);
elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime:
sm.debug "Become candidate"
sm.becomeCandidate(false)
sm.becomeCandidate()
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
@ -351,17 +344,17 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
func commit*(sm: var RaftStateMachine) =
if not sm.state.isLeader:
return
var new_index = sm.commitIndex
var next_index = sm.commitIndex + 1
while next_index < sm.log.lastIndex:
var newIndex = sm.commitIndex
var nextIndex = sm.commitIndex + 1
while nextIndex < sm.log.lastIndex:
var replicationCnt = 0
for p in sm.leader.tracker.progress:
if p.matchIndex > new_index:
if p.matchIndex > newIndex:
replicationCnt += 1
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
sm.output.committed.add(sm.log.getEntryByIndex(next_index))
sm.commitIndex += next_index;
next_index += 1
sm.output.committed.add(sm.log.getEntryByIndex(nextIndex))
sm.commitIndex += nextIndex;
nextIndex += 1
else:
break
@ -386,7 +379,7 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra
of RaftRpcCode.Rejected:
if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0:
sm.replicateTo(follower.get())
follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
follower.get().nextIndex = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
# if commit apply configuration that removes current follower
# we should take it again
var follower2 = sm.findFollowerProggressById(fromId)
@ -394,9 +387,9 @@ func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: Ra
sm.replicateTo(follower2.get())
func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
let new_idx = min(leaderIdx, sm.log.lastIndex)
if new_idx > sm.commitIndex:
sm.commitIndex = new_idx
let newIdx = min(leaderIdx, sm.log.lastIndex)
if newIdx > sm.commitIndex:
sm.commitIndex = newIdx
# TODO: signal the output for the update
@ -418,52 +411,44 @@ func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpc
sm.sendTo(fromId, responce)
func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) =
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term)
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId())
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
if not request.is_prevote:
# TODO: Update election time
sm.votedFor = fromId
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote)
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true)
sm.sendTo(fromId, responce)
else:
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote)
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false)
sm.sendTo(fromId, responce)
func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) =
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) =
if not sm.state.isCandidate:
sm.debug "Non candidate can't handle votes"
return
discard sm.candidate.votes.registerVote(from_id, request.voteGranted)
discard sm.candidate.votes.registerVote(fromId, request.voteGranted)
case sm.candidate.votes.tallyVote:
of RaftElectionResult.Unknown:
return
of RaftElectionResult.Won:
sm.debug "Win election"
if (sm.candidate.isPrevote):
sm.becomeCandidate(false)
else:
sm.becomeLeader()
sm.becomeLeader()
of RaftElectionResult.Lost:
sm.debug "Lost election"
sm.becomeFollower(RaftNodeId())
# TODO: become foller
func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) =
#sm.debug $msg
if msg.currentTerm > sm.term:
sm.debug "Current node is behind"
var leaderId = RaftnodeId()
if msg.kind == RaftRpcMessageType.Append:
if msg.kind == RaftRpcMessageType.AppendRequest:
leaderId = msg.sender
sm.becomeFollower(leaderId)
# TODO: implement pre vote
sm.term = msg.currentTerm
sm.votedFor = RaftnodeId()
elif msg.currentTerm < sm.term:
if msg.kind == RaftRpcMessageType.Append:
if msg.kind == RaftRpcMessageType.AppendRequest:
# Instruct leader to step down
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
@ -472,14 +457,14 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
sm.debug "Ignore message with lower term"
else:
# TODO: add also snapshot
if msg.kind == RaftRpcMessageType.Append:
if msg.kind == RaftRpcMessageType.AppendRequest:
if sm.state.isCandidate:
sm.becomeFollower(msg.sender)
elif sm.state.isFollower:
sm.follower.leader = msg.sender
# TODO: fix time
if sm.state.isCandidate:
if msg.kind == RaftRpcMessageType.Vote:
if msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
elif msg.kind == RaftRpcMessageType.VoteReplay:
sm.debug "Apply vote"
@ -489,19 +474,19 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
elif sm.state.isFollower:
if msg.sender == sm.follower.leader:
sm.lastElectionTime = now
if msg.kind == RaftRpcMessageType.Append:
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.appendEntry(msg.sender, msg.appendRequest)
elif msg.kind == RaftRpcMessageType.Vote:
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.debug "Follower ignore message" & $msg
# TODO: imelement the rest of the state transitions
elif sm.state.isLeader:
if msg.kind == RaftRpcMessageType.Append:
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.debug "Ignore message leader append his entries directly"
elif msg.kind == RaftRpcMessageType.AppendReplay:
sm.appendEntryReplay(msg.sender, msg.appendReplay)
elif msg.kind == RaftRpcMessageType.Vote:
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.debug "Leader ignore message"

View File

@ -67,7 +67,6 @@ func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
if entry.index <= currentIdx:
# TODO: The indexing hold only if we keep all entries in memory
# we should change it when we add support for snapshots
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)

View File

@ -238,7 +238,7 @@ proc consensusstatemachineMain*() =
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
@ -250,7 +250,7 @@ proc consensusstatemachineMain*() =
# Older messages should be ignored
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
sm.advance(msg, timeNow)
output = sm.poll()
@ -269,7 +269,7 @@ proc consensusstatemachineMain*() =
output = sm.poll()
let entry = LogEntry(term: (output.term + 1), index: 101, kind: RaftLogEntryType.rletEmpty, empty: true)
let appendRequest = RaftRpcAppendRequest(previousTerm: (output.term + 1), previousLogIndex: 100, commitIndex: 99, entries: @[entry])
let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.Append, appendRequest: appendRequest)
let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.AppendRequest, appendRequest: appendRequest)
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
@ -292,7 +292,7 @@ proc consensusstatemachineMain*() =
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
@ -302,7 +302,7 @@ proc consensusstatemachineMain*() =
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
@ -329,7 +329,7 @@ proc consensusstatemachineMain*() =
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
@ -339,7 +339,7 @@ proc consensusstatemachineMain*() =
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true, isPrevote: false)
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)