Give random generator as ctor argument to the raft state machine

This commit is contained in:
Marto 2024-02-15 17:16:30 +02:00
parent 526984804e
commit c96adcfa10
3 changed files with 83 additions and 58 deletions

View File

@ -15,9 +15,6 @@ import state
import std/[times]
import std/random
randomize()
type
RaftRpcMessageType* = enum
VoteRequest = 0,
@ -30,11 +27,13 @@ type
Accepted = 1
DebugLogLevel* = enum
Critical = 0,
Error = 1,
Warning = 2,
Debug = 3,
Info = 4,
None = 0
Critical = 1,
Error = 2,
Warning = 3,
Debug = 4,
Info = 5,
All = 6,
DebugLogEntry* = object
level*: DebugLogLevel
@ -113,6 +112,7 @@ type
timeNow: times.DateTime
startTime: times.DateTime
electionTimeout: times.Duration
randomGenerator: Rand
state*: RaftStateMachineState
@ -126,7 +126,7 @@ func candidate*(sm: var RaftStateMachine): var CandidateState =
return sm.state.candidate
func addDebugLogEntry(sm: var RaftStateMachine, level: DebugLogLevel, msg: string) =
sm.output.debugLogs.add(DebugLogEntry(time: sm.timeNow, level: level, msg: msg, nodeId: sm.myId))
sm.output.debugLogs.add(DebugLogEntry(time: sm.timeNow, state: sm.state.state, level: level, msg: msg, nodeId: sm.myId))
func debug*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Debug, log)
@ -143,13 +143,11 @@ func info*(sm: var RaftStateMachine, log: string) =
func critical*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Critical, log)
proc resetElectionTimeout*(sm: var RaftStateMachine) =
func resetElectionTimeout*(sm: var RaftStateMachine) =
# TODO actually pick random time
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(200))
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + sm.randomGenerator.rand(200))
proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime): RaftStateMachine =
func initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime, randomGenerator: Rand): RaftStateMachine =
var sm = RaftStateMachine()
sm.term = currentTerm
sm.log = log
@ -162,6 +160,7 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
sm.myId = id
sm.electionTimeout = times.initDuration(milliseconds = 100)
sm.heartbeatTime = times.initDuration(milliseconds = 50)
sm.randomGenerator = randomGenerator
sm.resetElectionTimeout()
return sm
@ -280,8 +279,8 @@ func becomeCandidate*(sm: var RaftStateMachine) =
let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, force: false)
sm.sendTo(nodeId, request)
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
if sm.candidate.votes.tallyVote == RaftElectionResult.Won:
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
sm.becomeLeader()
return
@ -422,7 +421,7 @@ func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: Ra
of RaftElectionResult.Unknown:
return
of RaftElectionResult.Won:
sm.debug "Win election"
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
sm.becomeLeader()
of RaftElectionResult.Lost:
sm.debug "Lost election"

View File

@ -10,7 +10,7 @@ type
rnsLeader = 2 # Leader state
RaftStateMachineState* = object
case state: RaftNodeState
case state*: RaftNodeState
of rnsFollower: follower: FollowerState
of rnsCandidate: candidate: CandidateState
of rnsLeader: leader: LeaderState

View File

@ -14,7 +14,7 @@ import ../src/raft/log
import ../src/raft/tracker
import ../src/raft/state
import std/sets
import std/[times, sequtils]
import std/[times, sequtils, random]
import uuids
import tables
import std/algorithm
@ -50,7 +50,7 @@ proc createCluster(ids: seq[RaftnodeId], now: times.DateTime) : TestCluster =
for i in 0..<config.currentSet.len:
let id = config.currentSet[i]
var log = initRaftLog(1)
var node = initRaftStateMachine(id, 0, log, 0, config, now)
var node = initRaftStateMachine(id, 0, log, 0, config, now, initRand(i + 42))
cluster.nodes[id] = node
return cluster
@ -82,38 +82,43 @@ proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel =
tc.nodes[id].tick(now)
var output = tc.nodes[id].poll()
debugLogs.add(output.debugLogs)
for msg in output.messages:
echo "rpc:" & $msg
for msg in output.messages:
if not tc.blockedMsgRoutingSet.contains(msg.sender) and not tc.blockedMsgRoutingSet.contains(msg.receiver):
if DebugLogLevel.Debug <= logLevel:
echo now.format("HH:mm:ss:fff") & "rpc:" & $msg
tc.nodes[msg.receiver].advance(msg, now)
debugLogs.sort(cmpLogs)
for msg in debugLogs:
if msg.level <= logLevel:
echo $msg
else:
if DebugLogLevel.Debug <= logLevel:
echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet
debugLogs.sort(cmpLogs)
for msg in debugLogs:
if msg.level <= logLevel:
echo $msg
func getLeader(tc: TestCluster): Option[RaftStateMachine] =
var leader = none(RaftStateMachine)
for id, node in tc.nodes:
if node.state.isLeader:
return some(node)
return none(RaftStateMachine)
if not leader.isSome() or leader.get().term < node.term:
leader = some(node)
return leader
proc consensusstatemachineMain*() =
suite "Basic state machine tests":
test "create state machine":
var cluster = createCluster(test_ids_1, times.now())
echo cluster
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_1, timeNow)
test "tick empty state machine":
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
sm.tick(times.now())
echo sm.poll()
echo sm.poll()
echo getTime()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
timeNow += 5.milliseconds
sm.tick(timeNow)
suite "Entry log tests":
test "append entry as leadeer":
@ -152,7 +157,7 @@ proc consensusstatemachineMain*() =
check log.entriesCount == 1
suite "3 node cluster":
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var t = now()
@ -212,10 +217,10 @@ proc consensusstatemachineMain*() =
suite "Single node cluster":
test "election":
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 99.milliseconds
sm.tick(timeNow)
@ -234,10 +239,10 @@ proc consensusstatemachineMain*() =
check sm.term == 1
test "append entry":
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 1000.milliseconds
sm.tick(timeNow)
@ -258,8 +263,8 @@ proc consensusstatemachineMain*() =
let id2 = test_ids_3[1]
var config = createConfigFromIds(@[id1, id2])
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 601.milliseconds
sm.tick(timeNow)
@ -313,8 +318,8 @@ proc consensusstatemachineMain*() =
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
@ -350,8 +355,8 @@ proc consensusstatemachineMain*() =
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
@ -383,8 +388,8 @@ proc consensusstatemachineMain*() =
suite "3 nodes cluester":
test "election":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var hasLeader = false
for i in 0..<105:
@ -404,9 +409,9 @@ proc consensusstatemachineMain*() =
check hasLeader
test "1 node is not responding":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
cluster.blockMsgRouting(test_ids_3[0])
var timeNow = times.now()
var leader: RaftnodeId
var hasLeader = false
for i in 0..<105:
@ -426,10 +431,10 @@ proc consensusstatemachineMain*() =
check hasLeader
test "2 nodes is not responding":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
cluster.blockMsgRouting(test_ids_3[0])
cluster.blockMsgRouting(test_ids_3[1])
var timeNow = times.now()
var leader: RaftnodeId
for i in 0..<105:
timeNow += 5.milliseconds
@ -440,23 +445,44 @@ proc consensusstatemachineMain*() =
test "1 nodes is not responding new leader reelection":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = times.now()
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var firstLeaderId = RaftnodeId()
var secondLeaderId = RaftnodeId()
for i in 0..<305:
timeNow += 5.milliseconds
cluster.advance(timeNow, DebugLogLevel.Debug)
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if maybeLeader.isSome():
if maybeLeader.isSome() and firstLeaderId == RaftNodeId():
# we will block the comunication and will try to elect new leader
firstLeaderId = maybeLeader.get().myId
cluster.blockMsgRouting(firstLeaderId)
echo "Block comunication with: " & $firstLeaderId
if firstLeaderId != RaftnodeId() and maybeLeader.isSome() and maybeLeader.get().myId != firstLeaderId:
secondLeaderId = maybeLeader.get().myId
check secondLeaderId != RaftnodeId()
check secondLeaderId != RaftnodeId() and firstLeaderId != secondLeaderId
test "After reaelection leader should become follower":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var firstLeaderId = RaftnodeId()
var secondLeaderId = RaftnodeId()
for i in 0..<305:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if maybeLeader.isSome() and firstLeaderId == RaftNodeId():
# we will block the comunication and will try to elect new leader
firstLeaderId = maybeLeader.get().myId
cluster.blockMsgRouting(firstLeaderId)
echo "Block comunication with: " & $firstLeaderId
if firstLeaderId != RaftnodeId() and maybeLeader.isSome() and maybeLeader.get().myId != firstLeaderId:
secondLeaderId = maybeLeader.get().myId
cluster.allowMsgRouting(firstLeaderId)
if isMainModule: