Fix many bugs

This commit is contained in:
Marto 2024-02-05 19:27:28 +02:00
parent 5ab4fd1a77
commit f5a7813572
2 changed files with 38 additions and 44 deletions

View File

@ -163,7 +163,7 @@ type
progress: RaftFollowerProgress
current: seq[RaftNodeId]
RaftFollowerProgressTracker* = object
RaftFollowerProgressTracker* = ref object
id: RaftNodeId
nextIndex: RaftLogIndex
# Index of the highest log entry known to be replicated to this server.
@ -347,6 +347,7 @@ func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, Ra
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
assert rf.logEntries.len > index
if rf.logEntries.len > 0 and index >= rf.firstIndex:
return some(rf.logEntries[index].term)
return none(RaftNodeTerm)
@ -376,10 +377,10 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
for follower in sm.leader.tracker.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
for follower in sm.leader.tracker.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request))
@ -394,22 +395,25 @@ func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteR
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request))
func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) =
sm.debug "Sent to" & $id & $ request
if sm.state.isLeader:
var follower = sm.findFollowerProggressById(id)
if follower.isSome:
follower.get().lastMessageAt = sm.timeNow
else:
sm.debug "Follower not found"
sm.debug "Follower not found: " & $id
sm.debug $sm.leader
sm.sendToImpl(id, request)
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) =
if follower.nextIndex > sm.log.lastIndex:
return
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
sm.debug "replicate to " & $follower[]
if previousTerm.isSome:
let request = RaftRpcAppendRequest(
previousTerm: previousTerm.get(),
@ -427,6 +431,7 @@ func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTr
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
sm.debug "exit" & $follower[]
func replicate*(sm: var RaftStateMachine) =
if sm.state.isLeader:
@ -501,14 +506,7 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
func hearthbeat(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
sm.debug "hearthbear" & $follower.nextIndex
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
if previousTerm.isSome:
let request = RaftRpcAppendRequest(
previousTerm: previousTerm.get(),
previousLogIndex: follower.nextIndex - 1,
commitIndex: sm.commitIndex,
entries: @[])
sm.sendTo(follower.id, request)
sm.addEntry(Empty())
func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
sm.timeNow = now
@ -521,14 +519,12 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
if not sm.state.isLeader:
sm.debug "tick_leader can be called only on the leader"
return
sm.debug "Tick leader"
for followerIndex in 0..<sm.leader.tracker.progress.len:
var follower = sm.leader.tracker.progress[followerIndex]
if sm.myId != follower.id:
sm.debug "Tick leader" & $follower.matchIndex & $sm.log.lastIndex & $sm.log.lastIndex
if follower.matchIndex < sm.log.lastIndex or follower.commitIndex < sm.commitIndex:
sm.replicateTo(follower)
sm.debug "replicate to" & $follower
#sm.debug "replicate to" & $follower
#sm.debug $(now - follower.lastMessageAt)
if now - follower.lastMessageAt > sm.heartbeatTime:
@ -537,7 +533,7 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
# TODO: implement step down logic
func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.debug "Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms"
sm.debug "Term: " & $sm.term & " commit idx " & $sm.commitIndex & " Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms"
sm.timeNow = now
if sm.state.isLeader:
sm.tickLeader(now);
@ -569,10 +565,12 @@ func commit*(sm: var RaftStateMachine) =
for p in sm.leader.tracker.progress:
if p.matchIndex > new_index:
replicationCnt += 1
if replicationCnt > sm.leader.tracker.progress.len:
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
sm.output.committed.add(sm.log.getEntryByIndex(next_index))
sm.commitIndex += next_index;
next_index += 1
else:
break
@ -587,8 +585,9 @@ func appendEntryReplay*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: R
follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex)
case replay.result:
of RaftRpcCode.Accepted:
let lestIndex = replay.accepted.lastNewIndex
follower.get().accepted(lestIndex)
let lastIndex = replay.accepted.lastNewIndex
sm.debug "Accpeted" & $from_id & " " & $lastIndex
follower.get().accepted(lastIndex)
# TODO: add leader stepping down logic here
sm.commit()
if not sm.state.isLeader:
@ -610,7 +609,7 @@ func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
# TODO: signal the output for the update
func appendEntry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcAppendRequest) =
func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) =
if not sm.state.isFollower:
sm.debug "You can't append append request to the non follower"
return
@ -618,29 +617,27 @@ func appendEntry*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRp
if not match:
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
sm.sendTo(sm.myId, responce)
sm.sendTo(fromId, responce)
sm.debug "Reject to apply the entry"
# reject to apply
#sentTo()
for entry in request.entries:
sm.log.appendAsFollower(entry)
sm.advanceCommitIdx(request.commitIndex)
let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
sm.sendTo(sm.myId, responce)
sm.sendTo(fromId, responce)
func requestVote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteRequest, ) =
let canVote = sm.votedFor == from_id or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term)
func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) =
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) or (request.isPrevote and request.currentTerm > sm.term)
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
if not request.is_prevote:
# TODO: Update election time
sm.votedFor = from_id
sm.votedFor = fromId
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote)
sm.sendTo(from_id, responce)
sm.sendTo(fromId, responce)
else:
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote)
sm.sendTo(from_id, responce)
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote)
sm.sendTo(fromId, responce)
func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) =
@ -683,16 +680,13 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
sm.debug "Ignore message with lower term"
else:
# TODO: add also snapshot
if msg.kind == RaftRpcMessageType.Append:
if sm.state.isCandidate:
sm.becomeFollower(msg.sender)
elif sm.state.isFollower:
sm.follower.leader = msg.sender
# TODO: fix time
# TODO: fix time
if sm.state.isCandidate:
if msg.kind == RaftRpcMessageType.Vote:
sm.requestVote(msg.sender, msg.voteRequest)
@ -706,10 +700,10 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
sm.lastElectionTime = now
if msg.kind == RaftRpcMessageType.Append:
sm.appendEntry(msg.sender, msg.appendRequest)
if msg.kind == RaftRpcMessageType.Vote:
elif msg.kind == RaftRpcMessageType.Vote:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.debug "Follower ignore message"
sm.debug "Follower ignore message" & $msg
# TODO: imelement the rest of the state transitions
elif sm.state.isLeader:
if msg.kind == RaftRpcMessageType.Append:

View File

@ -201,7 +201,7 @@ proc consensusstatemachineMain*() =
check output.committed.len == 0
check output.messages.len == 0
check sm.state.isFollower
timeNow += 300.milliseconds
timeNow += 500.milliseconds
sm.tick(timeNow)
output = sm.poll()
check output.logEntries.len == 0
@ -216,7 +216,7 @@ proc consensusstatemachineMain*() =
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 300.milliseconds
timeNow += 1000.milliseconds
sm.tick(timeNow)
var output = sm.poll()
check output.logEntries.len == 0
@ -238,7 +238,7 @@ proc consensusstatemachineMain*() =
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 301.milliseconds
timeNow += 601.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
@ -330,7 +330,7 @@ proc consensusstatemachineMain*() =
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 301.milliseconds
timeNow += 501.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
@ -363,7 +363,7 @@ proc consensusstatemachineMain*() =
var cluster = createCluster(test_ids_3, times.now())
var timeNow = times.now()
var leader: RaftnodeId
for i in 0..<500:
for i in 0..<105:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()