More refactoring

This commit is contained in:
Marto 2024-02-07 17:02:07 +02:00
parent 47e1e36294
commit fc50bf1271
6 changed files with 81 additions and 57 deletions

View File

@ -5,3 +5,19 @@ This project aims to develop an implementation of the Raft consensus protocol th
We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here: We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here:
https://github.com/status-im/nimbus-eth2/issues/3416 https://github.com/status-im/nimbus-eth2/issues/3416
This project is heavily inspired by Raft implementation in ScyllaDB
https://github.com/scylladb/scylladb/tree/master/raft
# Design goals
The main goal is to separate implementation of the raft state machin from the other implementation details like (storage, rpc etc)
In order to achive this we want to keep the State machine absolutly deterministic every interaction the the world like
networking, logging, acquiring current time, random number generation, disc operation etc must happened trough the state machine interface.
It will ensure better testability and integrability.
# Run test
`./run_all_tests.sh`

View File

@ -10,6 +10,7 @@
import types import types
import log import log
import tracker import tracker
import state
import std/[times] import std/[times]
import std/sequtils import std/sequtils
@ -47,8 +48,7 @@ type
term: RaftNodeTerm term: RaftNodeTerm
case result: RaftRpcCode: case result: RaftRpcCode:
of Accepted: accepted: RaftRpcAppendReplayAccepted of Accepted: accepted: RaftRpcAppendReplayAccepted
of Rejected: rejected: RaftRpcAppendReplayRejected of Rejected: rejected: RaftRpcAppendReplayRejected
RaftRpcVoteRequest* = object RaftRpcVoteRequest* = object
currentTerm*: RaftNodeTerm currentTerm*: RaftNodeTerm
@ -60,15 +60,6 @@ type
currentTerm*: RaftNodeTerm currentTerm*: RaftNodeTerm
voteGranted*: bool voteGranted*: bool
LeaderState* = object
tracker: RaftTracker
CandidateState* = object
votes: RaftVotes
FollowerState* = object
leader: RaftNodeId
RaftRpcMessage* = object RaftRpcMessage* = object
currentTerm*: RaftNodeTerm currentTerm*: RaftNodeTerm
sender*: RaftNodeId sender*: RaftNodeId
@ -89,12 +80,6 @@ type
votedFor*: Option[RaftNodeId] votedFor*: Option[RaftNodeId]
stateChange*: bool stateChange*: bool
RaftStateMachineState* = object
case state: RaftNodeState
of rnsFollower: follower : FollowerState
of rnsCandidate: candidate: CandidateState
of rnsLeader: leader: LeaderState
RaftStateMachine* = object RaftStateMachine* = object
myId*: RaftNodeId myId*: RaftNodeId
term*: RaftNodeTerm term*: RaftNodeTerm
@ -117,25 +102,6 @@ type
state*: RaftStateMachineState state*: RaftStateMachineState
func isLeader*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsLeader
func isFollower*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsFollower
func isCandidate*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsCandidate
func leader*(s: var RaftStateMachineState): var LeaderState =
return s.leader
func follower*(s: var RaftStateMachineState): var FollowerState =
return s.follower
func candidate*(s: var RaftStateMachineState): var CandidateState =
return s.candidate
func leader*(sm: var RaftStateMachine): var LeaderState = func leader*(sm: var RaftStateMachine): var LeaderState =
return sm.state.leader return sm.state.leader
@ -146,7 +112,7 @@ func candidate*(sm: var RaftStateMachine): var CandidateState =
return sm.state.candidate return sm.state.candidate
func debug*(sm: var RaftStateMachine, log: string) = func debug*(sm: var RaftStateMachine, log: string) =
sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state.state & "]: " & log) sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state & "]: " & log)
proc resetElectionTimeout*(sm: var RaftStateMachine) = proc resetElectionTimeout*(sm: var RaftStateMachine) =
# TODO actually pick random time # TODO actually pick random time
@ -157,7 +123,7 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
sm.term = currentTerm sm.term = currentTerm
sm.log = log sm.log = log
sm.commitIndex = commitIndex sm.commitIndex = commitIndex
sm.state = RaftStateMachineState(state: RaftnodeState.rnsFollower) sm.state = initFollower(RaftNodeId())
sm.config = config sm.config = config
sm.lastElectionTime = now sm.lastElectionTime = now
sm.timeNow = now sm.timeNow = now
@ -168,7 +134,6 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
sm.resetElectionTimeout() sm.resetElectionTimeout()
return sm return sm
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] = func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
return sm.leader.tracker.find(id) return sm.leader.tracker.find(id)
@ -247,7 +212,7 @@ func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) =
if sm.myId == leaderId: if sm.myId == leaderId:
sm.debug "Can't be follower of itself" sm.debug "Can't be follower of itself"
sm.output.stateChange = not sm.state.isFollower sm.output.stateChange = not sm.state.isFollower
sm.state = RaftStateMachineState(state: RaftNodeState.rnsFollower, follower: FollowerState(leader: leaderId)) sm.state = initFollower(leaderId)
if leaderId != RaftnodeId(): if leaderId != RaftnodeId():
sm.pingLeader = false sm.pingLeader = false
# TODO: Update last election time # TODO: Update last election time
@ -258,9 +223,8 @@ func becomeLeader*(sm: var RaftStateMachine) =
return return
sm.output.stateChange = true sm.output.stateChange = true
sm.state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState())
sm.addEntry(Empty()) sm.addEntry(Empty())
sm.leader.tracker = initTracker(sm.config, sm.log.lastIndex, sm.timeNow) sm.state = initLeader(sm.config, sm.log.lastIndex, sm.timeNow)
sm.pingLeader = false sm.pingLeader = false
#TODO: Update last election time #TODO: Update last election time
return return
@ -270,7 +234,7 @@ func becomeCandidate*(sm: var RaftStateMachine) =
if not sm.state.isCandidate: if not sm.state.isCandidate:
sm.output.stateChange = true sm.output.stateChange = true
sm.state = RaftStateMachineState(state: RaftnodeState.rnsCandidate, candidate: CandidateState(votes: initVotes(sm.config))) sm.state = initCandidate(sm.config)
sm.lastElectionTime = sm.timeNow sm.lastElectionTime = sm.timeNow
# TODO: Add configuration change logic # TODO: Add configuration change logic
@ -327,7 +291,6 @@ func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.debug "Become candidate" sm.debug "Become candidate"
sm.becomeCandidate() sm.becomeCandidate()
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
# Should initiate replication if we have new entries # Should initiate replication if we have new entries
if sm.state.isLeader: if sm.state.isLeader:
@ -338,7 +301,6 @@ func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
let output = sm.output let output = sm.output
sm.output = RaftStateMachineOutput() sm.output = RaftStateMachineOutput()
return output return output
func commit*(sm: var RaftStateMachine) = func commit*(sm: var RaftStateMachine) =
@ -392,7 +354,6 @@ func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
sm.commitIndex = newIdx sm.commitIndex = newIdx
# TODO: signal the output for the update # TODO: signal the output for the update
func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) = func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) =
if not sm.state.isFollower: if not sm.state.isFollower:
sm.debug "You can't append append request to the non follower" sm.debug "You can't append append request to the non follower"
@ -419,7 +380,6 @@ func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpc
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false) let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false)
sm.sendTo(fromId, responce) sm.sendTo(fromId, responce)
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) = func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) =
if not sm.state.isCandidate: if not sm.state.isCandidate:
sm.debug "Non candidate can't handle votes" sm.debug "Non candidate can't handle votes"

57
raft/state.nim Normal file
View File

@ -0,0 +1,57 @@
import types
import tracker
import std/[times]
type
RaftNodeState = enum
rnsFollower = 0, # Follower state
rnsCandidate = 1 # Candidate state
rnsLeader = 2 # Leader state
RaftStateMachineState* = object
case state: RaftNodeState
of rnsFollower: follower: FollowerState
of rnsCandidate: candidate: CandidateState
of rnsLeader: leader: LeaderState
LeaderState* = object
tracker*: RaftTracker
CandidateState* = object
votes*: RaftVotes
FollowerState* = object
leader*: RaftNodeId
func `$`*(s: RaftStateMachineState): string =
return $s.state
func initLeader*(cfg: RaftConfig, index: RaftLogIndex, now: times.DateTime): RaftStateMachineState =
var state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState())
state.leader.tracker = initTracker(cfg, index, now)
return state
func initFollower*(leaderId: RaftNodeId): RaftStateMachineState =
return RaftStateMachineState(state: RaftNodeState.rnsFollower, follower: FollowerState(leader: leaderId))
func initCandidate*(cfg: RaftConfig): RaftStateMachineState =
return RaftStateMachineState(state: RaftnodeState.rnsCandidate, candidate: CandidateState(votes: initVotes(cfg)))
func isLeader*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsLeader
func isFollower*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsFollower
func isCandidate*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsCandidate
func leader*(s: var RaftStateMachineState): var LeaderState =
return s.leader
func follower*(s: var RaftStateMachineState): var FollowerState =
return s.follower
func candidate*(s: var RaftStateMachineState): var CandidateState =
return s.candidate

View File

@ -76,7 +76,6 @@ func tallyVote*(rv: var RaftVotes): RaftElectionResult =
# TODO: Add support for configuration # TODO: Add support for configuration
return rv.current.tallyVote() return rv.current.tallyVote()
func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] = func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] =
for follower in ls.progress: for follower in ls.progress:
if follower.id == id: if follower.id == id:

View File

@ -26,11 +26,6 @@ const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000 DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
type type
RaftNodeState* = enum
rnsFollower = 0, # Follower state
rnsCandidate = 1 # Candidate state
rnsLeader = 2 # Leader state
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = int # Raft Node Term Type RaftNodeTerm* = int # Raft Node Term Type
RaftLogIndex* = int # Raft Node Log Index Type RaftLogIndex* = int # Raft Node Log Index Type

View File

@ -12,6 +12,7 @@ import ../raft/types
import ../raft/consensus_state_machine import ../raft/consensus_state_machine
import ../raft/log import ../raft/log
import ../raft/tracker import ../raft/tracker
import ../raft/state
import std/[times, sequtils] import std/[times, sequtils]
import uuids import uuids
import tables import tables
@ -57,16 +58,13 @@ proc advance(tc: var TestCluster, now: times.DateTime) =
#echo "rpc:" & $msg #echo "rpc:" & $msg
tc.nodes[msg.receiver].advance(msg, now) tc.nodes[msg.receiver].advance(msg, now)
func getLeader(tc: TestCluster): Option[RaftStateMachine] = func getLeader(tc: TestCluster): Option[RaftStateMachine] =
for id, node in tc.nodes: for id, node in tc.nodes:
if node.state.isLeader: if node.state.isLeader:
return some(node) return some(node)
return none(RaftStateMachine) return none(RaftStateMachine)
proc consensusstatemachineMain*() = proc consensusstatemachineMain*() =
suite "Basic state machine tests": suite "Basic state machine tests":
test "create state machine": test "create state machine":
@ -367,6 +365,5 @@ proc consensusstatemachineMain*() =
else: else:
check false check false
if isMainModule: if isMainModule:
consensusstatemachineMain() consensusstatemachineMain()