Make better Raft Protocol Definition

This commit is contained in:
Raycho Mukelov 2023-08-14 23:49:21 +03:00
parent 361c67803d
commit e1df04eb53
6 changed files with 187 additions and 114 deletions

View File

@ -8,7 +8,7 @@
# those terms.
import
raft/raft_api
raft/api
export
raft_api, types, protocol
api, types, protocol

View File

@ -18,10 +18,7 @@ skipDirs = @["tests"]
requires "nim >= 1.6.0"
requires "stew >= 0.1.0"
requires "nimcrypto >= 0.5.4"
requires "unittest2 >= 0.0.4"
requires "chronicles >= 0.10.2"
requires "eth >= 1.0.0"
requires "chronos >= 3.2.0"
requires "uuid4 >= 0.9.3"
# Helper functions

View File

@ -6,3 +6,15 @@
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import protocol
import types
# Handler stub for a RequestVote message received by the consensus module.
# No logic yet: the body is `discard`, so the implicit `result` keeps its
# default value (nil, because the response type is a ref object).
proc RaftConsensusProcessRequestVote*(
    consensus: RaftConsensusModule,
    msg: RaftMessageRequestVote
): RaftMessageRequestVoteResponse =
  discard
# Handler stub for an AppendEntries message received by the consensus module.
# No logic yet: the body is `discard`, so the implicit `result` keeps its
# default value.
# NOTE(review): the return type is declared generic in the protocol module but
# is used here without a type argument - confirm this instantiates as intended.
proc RaftConsensusProcessAppendEntries*(
    consensus: RaftConsensusModule,
    msg: RaftMessageAppendEntries
): RaftMessageAppendEntriesResponse =
  discard
# Module-private quorum check stub (not exported).
# The body is `discard`, so it currently always yields the default `bool`
# value, i.e. false.
proc RaftConsensusQuorumMin(
    consensus: RaftConsensusModule
): bool =
  discard

View File

@ -8,10 +8,9 @@
# those terms.
# #
# Raft Messages Protocol definition #
# Raft Protocol definition #
# #
import types
import options
type
# Raft Node Messages OPs
@ -20,29 +19,49 @@ type
rmoAppendLogEntry = 1,
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes
RaftMessagePayloadChecksum* = object # Placeholder with no fields yet; the checksum will probably be a SHA3 hash (undecided at this point)
RaftMessagePayload*[LogEntryDataType] = ref object # Message payload: one log entry together with its checksum
data*: RaftNodeLogEntry[LogEntryDataType]
checksum*: RaftMessagePayloadChecksum
# NOTE(review): the exported identifier is spelled "Respoonse" - presumably a typo for "Response";
# renaming it changes the public name, so coordinate with any users before fixing.
RaftMessageRespoonseError* = enum # Raft message response errors
rmreSuccess = 0,
rmreFail = 1
# Generic Raft message: an operation code plus an optional list of payloads, on top of the RaftMessageBase fields.
RaftMessage*[LogEntryDataType] = ref object of RaftMessageBase
op*: RaftMessageOps # Message Op - Ask For Votes, Append Entry(ies), Install Snapshot etc.
payload*: Option[seq[RaftMessagePayload[LogEntryDataType]]] # Optional Message Payload(s) - e.g. log entry(ies). Will be empty for a Heart-Beat (a Heart-Beat is a message with the Append Entry(ies) Op and an empty payload)
RaftMessageResponseBase* = ref object of RootObj # Base type for responses to Raft protocol messages
  msgId*: RaftMessageId # Original Message ID
  senderId*: RaftNodeId # Sender Raft Node ID
  respondentId*: RaftNodeId # Responding RaftNodeId (exported like its sibling fields so other modules can fill it in)
  senderTerm*: RaftNodeTerm # Sender Raft Node Term
# Generic response carrying a success flag and an optional state machine state.
# Note that it derives from RaftMessageBase, not from RaftMessageResponseBase.
RaftMessageResponse*[SmStateType] = ref object of RaftMessageBase
success*: bool # Indicates success/failure
state*: Option[SmStateType] # Raft Abstract State Machine State
# Vote request message; adds log position fields to the RaftMessageBase fields.
RaftMessageRequestVote* = ref object of RaftMessageBase
lastLogTerm*: RaftNodeTerm # presumably the term of the sender's last log entry (as in the Raft RequestVote RPC) - TODO confirm
lastLogIndex*: RaftLogIndex # presumably the index of the sender's last log entry - TODO confirm
# Response to a RaftMessageRequestVote; adds the vote outcome to the RaftMessageResponseBase fields.
RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase
granted*: bool # Whether the vote was granted
# Append-entries message, generic over the state machine command type carried by the log entries.
RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase
prevLogIndex*: RaftLogIndex # presumably the index of the entry preceding the new ones (Raft AppendEntries RPC) - TODO confirm
prevLogTerm*: RaftNodeTerm # presumably the term of the entry at prevLogIndex - TODO confirm
commitIndex*: RaftLogIndex # presumably the sender's commit index - TODO confirm
logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat
# Response to a RaftMessageAppendEntries, generic over the state machine state type.
RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase
success*: bool # Indicates success/failure
lastLogIndex*: RaftLogIndex # presumably the responder's last log index - TODO confirm
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
# Raft Node Client Request/Response definitions
# Operation codes for client requests. Two variants of the enum appear here:
# an unexported one (rncroRequestState / rncroAppendNewEntry) and an exported
# one (rncroRequestSmState / rncroExecSmCommand).
RaftNodeClientRequestOps = enum
rncroRequestState = 0,
rncroAppendNewEntry = 1
RaftNodeClientRequestOps* = enum
rncroRequestSmState = 0, # Request the state machine state
rncroExecSmCommand = 1 # Execute a state machine command
# Client request object plus the client response error codes.
# NOTE(review): RaftNodeClientResponseError has no export marker although it is the type of the
# exported `error*` field of RaftNodeClientResponse - confirm whether it should be exported.
RaftNodeClientRequest*[LogEntryDataType] = ref object
RaftNodeClientResponseError = enum
rncrSuccess = 0,
rncrFail = 1,
rncrNotLeader = 2
RaftNodeClientRequest*[SmCommandType] = ref object
op*: RaftNodeClientRequestOps
payload*: Option[RaftMessagePayload[LogEntryDataType]] # Optional RaftMessagePayload carrying a Log Entry
payload*: Option[SmCommandType] # Optional State Machine command
# Response returned to a client, generic over the state machine state type.
RaftNodeClientResponse*[SmStateType] = ref object
success*: bool # Indicates success
error*: RaftNodeClientResponseError
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure

View File

@ -9,76 +9,104 @@
import types
import protocol
import consensus_module
export types, protocol
# Raft Node Public API procedures / functions
# Constructor stub: takes the node id, its peers, a persistent storage handle and the
# callback used for sending messages. Not implemented yet - the body is `discard`, so the
# default-initialised RaftNode is returned.
# The generic parameters occur only in the return type, so they cannot be inferred from the
# arguments and have to be supplied explicitly by the caller.
proc RaftNodeCreateNew*[LogEntryDataType, SmStateType]( # Create New Raft Node
proc RaftNodeCreateNew*[SmCommandType, SmStateType]( # Create New Raft Node
id: RaftNodeId, peers: RaftNodePeers,
persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback): RaftNode[LogEntryDataType, SmStateType] =
msgSendCallback: RaftMessageSendCallback): RaftNode[SmCommandType, SmStateType] =
discard
# Loader stub: meant to restore a Raft Node from persistent storage, reporting failure as a
# string error through Result. Not implemented yet - the body is `discard`, so the
# default-initialised Result is returned.
proc RaftNodeLoad*[LogEntryDataType, SmStateType](
proc RaftNodeLoad*[SmCommandType, SmStateType](
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[LogEntryDataType, SmStateType], string] =
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] =
discard
# Stop the given Raft Node (intent taken from the name). Placeholder: the body is `discard`.
proc RaftNodeStop*(node: RaftNode) =
proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
# Start the given Raft Node (intent taken from the name). Placeholder: the body is `discard`.
proc RaftNodeStart*(node: RaftNode) =
proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
discard
# Placeholder getter: the body is `discard`, so the default RaftNodeId is returned rather than the node's id.
func RaftNodeIdGet*(node: RaftNode): RaftNodeId = # Get Raft Node ID
func RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId = # Get Raft Node ID
discard
# Placeholder getter: the body is `discard`, so the default RaftNodeState (ordinal 0) is returned rather than the node's state.
func RaftNodeStateGet*(node: RaftNode): RaftNodeState = # Get Raft Node State
func RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
discard
# Placeholder getter: the body is `discard`, so the default RaftNodeTerm is returned rather than the node's term.
func RaftNodeTermGet*(node: RaftNode): RaftNodeTerm = # Get Raft Node Term
func RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
discard
# Placeholder getter: the body is `discard`, so an empty RaftNodePeers seq is returned rather than the node's peers.
func RaftNodePeersGet*(node: RaftNode): RaftNodePeers = # Get Raft Node Peers
func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
discard
# Placeholder predicate: the body is `discard`, so it currently always returns false.
func RaftNodeIsLeader*(node: RaftNode): bool = # Check if Raft Node is Leader
func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader
discard
# Entry point for handing a received Raft message to a node. Two signatures appear here:
# one returning RaftMessageResponse with {.discardable.}, and one returning
# RaftMessageResponseBase without it. Placeholder: the body is `discard`, nothing is dispatched yet.
proc RaftNodeMessageDeliver*(node: RaftNode, raftMessage: RaftMessageBase): RaftMessageResponse {.discardable.} = # Deliver Raft Message to the Raft Node
# Deliver Raft Message to the Raft Node and dispatch it
proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): RaftMessageResponseBase =
discard
# Entry point for client requests. Placeholder: the body is `discard`, so the default response is returned.
# NOTE(review): the second signature names the proc `RaftNodeClientRequest`, the same identifier as the
# request type used in its own parameter list - confirm the compiler resolves this as intended.
proc RaftNodeRequest*(node: RaftNode, req: RaftNodeClientRequest): RaftNodeClientResponse = # Process RaftNodeClientRequest
# Process RaftNodeClientRequests
proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): RaftNodeClientResponse[SmStateType] =
discard
# Placeholder getter for the node's log index: the body is `discard`, so the default RaftLogIndex is returned.
proc RaftNodeLogIndexGet*(node: RaftNode): RaftLogIndex =
proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
discard
# Placeholder lookup of the log entry at `logIndex`, reporting failure as a string error through Result.
# The body is `discard`, so the default-initialised Result is returned.
proc RaftNodeLogEntryGet*(node: RaftNode, logIndex: RaftLogIndex): Result[RaftNodeLogEntry, string] =
proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] =
discard
# Abstract State Machine Ops
# Returns the state held by the node's state machine (`node.stateMachine.state`).
# NOTE(review): this reads the `stateMachine` and `state` fields directly - confirm both are visible from this module.
func RaftNodeSmStateGet*[LogEntryDataType, SmStateType](node: RaftNode[LogEntryDataType, SmStateType]): SmStateType =
func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
node.stateMachine.state
# Initialises the state machine by forwarding to `RaftSmInit`.
# `RaftSmInit` is declared `mixin`, so it is also looked up in the scope where this generic is instantiated.
proc RaftNodeSmInit[LogEntryDataType, SmStateType](stateMachine: var RaftNodeStateMachine[LogEntryDataType, SmStateType]) =
proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
mixin RaftSmInit
RaftSmInit(stateMachine)
# Applies one item to the state machine by forwarding to `RaftSmApply`.
# Two variants appear here: one passing `logEntry`, one passing `command`.
# `RaftSmApply` is declared `mixin`, so it is also looked up in the scope where this generic is instantiated.
proc RaftNodeSmApply[LogEntryDataType, SmStateType](stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType], logEntry: LogEntryDataType) =
proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
mixin RaftSmApply
RaftSmApply(stateMachine, logEntry)
RaftSmApply(stateMachine, command)
# Timer manipulation
# Creates a timer by forwarding to `RaftTimerCreateCustomImpl`, a `mixin` symbol that is also looked up
# in the instantiation scope. Two variants appear here: an exported one with a `repeat` flag and an
# unexported one without it.
proc RaftTimerCreate*[TimerDurationType](timerInterval: TimerDurationType, repeat: bool, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic
# Private Abstract Timer manipulation Ops
proc RaftTimerCreate[TimerDurationType](timerInterval: TimerDurationType, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic
mixin RaftTimerCreateCustomImpl
RaftTimerCreateCustomImpl(timerInterval, repeat, timer_callback)
RaftTimerCreateCustomImpl(timerInterval, timer_callback)
# Cancels a timer by forwarding to the `mixin` symbol `RaftTimerCancelCustomImpl`.
# Note that the template parameter is itself named `TimerId`, the same identifier as the timer id type.
template RaftTimerCancel*(TimerId) =
template RaftTimerCancel(TimerId) =
mixin RaftTimerCancelCustomImpl
RaftTimerCancelCustomImpl(TimerId)
# Reports whether a timer has expired by forwarding to the `mixin` symbol `RaftTimerIsExpiredImpl`.
# NOTE(review): the forwarded name lacks the `Custom` infix used by the create/cancel counterparts - confirm this is intended.
template RaftTimerIsExpired*(TimerId): bool =
template RaftTimerIsExpired(TimerId): bool =
mixin RaftTimerIsExpiredImpl
RaftTimerIsExpiredImpl(TimerId)
# Private Log Ops
# Module-private stub for appending `logEntry` to the node's log (intent taken
# from the name). Nothing is appended yet: the body is `discard`.
proc RaftNodeLogAppend[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType],
    logEntry: RaftNodeLogEntry[SmCommandType]
) =
  discard
# Module-private stub for querying the last log index (intent taken from the
# name). The body is `discard`, so it currently always returns 0.
# The result is typed `uint64`, the underlying type of the `RaftLogIndex` alias.
proc RaftNodeLastLogIndex[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]
): uint64 =
  discard
# Module-private stub for truncating the node's log at `truncateIndex` (intent
# taken from the name). Nothing is truncated yet: the body is `discard`.
proc RaftNodeLogTruncate[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType],
    truncateIndex: uint64
) =
  discard
# Private Timers Create Ops
# Module-private stub for scheduling a heart-beat timer (intent taken from the
# name). The body is `discard`, so the default TimerId is returned.
# `TimerDurationType` does not occur in the parameter list, so callers have to
# supply it explicitly.
proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](
    node: RaftNode[SmCommandType, SmStateType]
): TimerId =
  discard
# Module-private stub for scheduling a heart-beat timeout timer (intent taken
# from the name). The body is `discard`, so the default TimerId is returned.
# `TimerDurationType` does not occur in the parameter list, so callers have to
# supply it explicitly.
proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](
    node: RaftNode[SmCommandType, SmStateType]
): TimerId =
  discard
# Module-private stub for scheduling an election timeout timer (intent taken
# from the name). The body is `discard`, so the default TimerId is returned.
# `TimerDurationType` does not occur in the parameter list, so callers have to
# supply it explicitly.
# NOTE(review): unlike the neighbouring helpers this name has no `RaftNode`
# prefix - confirm whether that is intended.
proc scheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](
    node: RaftNode[SmCommandType, SmStateType]
): TimerId =
  discard
# Module-private stub for scheduling a request-vote timeout timer (intent taken
# from the name). The body is `discard`, so the default TimerId is returned.
# `TimerDurationType` does not occur in the parameter list, so callers have to
# supply it explicitly.
# NOTE(review): unlike the neighbouring helpers this name has no `RaftNode`
# prefix - confirm whether that is intended.
proc scheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](
    node: RaftNode[SmCommandType, SmStateType]
): TimerId =
  discard
# Module-private stub, by its name the handler for a heart-beat timeout.
# Nothing happens yet: the body is `discard`.
proc RaftNodeHeartBeatTimeout[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]
) =
  discard

View File

@ -7,117 +7,134 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
# Raft Node Public Types.
# I guess that at some point these can be moved to a separate file called raft_consensus_types.nim for example
# Raft Node Public Types
import std/locks
import stew/results
import eth/keyfile
import std/sets
import options
import stew/results
import uuid4
export results, options
export results
type
# Raft Node basic definitions
Blob* = seq[byte] # Raw byte sequence
# Raft Node role/state. Two member sets appear here: upper-case names without a candidate state,
# and `rns`-prefixed names that add rnsCandidate and move the leader to ordinal 3.
RaftNodeState* = enum
UNKNOWN = 0,
FOLLOWER = 1,
LEADER = 2
rnsUnknown = 0,
rnsFollower = 1,
rnsCandidate = 2
rnsLeader = 3
# Identifier and counter aliases. `UUID` and `Uuid` denote the same identifier in Nim, which compares
# everything after the first character case-insensitively.
RaftNodeId* = UUID # UUID uniquely identifying every Raft Node
RaftNodePeers* = seq[RaftNodeId] # List of Raft Node Peers IDs
RaftNodeId* = Uuid # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
RaftNodePeer* = object # Raft Node Peer object: per-peer replication and election bookkeeping
id*: RaftNodeId
nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
matchIndex*: RaftLogIndex # For each peer Raft Node, index of highest log entry known to be replicated on Node
# (initialized to 0, increases monotonically)
hasVoted*: bool # Indicates if this peer has voted for this Raft Node during an election
canVote*: bool # Indicates if this peer can vote
RaftNodePeers* = seq[RaftNodePeer] # List of Raft Node Peers
# Raft Node Abstract State Machine type
RaftNodeStateMachine*[LogEntryDataType, SmStateType] = ref object # Some probably opaque State Machine Implementation to be used by the Raft Node
RaftNodeStateMachine*[SmCommandType, SmStateType] = object # Some opaque State Machine Implementation to be used by the Raft Node
# providing at minimum operations for initialization, querying the current state
# and RaftNodeLogEntry application
# and RaftNodeLogEntry (SmCommandType) application
state: SmStateType # Current state; not exported, so only this module can access the field directly
# Raft Node Persistent Storage basic definition
# Placeholder type with no fields yet; appears both as a non-generic ref object and as a generic value object.
RaftNodePersistentStorage* = ref object # Should be some kind of Persistent Transactional Store Wrapper
RaftNodePersistentStorage*[SmCommandType, SmStateType] = object # Should be some kind of Persistent Transactional Store Wrapper
# Basic modules (algos) definitions
# NOTE(review): the comment asks for a closure, but the proc type is declared {.nimcall.}, which is the
# calling convention for procs without a captured environment - confirm which one is intended.
RaftNodeAccessCallback[LogEntryDataType, SmStateType] = proc: RaftNode[LogEntryDataType, SmStateType] {.nimcall, gcsafe.} # This should be implemented as a closure holding the RaftNode
RaftNodeAccessCallback[SmCommandType, SmStateType] = proc: RaftNode[SmCommandType, SmStateType] {.nimcall, gcsafe.} # This should be implemented as a closure holding the RaftNode
# Consensus module state: a placeholder for the state-transition FSM plus the callback used to reach the owning RaftNode.
RaftConsensusModule*[LogEntryDataType, SmStateType] = object of RootObj
RaftConsensusModule*[SmCommandType, SmStateType] = object of RootObj
stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType]
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
# Log compaction module state; currently holds only the callback used to reach the owning RaftNode.
RaftLogCompactionModule*[LogEntryDataType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType]
RaftLogCompactionModule*[SmCommandType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
# Membership change module state; currently holds only the callback used to reach the owning RaftNode.
RaftMembershipChangeModule*[LogEntryDataType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType]
RaftMembershipChangeModule*[SmCommandType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
# Callback for sending messages out of this Raft Node
RaftMessageId* = UUID # UUID assigned to every Raft Node Message,
RaftMessageId* = Uuid # UUID assigned to every Raft Node Message,
# so it can be matched with its corresponding response etc.
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages
# out of this Raft Node. Can be used for broadcasting
# (a Heart-Beat for example)
# Raft Node basic Log definitions
RaftNodeLogEntry*[LogEntryDataType] = ref object # Abstract Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
data*: LogEntryDataType
RaftNodeLog*[LogEntryDataType] = ref object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[LogEntryDataType]] # Raft Node Log Data
# Base type for Raft message objects
RaftMessageBase* = ref object of RootObj # Base Type for Raft Node Messages
RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages
msgId*: RaftMessageId # Message UUID
senderId*: RaftNodeId # Sender Raft Node ID
senderTerm*: RaftNodeTerm # Sender Raft Node Term
peers*: RaftNodePeers # List of Raft Node peers which should receive this message
RaftMessageSendCallback* = proc (recipient: RaftNodeId, raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages
# out of this Raft Node; takes the recipient's node ID and the message.
# For later use when adding/removing nodes (dynamic configuration changes)
RaftNodeConfiguration* = object # Placeholder: no fields defined yet
# Raft Node Log definition
LogEntryType* = enum # Kind of a log entry; selects which optional part of RaftNodeLogEntry is meaningful
etUnknown = 0,
etConfiguration = 1, # Entry carries a node configuration
etData = 2, # Entry carries data (a State Machine command)
etNoOp = 3 # Entry carries no operation
RaftNodeLogEntry*[SmCommandType] = object # Abstract Raft Node Log entry carrying either a State Machine command or a node configuration
  term*: RaftNodeTerm # Raft term recorded for this entry
  index*: RaftLogIndex # Log index recorded for this entry
  clusterTime*: object # NOTE(review): bare `object` is not a concrete field type - the intended time type still has to be chosen
  entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
  configuration*: Option[RaftNodeConfiguration] # Node configuration; exported like `data` so other modules can read and set it
  data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration
                               # depending on entryType field
RaftNodeLog*[SmCommandType] = object # In-memory log holder for now; needs a more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[SmCommandType]] # Raft Node Log Data
# Timer types
# NOTE(review): the callback comment mentions a closure, but the proc type is declared {.nimcall.},
# the calling convention for procs without a captured environment - confirm which one is intended.
TimerId* = UUID
TimerId* = Uuid
RaftTimerCallback* = proc (timerId: TimerId) {.nimcall, gcsafe.} # Pass any function wrapped in a closure
# Raft Node Object type
RaftNode*[LogEntryDataType, SmStateType] = ref object
RaftNode*[SmCommandType, SmStateType] = object
# Timers
activeTimers: HashSet[TimerId]
activeTimersSet: HashSet[TimerId]
requestVoteTimeout: uint64
heartBeatTimeOut: uint64
appendEntriesTimeOut: uint64
# Mtx definitions go here
# Mtx definition(s) go here
raftStateMutex: Lock
raftLogMutex: Lock
raftCommMutexReceiveMsg: Lock
raftCommMutexClientResponse: Lock
# Modules (Algos)
consensusModule: RaftConsensusModule[LogEntryDataType, SmStateType]
logCompactionModule: RaftLogCompactionModule[LogEntryDataType, SmStateType]
membershipChangeModule: RaftMembershipChangeModule[LogEntryDataType, SmStateType]
consensusModule: RaftConsensusModule[SmCommandType, SmStateType]
logCompactionModule: RaftLogCompactionModule[SmCommandType, SmStateType]
membershipChangeModule: RaftMembershipChangeModule[SmCommandType, SmStateType]
# Misc
msgSendCallback: RaftMessageSendCallback
persistentStorage: RaftNodePersistentStorage
persistentStorage: RaftNodePersistentStorage[SmCommandType, SmStateType]
# Persistent state
id: RaftNodeId # This Raft Node ID
state: RaftNodeState # This Raft Node State
currentTerm: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
log: RaftNodeLog[LogEntryDataType] # This Raft Node Log
votedFor: RaftNodeId # Candidate RaftNodeId that received vote in current term (or nil/zero if none),
# also used to redirect Client Requests in case this Raft Node is not the leader
log: RaftNodeLog[SmCommandType] # This Raft Node Log
stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
peers: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
# makes sense for the moment
stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
# Volatile state
commitIndex: RaftLogIndex # Index of highest log entry known to be committed (initialized to 0, increases monotonically)
lastApplied: RaftLogIndex # Index of highest log entry applied to state machine (initialized to 0, increases monotonically)
# Volatile state on leaders
nextIndex: seq[RaftLogIndex] # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
matchIndex: seq[RaftLogIndex] # For each peer Raft Node, index of highest log entry known to be replicated on Node
# (initialized to 0, increases monotonically)
currentLeaderId: RaftNodeId # The ID of the cirrent leader Raft Node or 0/nil if None is leader (election is in progress etc.)