From e1df04eb5371abf63b9d8b77f457ab4acb57643b Mon Sep 17 00:00:00 2001 From: Raycho Mukelov Date: Mon, 14 Aug 2023 23:49:21 +0300 Subject: [PATCH] Make better Raft Protocol Definition --- raft.nim | 4 +- raft.nimble | 5 +- raft/consensus_module.nim | 14 +++- raft/protocol.nim | 59 ++++++++++------ raft/raft_api.nim | 76 +++++++++++++------- raft/types.nim | 143 +++++++++++++++++++++----------------- 6 files changed, 187 insertions(+), 114 deletions(-) diff --git a/raft.nim b/raft.nim index f6e04bb..90960d6 100644 --- a/raft.nim +++ b/raft.nim @@ -8,7 +8,7 @@ # those terms. import - raft/raft_api + raft/api export - raft_api, types, protocol + api, types, protocol diff --git a/raft.nimble b/raft.nimble index 2baa6d2..c81c8ce 100644 --- a/raft.nimble +++ b/raft.nimble @@ -18,10 +18,7 @@ skipDirs = @["tests"] requires "nim >= 1.6.0" requires "stew >= 0.1.0" -requires "nimcrypto >= 0.5.4" requires "unittest2 >= 0.0.4" -requires "chronicles >= 0.10.2" -requires "eth >= 1.0.0" -requires "chronos >= 3.2.0" +requires "uuid4 >= 0.9.3" # Helper functions \ No newline at end of file diff --git a/raft/consensus_module.nim b/raft/consensus_module.nim index c6bd388..063ee45 100644 --- a/raft/consensus_module.nim +++ b/raft/consensus_module.nim @@ -5,4 +5,16 @@ # * MIT license ([LICENSE-MIT](LICENSE-MIT)) # at your option. # This file may not be copied, modified, or distributed except according to -# those terms. \ No newline at end of file +# those terms. 
+ +import protocol +import types + +proc RaftConsensusProcessRequestVote*(consensus: RaftConsensusModule, msg: RaftMessageRequestVote): RaftMessageRequestVoteResponse = + discard + +proc RaftConsensusProcessAppendEntries*(consensus: RaftConsensusModule, msg: RaftMessageAppendEntries): RaftMessageAppendEntriesResponse = + discard + +proc RaftConsensusQuorumMin(consensus: RaftConsensusModule): bool = + discard \ No newline at end of file diff --git a/raft/protocol.nim b/raft/protocol.nim index 94db99b..4851b9f 100644 --- a/raft/protocol.nim +++ b/raft/protocol.nim @@ -7,11 +7,10 @@ # This file may not be copied, modified, or distributed except according to # those terms. - # # - # Raft Messages Protocol definition # - # # + # # + # Raft Protocol definition # + # # import types -import options type # Raft Node Messages OPs @@ -20,29 +19,49 @@ type rmoAppendLogEntry = 1, rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes - RaftMessagePayloadChecksum* = object # Checksum probably will be a SHA3 hash not sure about this at this point - RaftMessagePayload*[LogEntryDataType] = ref object - data*: RaftNodeLogEntry[LogEntryDataType] - checksum*: RaftMessagePayloadChecksum + RaftMessageRespoonseError* = enum # Raft message response errors + rmreSuccess = 0, + rmreFail = 1 - RaftMessage*[LogEntryDataType] = ref object of RaftMessageBase - op*: RaftMessageOps # Message Op - Ask For Votes, Append Entry(ies), Install Snapshot etc. - payload*: Option[seq[RaftMessagePayload[LogEntryDataType]]] # Optional Message Payload(s) - e.g. log entry(ies). 
Will be empty for a Heart-Beat # Heart-Beat will be a message with Append Entry(ies) Op and empty payload + RaftMessageResponseBase* = ref object of RootObj + msgId*: RaftMessageId # Original Message ID + senderId*: RaftNodeId # Sender Raft Node ID + respondentId: RaftNodeId # Responding RaftNodeId + senderTerm*: RaftNodeTerm # Sender Raft Node Term - RaftMessageResponse*[SmStateType] = ref object of RaftMessageBase - success*: bool # Indicates success/failure - state*: Option[SmStateType] # Raft Abstract State Machine State + RaftMessageRequestVote* = ref object of RaftMessageBase + lastLogTerm*: RaftNodeTerm + lastLogIndex*: RaftLogIndex + + RaftMessageRequestVoteResponse* = ref object of RaftMessageResponseBase + granted*: bool + + RaftMessageAppendEntries*[SmCommandType] = ref object of RaftMessageBase + prevLogIndex*: RaftLogIndex + prevLogTerm*: RaftNodeTerm + commitIndex*: RaftLogIndex + logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat + + RaftMessageAppendEntriesResponse*[SmStateType] = ref object of RaftMessageResponseBase + success*: bool + lastLogIndex*: RaftLogIndex + state*: Option[SmStateType] # Optional Raft Abstract State Machine State # Raft Node Client Request/Response definitions - RaftNodeClientRequestOps = enum - rncroRequestState = 0, - rncroAppendNewEntry = 1 + RaftNodeClientRequestOps* = enum + rncroRequestSmState = 0, + rncroExecSmCommand = 1 - RaftNodeClientRequest*[LogEntryDataType] = ref object + RaftNodeClientResponseError = enum + rncrSuccess = 0, + rncrFail = 1, + rncrNotLeader = 2 + + RaftNodeClientRequest*[SmCommandType] = ref object op*: RaftNodeClientRequestOps - payload*: Option[RaftMessagePayload[LogEntryDataType]] # Optional RaftMessagePayload carrying a Log Entry + payload*: Option[SmCommandType] # Optional RaftMessagePayload carrying a Log Entry RaftNodeClientResponse*[SmStateType] = ref object - success*: bool # Indicate succcess + error*: 
RaftNodeClientResponseError state*: Option[SmStateType] # Optional Raft Abstract State Machine State raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure \ No newline at end of file diff --git a/raft/raft_api.nim b/raft/raft_api.nim index f31caf3..e0d2055 100644 --- a/raft/raft_api.nim +++ b/raft/raft_api.nim @@ -9,76 +9,104 @@ import types import protocol +import consensus_module export types, protocol # Raft Node Public API procedures / functions -proc RaftNodeCreateNew*[LogEntryDataType, SmStateType]( # Create New Raft Node +proc RaftNodeCreateNew*[SmCommandType, SmStateType]( # Create New Raft Node id: RaftNodeId, peers: RaftNodePeers, persistentStorage: RaftNodePersistentStorage, - msgSendCallback: RaftMessageSendCallback): RaftNode[LogEntryDataType, SmStateType] = + msgSendCallback: RaftMessageSendCallback): RaftNode[SmCommandType, SmStateType] = discard -proc RaftNodeLoad*[LogEntryDataType, SmStateType]( +proc RaftNodeLoad*[SmCommandType, SmStateType]( persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage - msgSendCallback: RaftMessageSendCallback): Result[RaftNode[LogEntryDataType, SmStateType], string] = + msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] = discard -proc RaftNodeStop*(node: RaftNode) = +proc RaftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard -proc RaftNodeStart*(node: RaftNode) = +proc RaftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = discard -func RaftNodeIdGet*(node: RaftNode): RaftNodeId = # Get Raft Node ID +func RaftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId = # Get Raft Node ID discard -func RaftNodeStateGet*(node: RaftNode): RaftNodeState = # Get Raft Node State +func RaftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # 
Get Raft Node State discard -func RaftNodeTermGet*(node: RaftNode): RaftNodeTerm = # Get Raft Node Term +func RaftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term discard -func RaftNodePeersGet*(node: RaftNode): RaftNodePeers = # Get Raft Node Peers +func RaftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers discard -func RaftNodeIsLeader*(node: RaftNode): bool = # Check if Raft Node is Leader +func RaftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader discard -proc RaftNodeMessageDeliver*(node: RaftNode, raftMessage: RaftMessageBase): RaftMessageResponse {.discardable.} = # Deliver Raft Message to the Raft Node +# Deliver Raft Message to the Raft Node and dispatch it +proc RaftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase): RaftMessageResponseBase = discard -proc RaftNodeRequest*(node: RaftNode, req: RaftNodeClientRequest): RaftNodeClientResponse = # Process RaftNodeClientRequest +# Process RaftNodeClientRequests +proc RaftNodeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]): RaftNodeClientResponse[SmStateType] = discard -proc RaftNodeLogIndexGet*(node: RaftNode): RaftLogIndex = +proc RaftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex = discard -proc RaftNodeLogEntryGet*(node: RaftNode, logIndex: RaftLogIndex): Result[RaftNodeLogEntry, string] = +proc RaftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): Result[RaftNodeLogEntry[SmCommandType], string] = discard # Abstract State Machine Ops -func RaftNodeSmStateGet*[LogEntryDataType, SmStateType](node: 
RaftNode[LogEntryDataType, SmStateType]): SmStateType = +func RaftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType = node.stateMachine.state -proc RaftNodeSmInit[LogEntryDataType, SmStateType](stateMachine: var RaftNodeStateMachine[LogEntryDataType, SmStateType]) = +proc RaftNodeSmInit[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) = mixin RaftSmInit RaftSmInit(stateMachine) -proc RaftNodeSmApply[LogEntryDataType, SmStateType](stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType], logEntry: LogEntryDataType) = +proc RaftNodeSmApply[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) = mixin RaftSmApply - RaftSmApply(stateMachine, logEntry) + RaftSmApply(stateMachine, command) -# Timer manipulation -proc RaftTimerCreate*[TimerDurationType](timerInterval: TimerDurationType, repeat: bool, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic +# Private Abstract Timer manipulation Ops +proc RaftTimerCreate[TimerDurationType](timerInterval: TimerDurationType, timer_callback: RaftTimerCallback): TimerId = # I guess Duration should be monotonic mixin RaftTimerCreateCustomImpl - RaftTimerCreateCustomImpl(timerInterval, repeat, timer_callback) + RaftTimerCreateCustomImpl(timerInterval, timer_callback) -template RaftTimerCancel*(TimerId) = +template RaftTimerCancel(TimerId) = mixin RaftTimerCancelCustomImpl RaftTimerCancelCustomImpl(TimerId) -template RaftTimerIsExpired*(TimerId): bool = +template RaftTimerIsExpired(TimerId): bool = mixin RaftTimerIsExpiredImpl RaftTimerIsExpiredImpl(TimerId) +# Private Log Ops +proc RaftNodeLogAppend[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) = + discard + +proc RaftNodeLastLogIndex[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): uint64 = + 
discard + +proc RaftNodeLogTruncate[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: uint64) = + discard + +# Private Timers Create Ops +proc RaftNodeScheduleHeartBeat[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = + discard + +proc RaftNodeScheduleHeartBeatTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = + discard + +proc scheduleElectionTimeOut[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = + discard + +proc scheduleRequestVoteTimeout[SmCommandType, SmStateType, TimerDurationType](node: RaftNode[SmCommandType, SmStateType]): TimerId = + discard + +proc RaftNodeHeartBeatTimeout[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) = + discard diff --git a/raft/types.nim b/raft/types.nim index 93a1b0c..e113c1b 100644 --- a/raft/types.nim +++ b/raft/types.nim @@ -7,117 +7,134 @@ # This file may not be copied, modified, or distributed except according to # those terms. -# Raft Node Public Types. 
-# I guess that at some point these can be moved to a separate file called raft_consensus_types.nim for example +# Raft Node Public Types import std/locks -import stew/results -import eth/keyfile import std/sets +import options +import stew/results +import uuid4 + +export results, options -export results type - # Raft Node basic definitions - Blob* = seq[byte] - RaftNodeState* = enum - UNKNOWN = 0, - FOLLOWER = 1, - LEADER = 2 + rnsUnknown = 0, + rnsFollower = 1, + rnsCandidate = 2 + rnsLeader = 3 - RaftNodeId* = UUID # UUID uniquely identifying every Raft Node - RaftNodePeers* = seq[RaftNodeId] # List of Raft Node Peers IDs + RaftNodeId* = Uuid # uuid4 uniquely identifying every Raft Node RaftNodeTerm* = uint64 # Raft Node Term Type RaftLogIndex* = uint64 # Raft Node Log Index Type + RaftNodePeer* = object # Raft Node Peer object + id*: RaftNodeId + nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node + # (initialized to leader last log index + 1) + matchIndex*: RaftLogIndex # For each peer Raft Node, index of highest log entry known to be replicated on Node + # (initialized to 0, increases monotonically) + hasVoted*: bool # Indicates if this peer have voted for this Raft Node During Election + canVote*: bool # Indicates if this peer can vote + + RaftNodePeers* = seq[RaftNodePeer] # List of Raft Node Peers + + # Raft Node Abstract State Machine type - RaftNodeStateMachine*[LogEntryDataType, SmStateType] = ref object # Some probably opaque State Machine Impelementation to be used by the Raft Node - # providing at minimum operations for initialization, querying the current state - # and RaftNodeLogEntry application + RaftNodeStateMachine*[SmCommandType, SmStateType] = object # Some opaque State Machine Impelementation to be used by the Raft Node + # providing at minimum operations for initialization, querying the current state + # and RaftNodeLogEntry (SmCommandType) application state: SmStateType # Raft Node 
Persistent Storage basic definition - RaftNodePersistentStorage* = ref object # Should be some kind of Persistent Transactional Store Wrapper + RaftNodePersistentStorage*[SmCommandType, SmStateType] = object # Should be some kind of Persistent Transactional Store Wrapper # Basic modules (algos) definitions - RaftNodeAccessCallback[LogEntryDataType, SmStateType] = proc: RaftNode[LogEntryDataType, SmStateType] {.nimcall, gcsafe.} # This should be implementes as a closure holding the RaftNode + RaftNodeAccessCallback[SmCommandType, SmStateType] = proc: RaftNode[SmCommandType, SmStateType] {.nimcall, gcsafe.} # This should be implementes as a closure holding the RaftNode - RaftConsensusModule*[LogEntryDataType, SmStateType] = object of RootObj - stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim - raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType] + RaftConsensusModule*[SmCommandType, SmStateType] = object of RootObj + stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim + raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType] - RaftLogCompactionModule*[LogEntryDataType, SmStateType] = object of RootObj - raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType] + RaftLogCompactionModule*[SmCommandType, SmStateType] = object of RootObj + raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType] - RaftMembershipChangeModule*[LogEntryDataType, SmStateType] = object of RootObj - raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType, SmStateType] + RaftMembershipChangeModule*[SmCommandType, SmStateType] = object of RootObj + raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType] # Callback for sending messages out of this Raft Node - RaftMessageId* = UUID # UUID assigned to every Raft Node Message, - # so it can be matched with it's corresponding response etc. 
+ RaftMessageId* = Uuid # UUID assigned to every Raft Node Message, + # so it can be matched with its corresponding response etc. - RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages - # out of this Raft Node. Can be used for broadcasting - # (a Heart-Beat for example) - - # Raft Node basic Log definitions - RaftNodeLogEntry*[LogEntryDataType] = ref object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) - term*: RaftNodeTerm - data*: LogEntryDataType - - RaftNodeLog*[LogEntryDataType] = ref object # Needs more elaborate definition. - # Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc. - logData*: seq[RaftNodeLogEntry[LogEntryDataType]] # Raft Node Log Data - - # Base type for Raft message objects - RaftMessageBase* = ref object of RootObj # Base Type for Raft Node Messages + RaftMessageBase* = ref object of RootObj # Base Type for Raft Protocol Messages msgId*: RaftMessageId # Message UUID senderId*: RaftNodeId # Sender Raft Node ID senderTerm*: RaftNodeTerm # Sender Raft Node Term - peers*: RaftNodePeers # List of Raft Node IDs, which should receive this message + + RaftMessageSendCallback* = proc (recipient: RaftNodeId, raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages + # out of this Raft Node. + + # For later use when adding/removing new nodes (dynamic configuration changes) + RaftNodeConfiguration* = object + + # Raft Node Log definition + LogEntryType* = enum + etUnknown = 0, + etConfiguration = 1, + etData = 2, + etNoOp = 3 + + RaftNodeLogEntry*[SmCommandType] = object # Abstract Raft Node Log entry containing opaque binary data (Blob etc.) + term*: RaftNodeTerm + index*: RaftLogIndex + clusterTime*: object + entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
+ configuration: Option[RaftNodeConfiguration] # Node configuration + data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration + # depending on entryType field + + RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition. + # Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc. + logData*: seq[RaftNodeLogEntry[SmCommandType]] # Raft Node Log Data # Timer types - TimerId* = UUID + TimerId* = Uuid RaftTimerCallback* = proc (timerId: TimerId) {.nimcall, gcsafe.} # Pass any function wrapped in a closure # Raft Node Object type - RaftNode*[LogEntryDataType, SmStateType] = ref object + RaftNode*[SmCommandType, SmStateType] = object # Timers - activeTimers: HashSet[TimerId] + activeTimersSet: HashSet[TimerId] + requestVoteTimeout: uint64 + heartBeatTimeOut: uint64 + appendEntriesTimeOut: uint64 - # Mtx definitions go here + # Mtx definition(s) go here raftStateMutex: Lock - raftLogMutex: Lock - raftCommMutexReceiveMsg: Lock - raftCommMutexClientResponse: Lock # Modules (Algos) - consensusModule: RaftConsensusModule[LogEntryDataType, SmStateType] - logCompactionModule: RaftLogCompactionModule[LogEntryDataType, SmStateType] - membershipChangeModule: RaftMembershipChangeModule[LogEntryDataType, SmStateType] + consensusModule: RaftConsensusModule[SmCommandType, SmStateType] + logCompactionModule: RaftLogCompactionModule[SmCommandType, SmStateType] + membershipChangeModule: RaftMembershipChangeModule[SmCommandType, SmStateType] # Misc msgSendCallback: RaftMessageSendCallback - persistentStorage: RaftNodePersistentStorage + persistentStorage: RaftNodePersistentStorage[SmCommandType, SmStateType] # Persistent state id: RaftNodeId # This Raft Node ID state: RaftNodeState # This Raft Node State currentTerm: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically) - log: RaftNodeLog[LogEntryDataType] # This Raft Node Log votedFor: RaftNodeId # 
Candidate RaftNodeId that received vote in current term (or nil/zero if none), # also used to redirect Client Requests in case this Raft Node is not the leader + log: RaftNodeLog[SmCommandType] # This Raft Node Log + stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's + # state is enough to consider it 'persisted' peers: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent # makes sense for the moment - stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's - # state is enough to consider it 'persisted' + # Volatile state commitIndex: RaftLogIndex # Index of highest log entry known to be committed (initialized to 0, increases monotonically) lastApplied: RaftLogIndex # Index of highest log entry applied to state machine (initialized to 0, increases monotonically) - - # Volatile state on leaders - nextIndex: seq[RaftLogIndex] # For each peer Raft Node, index of the next log entry to send to that Node - # (initialized to leader last log index + 1) - matchIndex: seq[RaftLogIndex] # For each peer Raft Node, index of highest log entry known to be replicated on Node - # (initialized to 0, increases monotonically) + currentLeaderId: RaftNodeId # The ID of the current leader Raft Node or 0/nil if None is leader (election is in progress etc.) \ No newline at end of file