WIP: Inital consensus state machine work
This commit is contained in:
parent
5bc59956c2
commit
c28243cb8a
|
@ -7,123 +7,121 @@
|
|||
# This file may not be copied, modified, or distributed except according to
|
||||
# those terms.
|
||||
|
||||
import std/tables
|
||||
import std/rlocks
|
||||
import types
|
||||
import chronicles
|
||||
import std/[times]
|
||||
|
||||
type
|
||||
# Node events
|
||||
EventType* = enum
|
||||
VotingTimeout,
|
||||
ElectionTimeout,
|
||||
HeartbeatTimeout,
|
||||
AppendEntriesTimeout
|
||||
HeartbeatReceived,
|
||||
HeartbeatSent,
|
||||
AppendEntriesReceived,
|
||||
AppendEntriesSent,
|
||||
RequestVoteReceived,
|
||||
RequestVoteSent,
|
||||
ClientRequestReceived,
|
||||
ClientRequestProcessed
|
||||
RaftRpcMessageType* = enum
|
||||
Vote = 0,
|
||||
VoteReplay = 1,
|
||||
Append = 2,
|
||||
AppendReplay = 3
|
||||
|
||||
|
||||
# Define callback to use with transitions. It is a function that takes a node as an argument and returns nothing.
|
||||
# It is used to perform some action when a transition is triggered. For example, when a node becomes a leader,
|
||||
# it should start sending heartbeats to other nodes.
|
||||
ConsensusFsmTransitionActionType*[NodeType] = proc(node: NodeType) {.gcsafe.}
|
||||
RaftRpcAppendRequest* = object
|
||||
previousTerm: RaftNodeTerm
|
||||
|
||||
RaftRpcAppendReplay* = object
|
||||
|
||||
# Define logical functions (conditions) computed from our NodeType etc. (Truth Table)
|
||||
LogicalConditionValueType* = bool
|
||||
LogicalCondition*[NodeTytpe, RaftMessageType] =
|
||||
proc(node: NodeTytpe, msg: Option[RaftMessageType]): LogicalConditionValueType
|
||||
LogicalConditionsLut*[RaftNodeState, EventType, NodeType, RaftMessageType] =
|
||||
Table[(RaftNodeState, EventType), seq[LogicalCondition[NodeType, RaftMessageType]]]
|
||||
RaftRpcVoteRequest* = object
|
||||
|
||||
RaftRpcVoteReplay* = object
|
||||
|
||||
LeaderState* = object
|
||||
|
||||
# Define Terminals as a tuple of a Event and a sequence of logical functions (conditions) and their respective values computed from NodeType, NodeTytpe and RaftMessageType
|
||||
# (kind of Truth Table)
|
||||
TerminalSymbol*[EventType, NodeType, RaftMessageType] =
|
||||
(EventType, seq[LogicalConditionValueType])
|
||||
CandidateState* = object
|
||||
votes: seq[RaftNodeId]
|
||||
|
||||
# Define State Transition Rules LUT of the form ( NonTerminal -> Terminal ) -> NonTerminal )
|
||||
# NonTerminal is a NodeState and Terminal is a TerminalSymbol - the tuple (EventType, seq[LogicalConditionValueType])
|
||||
StateTransitionsRulesLut*[RaftNodeState, EventType, NodeType, RaftMessageType] = Table[
|
||||
(RaftNodeState, TerminalSymbol[NodeType, EventType, RaftMessageType]),
|
||||
(RaftNodeState, Option[ConsensusFsmTransitionActionType[NodeType]])
|
||||
]
|
||||
FollowerState* = object
|
||||
leader: RaftNodeId
|
||||
|
||||
# FSM type definition
|
||||
ConsensusFsm*[RaftNodeState, EventType, NodeType, RaftMessageType] = ref object
|
||||
mtx: RLock
|
||||
state: RaftNodeState
|
||||
logicalFunctionsLut: LogicalConditionsLut[RaftNodeState, EventType, NodeType, RaftMessageType]
|
||||
stateTransitionsLut: StateTransitionsRulesLut[RaftNodeState, EventType, NodeType, RaftMessageType]
|
||||
RaftRpcMessage* = object
|
||||
currentTerm: RaftNodeTerm
|
||||
sender: RaftNodeId
|
||||
case kind: RaftRpcMessageType
|
||||
of Vote: voteRequest: RaftRpcVoteRequest
|
||||
of VoteReplay: voteReplay: RaftRpcVoteReplay
|
||||
of Append: appendRequest: RaftRpcAppendRequest
|
||||
of AppendReplay: appendReplay: RaftRpcAppendReplay
|
||||
|
||||
# FSM type constructor
|
||||
proc new*[RaftNodeState, EventType, NodeType, RaftMessageType](
|
||||
T: type ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType], startSymbol: RaftNodeState = rnsFollower): T =
|
||||
Command* = object
|
||||
data: seq[byte]
|
||||
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
|
||||
term: RaftNodeTerm
|
||||
index: RaftLogIndex
|
||||
# TODO: Add configuration too
|
||||
data: Command
|
||||
|
||||
result = T(mtx: RLock(), state: startSymbol)
|
||||
initRLock(result.mtx)
|
||||
RaftLog* = object
|
||||
logEntries: seq[LogEntry]
|
||||
|
||||
debug "new: ", fsm=repr(result)
|
||||
RaftStateMachineOutput* = object
|
||||
logEntries: seq[LogEntry]
|
||||
# Entries that should be applyed to the "User" State machine
|
||||
committed: seq[LogEntry]
|
||||
messages: seq[RaftRpcMessage]
|
||||
debugLogs: seq[string]
|
||||
|
||||
proc addFsmTransition*[RaftNodeState, EventType, NodeType, RaftMessageType](
|
||||
fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType],
|
||||
fromState: RaftNodeState,
|
||||
termSymb: TerminalSymbol[EventType, NodeType, RaftMessageType],
|
||||
toState: RaftNodeState,
|
||||
action: Option[ConsensusFsmTransitionActionType]) =
|
||||
RaftStateMachine* = object
|
||||
myId*: RaftNodeId
|
||||
term: RaftNodeTerm
|
||||
commitIndex: RaftLogIndex
|
||||
log: RaftLog
|
||||
output: RaftStateMachineOutput
|
||||
electionTimeout: times.Duration
|
||||
lastUpdate: Time
|
||||
case state: RaftNodeState
|
||||
of rnsFollower: follower : FollowerState
|
||||
of rnsCandidate: candidate: CandidateState
|
||||
of rnsLeader: leader: LeaderState
|
||||
|
||||
fsm.stateTransitionsLut[(fromState.state, termSymb)] = (toState, action)
|
||||
RaftStateMachineConfig* = object
|
||||
|
||||
proc addFsmTransitionLogicalConditions*[RaftNodeState, EventType, NodeType, RaftMessageType](
|
||||
fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType],
|
||||
state: RaftNodeState,
|
||||
event: EventType,
|
||||
logicalConditions: seq[LogicalCondition[NodeType, RaftMessageType]]) =
|
||||
func append(rf: var RaftLog, entry: LogEntry) =
|
||||
rf.logEntries.add(entry)
|
||||
|
||||
fsm.logicalFunctionsLut[(state, event)] = logicalConditions
|
||||
func debug*(sm: var RaftStateMachine, log: string) =
|
||||
sm.output.debugLogs.add(log)
|
||||
|
||||
func initRaftStateMachine*(config: RaftStateMachineConfig): RaftStateMachine =
|
||||
var st = RaftStateMachine()
|
||||
st.term = 0
|
||||
st.commitIndex = 0
|
||||
st.state = RaftNodeState.rnsFollower
|
||||
return st
|
||||
|
||||
proc computeFsmLogicFunctionsPermutationValuе*[RaftNodeState, NodeType, EventType, RaftMessageType](
|
||||
fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType],
|
||||
node: NodeType,
|
||||
termSymb: TerminalSymbol,
|
||||
msg: Option[RaftMessageType]): TerminalSymbol =
|
||||
|
||||
let
|
||||
e = termSymb[0]
|
||||
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
|
||||
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: Vote, voteRequest: RaftRpcVoteRequest())
|
||||
|
||||
debug "computeFSMLogicFunctionsPermutationValue: ", eventType=e, " ", nonTermSymb=nts, " ", msg=msg
|
||||
func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage,currentTime: Time) =
|
||||
if sm.term > msg.currentTerm:
|
||||
sm.debug "Current node is behind"
|
||||
elif sm.term < msg.currentTerm:
|
||||
sm.debug "The leader is behind"
|
||||
else:
|
||||
case sm.state:
|
||||
of rnsCandidate:
|
||||
sm.state = rnsFollower
|
||||
sm.follower.leader = msg.sender
|
||||
of rnsFollower:
|
||||
sm.follower.leader = msg.sender
|
||||
of rnsLeader:
|
||||
sm.debug "For now it should be impossible to have msg from a non leader to a leader"
|
||||
|
||||
var
|
||||
logicFunctionsConds = fsm.logicalFunctionsLut[(e, NodeType)]
|
||||
logicFunctionsCondsValues = seq[LogicalConditionValueType]
|
||||
case msg.kind:
|
||||
of RaftRpcMessageType.Vote:
|
||||
sm.debug "Handle vote request"
|
||||
else:
|
||||
sm.debug "Unhandle msg type"
|
||||
|
||||
debug "computeFSMLogicFunctionsPermutationValue: ", logicFunctionsConds=logicFunctionsConds
|
||||
func addEntry*(sm: var RaftStateMachine, entry: LogEntry) =
|
||||
if sm.state != RaftNodeState.rnsLeader:
|
||||
sm.debug "Error: only the leader can handle new entries"
|
||||
sm.log.append(entry)
|
||||
|
||||
for f in logicFunctionsConds:
|
||||
logicFunctionsCondsValues.add f(node, msg)
|
||||
|
||||
debug "computeFSMLogicFunctionsPermutationValue: ", logicFunctionsCondsValues=logicFunctionsCondsValues
|
||||
|
||||
termSymb[1] = logicFunctionsConds
|
||||
result = termSymb
|
||||
|
||||
proc fsmAdvance*[RaftNodeState, EventType, NodeType, RaftMessageType](
|
||||
fsm: ConsensusFsm[RaftNodeState, EventType, NodeType, RaftMessageType],
|
||||
node: NodeType,
|
||||
termSymb: TerminalSymbol[EventType, NodeType, RaftMessageType],
|
||||
msg: Option[RaftMessageType]): RaftNodeState =
|
||||
|
||||
withRLock():
|
||||
var
|
||||
input = computeFsmLogicFunctionsPermutationValue(fsm, node, termSymb, msg)
|
||||
let trans = fsm.stateTransitionsLut[(fsm.state, input)]
|
||||
let action = trans[1]
|
||||
|
||||
fsm.state = trans[0]
|
||||
debug "ConsensusFsmAdvance", fsmState=fsm.state, " ", input=input, " ", action=repr(action)
|
||||
if action.isSome:
|
||||
action(node)
|
||||
result = fsm.state
|
||||
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
|
||||
# Should initiate replication if we have new entries
|
||||
let output = sm.output
|
||||
sm.output = RaftStateMachineOutput()
|
||||
return output
|
|
@ -27,10 +27,9 @@ const
|
|||
|
||||
type
|
||||
RaftNodeState* = enum
|
||||
rnsError = 0, # Error state
|
||||
rnsFollower = 1, # Follower state
|
||||
rnsCandidate = 2 # Candidate state
|
||||
rnsLeader = 3 # Leader state
|
||||
rnsFollower = 0, # Follower state
|
||||
rnsCandidate = 1 # Candidate state
|
||||
rnsLeader = 2 # Leader state
|
||||
|
||||
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
|
||||
RaftNodeTerm* = int # Raft Node Term Type
|
||||
|
|
|
@ -14,4 +14,5 @@ import ../misc/test_macro
|
|||
cliBuilder:
|
||||
import ./test_basic_timers,
|
||||
./test_basic_state_machine,
|
||||
./test_basic_cluster_election
|
||||
#./test_basic_cluster_election,
|
||||
./test_consensus_state_machine
|
|
@ -9,16 +9,34 @@
|
|||
|
||||
import unittest2
|
||||
import ../raft/types
|
||||
import basic_state_machine
|
||||
import ../raft/consensus_state_machine
|
||||
import std/[times]
|
||||
|
||||
var
|
||||
csm: ConsensusFsm[RaftNodeState, EventType, RaftNode[SmCommand, SmState], RaftMessageBase[SmCommand, SmState]]
|
||||
proc consensusstatemachineMain*() =
|
||||
|
||||
|
||||
suite "Create and test Consensus State Machine":
|
||||
suite "Basic state machine tests":
|
||||
test "create state machine":
|
||||
let config = RaftStateMachineConfig()
|
||||
let sm = initRaftStateMachine(config)
|
||||
echo sm
|
||||
|
||||
test "Create Consensus State Machine":
|
||||
csm = ConsensusFsm[RaftNodeState, EventType, RaftNode[SmCommand, SmState], RaftMessageBase[SmCommand, SmState]].new
|
||||
check csm != nil
|
||||
test "advance empty state machine":
|
||||
var sm = RaftStateMachine()
|
||||
var msg = sm.createVoteRequest()
|
||||
sm.advance(msg ,getTime())
|
||||
echo sm.poll()
|
||||
echo sm.poll()
|
||||
echo getTime()
|
||||
|
||||
# test "Add state transitions / logic conditions":
|
||||
test "two machines":
|
||||
var sm = RaftStateMachine()
|
||||
var sm2 = RaftStateMachine(myId: genUUID())
|
||||
var msg = sm2.createVoteRequest()
|
||||
sm.advance(msg ,getTime())
|
||||
echo sm2
|
||||
echo getTime()
|
||||
|
||||
|
||||
if isMainModule:
|
||||
consensusstatemachineMain()
|
Loading…
Reference in New Issue