Fix many bugs and improve the election process

This commit is contained in:
Marto 2024-02-01 19:44:07 +02:00
parent 3cde7daebd
commit 5ab4fd1a77
2 changed files with 168 additions and 55 deletions

View File

@ -152,6 +152,7 @@ type
randomizedElectionTime: times.Duration
heartbeatTime: times.Duration
timeNow: times.DateTime
startTime: times.DateTime
electionTimeout: times.Duration
state*: RaftStateMachineState
@ -274,6 +275,7 @@ func tallyVote*(rv: var RaftVotes): RaftElectionResult =
func initRaftLog*(firstIndex: RaftLogIndex): RaftLog =
  ## Builds a fresh `RaftLog` whose `firstIndex` field is set to `firstIndex`.
  ##
  ## `firstIndex` must be greater than zero; this is checked with `assert`,
  ## so the check disappears in builds that strip assertions.
  result = RaftLog()
  assert firstIndex > 0
  result.firstIndex = firstIndex
@ -295,7 +297,9 @@ func nextIndex*(rf: RaftLog): int =
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
# TODO: We should add support for configurations and snapshots
rf.logEntries.delete(index..<len(rf.logEntries))
if rf.logEntries.len == 0:
return
rf.logEntries.delete((index - rf.firstIndex)..<len(rf.logEntries))
func isUpToDate(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
  ## Compares the `(term, index)` pair against the tail of `rf`.
  ##
  ## Returns `true` when `term` is greater than `rf.lastTerm`, or when the
  ## terms are equal and `index` is at least `rf.lastIndex`; `false` otherwise.
  if term == rf.lastTerm:
    # Same term: the position in the log decides.
    index >= rf.lastIndex
  else:
    # Different terms: only a strictly newer term counts as up to date.
    term > rf.lastTerm
@ -307,11 +311,13 @@ func appendAsLeader(rf: var RaftLog, entry: LogEntry) =
rf.logEntries.add(entry)
func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
let currentIdx = rf.lastIndex()
assert entry.index > 0
let currentIdx = rf.lastIndex
if entry.index <= currentIdx:
# TODO: The indexing holds only if we keep all entries in memory;
# we should change it when we add support for snapshots
if rf.logEntries.len > 0 and entry.term != rf.getEntryByIndex(entry.index).term:
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)
@ -333,10 +339,11 @@ func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, Ra
# The follower doesn't have all entries
return (false, 0)
if rf.logEntries[index].term == term:
let i = index - rf.firstIndex
if rf.logEntries[i].term == term:
return (true, 0)
else:
return (false, rf.logEntries[index].term)
return (false, rf.logEntries[i].term)
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
@ -345,11 +352,11 @@ func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
return none(RaftNodeTerm)
func debug*(sm: var RaftStateMachine, log: string) =
sm.output.debugLogs.add(log)
sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state.state & "]: " & log)
proc resetElectionTimeout*(sm: var RaftStateMachine) =
# TODO actually pick random time
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(100))
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(200))
proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime): RaftStateMachine =
var sm = RaftStateMachine()
@ -360,6 +367,7 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
sm.config = config
sm.lastElectionTime = now
sm.timeNow = now
sm.startTime = now
sm.myId = id
sm.electionTimeout = times.initDuration(milliseconds = 100)
sm.heartbeatTime = times.initDuration(milliseconds = 50)
@ -398,7 +406,7 @@ func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
if follower.nextIndex > sm.log.lastIndex():
if follower.nextIndex > sm.log.lastIndex:
return
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
@ -409,7 +417,15 @@ func replicateTo*(sm: var RaftStateMachine, follower: var RaftFollowerProgressTr
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
else:
# TODO: add support for snapshots
let request = RaftRpcAppendRequest(
previousTerm: 0,
previousLogIndex: 1,
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
func replicate*(sm: var RaftStateMachine) =
@ -448,13 +464,10 @@ func becomeLeader*(sm: var RaftStateMachine) =
sm.output.stateChange = true
sm.state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState())
sm.addEntry(Empty())
sm.leader.tracker = initTracker(sm.config, sm.log.lastIndex, sm.timeNow)
sm.pingLeader = false
#TODO: Update last election time
#TODO: setup the tracket
sm.addEntry(Empty())
#TODO: Update last election time
return
func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
@ -487,7 +500,7 @@ func becomeCandidate*(sm: var RaftStateMachine, isPrevote: bool) =
return
func hearthbeat(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
sm.debug "hearthbear" & $follower.nextIndex
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
if previousTerm.isSome:
let request = RaftRpcAppendRequest(
@ -503,22 +516,28 @@ func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
# if sm.lastElectionTime - sm.timeNow > sm.electionTimeout:
# sm.becomeFollower(RaftnodeId())
# return
sm.lastElectionTime = now
if not sm.state.isLeader:
sm.debug "tick_leader can be called only on the leader"
return
sm.debug "Tick leader"
for followerIndex in 0..<sm.leader.tracker.progress.len:
var follower = sm.leader.tracker.progress[followerIndex]
if sm.myId != follower.id:
sm.debug "Tick leader" & $follower.matchIndex & $sm.log.lastIndex & $sm.log.lastIndex
if follower.matchIndex < sm.log.lastIndex or follower.commitIndex < sm.commitIndex:
sm.replicateTo(follower)
sm.debug "replicate to" & $follower
#sm.debug $(now - follower.lastMessageAt)
if now - follower.lastMessageAt > sm.heartbeatTime:
sm.debug "heartbeat"
sm.hearthbeat(follower)
# TODO: implement step down logic
func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.debug "Time since last update: " & $(now - sm.timeNow)
sm.debug "Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms"
sm.timeNow = now
if sm.state.isLeader:
sm.tickLeader(now);
@ -618,10 +637,10 @@ func requestVote*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRp
sm.votedFor = from_id
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true, isPrevote: request.is_prevote)
sm.sendTo(sm.myId, responce)
sm.sendTo(from_id, responce)
else:
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false, isPrevote: request.is_prevote)
sm.sendTo(sm.myId, responce)
sm.sendTo(from_id, responce)
func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: RaftRpcVoteReplay) =
@ -645,6 +664,7 @@ func requestVoteReply*(sm: var RaftStateMachine, from_id: RaftNodeId, request: R
# TODO: become follower
func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) =
#sm.debug $msg
if msg.currentTerm > sm.term:
sm.debug "Current node is behind"
var leaderId = RaftnodeId()
@ -663,6 +683,7 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
sm.debug "Ignore message with lower term"
else:
# TODO: add also snapshot
if msg.kind == RaftRpcMessageType.Append:
if sm.state.isCandidate:
@ -670,7 +691,7 @@ func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime
elif sm.state.isFollower:
sm.follower.leader = msg.sender
# TODO: fix time
#lastElectionTime = now
if sm.state.isCandidate:
if msg.kind == RaftRpcMessageType.Vote:

View File

@ -17,7 +17,6 @@ import tables
type
TestCluster* = object
nodes: Table[RaftnodeId, RaftStateMachine]
messages: Table[RaftnodeId, seq[RaftRpcMessage]]
var test_ids_3 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
@ -38,34 +37,31 @@ func createConfigFromIds(ids: seq[RaftnodeId]): RaftConfig =
proc createCluster(ids: seq[RaftnodeId], now: times.DateTime) : TestCluster =
var config = createConfigFromIds(ids)
var cluster = TestCluster()
cluster.messages = initTable[RaftnodeId, seq[RaftRpcMessage]]()
cluster.nodes = initTable[RaftnodeId, RaftStateMachine]()
for i in 0..<config.currentSet.len:
let id = config.currentSet[i]
var log = initRaftLog(0)
var log = initRaftLog(1)
var node = initRaftStateMachine(id, 0, log, 0, config, now)
cluster.nodes[id] = node
cluster.messages[id] = @[]
return cluster
func routeMessages(tc: var TestCluster, now: times.DateTime) =
for id, queue in tc.messages:
for msg in queue:
tc.nodes[msg.receiver].advance(msg, now)
tc.messages[id] = @[]
func advance(tc: var TestCluster, now: times.DateTime) =
var outputs = initTable[RaftnodeId, RaftStateMachineOutput]()
proc advance(tc: var TestCluster, now: times.DateTime) =
for id, node in tc.nodes:
tc.nodes[id].tick(now)
outputs[id] = tc.nodes[id].poll()
for id, output in outputs:
var output = tc.nodes[id].poll()
for msg in output.debugLogs:
echo $msg
for msg in output.messages:
tc.messages[id].add(msg)
echo "rpc:" & $msg
tc.nodes[msg.receiver].advance(msg, now)
func getLeader(tc: TestCluster): Option[RaftStateMachine] =
  ## Looks through the cluster's nodes and returns the first one whose state
  ## reports `isLeader`, wrapped in `some`. Returns `none` when no node is
  ## currently a leader.
  result = none(RaftStateMachine)
  for candidate in tc.nodes.values:
    if candidate.state.isLeader:
      result = some(candidate)
      break
proc consensusstatemachineMain*() =
@ -98,37 +94,44 @@ proc consensusstatemachineMain*() =
suite "Entry log tests":
test "append entry as leadeer":
var log = initRaftLog(0)
var log = initRaftLog(1)
log.appendAsLeader(0, 1, Command())
log.appendAsLeader(0, 2, Command())
check log.lastTerm() == 0
log.appendAsLeader(1, 2, Command())
check log.lastTerm() == 1
test "append entry as follower":
var log = initRaftLog(0)
log.appendAsFollower(0, 0, Command())
var log = initRaftLog(1)
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 1
check log.entriesCount == 2
log.appendAsFollower(1, 1, Command())
check log.entriesCount == 1
discard log.matchTerm(1, 1)
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(2, 0, Command())
log.appendAsFollower(1, 3, Command())
check log.lastTerm() == 1
check log.lastIndex() == 3
check log.entriesCount == 3
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(2, 1, Command())
check log.lastTerm() == 2
check log.lastIndex() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
suite "3 node cluster":
var timeNow = times.now()
var cluster = createCluster(test_ids_3, timeNow)
var t = now()
# cluster.advance(t)
# cluster.routeMessages(t)
suite "Single node election tracker":
test "unknown":
@ -188,7 +191,7 @@ proc consensusstatemachineMain*() =
test "election":
var timeNow = times.now()
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(0)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 99.milliseconds
@ -210,7 +213,7 @@ proc consensusstatemachineMain*() =
test "append entry":
var timeNow = times.now()
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(0)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 300.milliseconds
@ -231,7 +234,7 @@ proc consensusstatemachineMain*() =
let id1 = test_ids_3[0]
let id2 = test_ids_3[1]
var config = createConfigFromIds(@[id1, id2])
var log = initRaftLog(0)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
@ -266,12 +269,8 @@ proc consensusstatemachineMain*() =
block:
output = sm.poll()
timeNow += 100.milliseconds
echo sm
echo "lol" & $output
sm.tick(timeNow)
output = sm.poll()
echo sm
echo "lol" & $output
# if the leader get a message with higher term it should become follower
block:
timeNow += 201.milliseconds
@ -284,6 +283,99 @@ proc consensusstatemachineMain*() =
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
suite "3 nodes cluster":
test "election failed":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 301.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
timeNow += 1.milliseconds
test "election":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 301.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false, isPrevote: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true, isPrevote: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isLeader
timeNow += 1.milliseconds
suite "3 nodes cluester":
test "election":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = times.now()
var leader: RaftnodeId
for i in 0..<500:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if leader == RaftnodeId():
if maybeLeader.isSome:
leader = maybeLeader.get().myId
else:
if maybeLeader.isSome:
check leader == maybeLeader.get().myId
else:
check false
if isMainModule:
consensusstatemachineMain()