nim-raft/raft/raft_api.nim

204 lines
8.4 KiB
Nim
Raw Normal View History

# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import protocol
2023-08-14 20:49:21 +00:00
import consensus_module
2023-09-03 03:27:27 +00:00
import log_ops
import ../db/kvstore_mdbx
2023-09-04 09:47:27 +00:00
import chronicles
import std/random
2023-09-04 09:47:27 +00:00
export
types,
protocol,
consensus_module,
log_ops,
chronicles
2023-09-04 09:47:27 +00:00
# Forward declarations
2023-10-13 06:03:42 +00:00
# Declared ahead of its definition (further down in this file) because `new` calls it.
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
2023-08-31 14:05:41 +00:00
2023-09-04 09:47:27 +00:00
# Raft Node Public API
2023-09-11 16:55:30 +00:00
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
                                      id: RaftNodeId; peersIds: seq[RaftNodeId];
                                      # persistentStorage: RaftNodePersistentStorage,
                                      msgSendCallback: RaftMessageSendCallback;
                                      electionTimeout: int=150;
                                      heartBeatTimeout: int=150;
                                      appendEntriesRespTimeout: int=20;
                                      votingRespTimeout: int=20;
                                      heartBeatRespTimeout: int=10
                                      ): T =
  ## Construct a fresh Raft node.
  ##
  ## The node starts as a follower in term 0 with no vote cast and no known
  ## leader (`DefaultUUID` for both). One peer record is created per entry of
  ## `peersIds`, each with `nextIndex` 1, `matchIndex` 0, `hasVoted` false and
  ## `canVote` true. The state machine is initialised through `raftNodeSmInit`
  ## and the node's re-entrant state lock is initialised before returning.
  ##
  ## The timeout arguments are stored on the node unchanged
  ## (presumably milliseconds - TODO confirm against the timer implementation).
  var peerList: RaftNodePeers
  for pid in peersIds:
    peerList.add RaftNodePeer(
      id: pid,
      nextIndex: 1,
      matchIndex: 0,
      hasVoted: false,
      canVote: true
    )

  result = T(
    id: id,
    state: rnsFollower,
    currentTerm: 0,
    peers: peerList,
    commitIndex: 0,
    lastApplied: 0,
    msgSendCallback: msgSendCallback,
    votedFor: DefaultUUID,
    currentLeaderId: DefaultUUID,
    electionTimeout: electionTimeout,
    heartBeatTimeout: heartBeatTimeout,
    appendEntriesRespTimeout: appendEntriesRespTimeout,
    heartBeatRespTimeout: heartBeatRespTimeout,
    votingRespTimeout: votingRespTimeout,
    hrtBtSuccess: false
  )

  raftNodeSmInit(result.stateMachine)
  initRLock(result.raftStateMutex)
2023-10-13 04:24:35 +00:00
proc raftNodeLoad*[SmCommandType, SmStateType](
    persistentStorage: RaftNodePersistentStorage,
    msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] =
  ## Load Raft Node From Storage.
  ##
  ## Not implemented yet: the body performs no work, so callers receive the
  ## default-initialised `result`.
  discard
2023-10-13 04:24:35 +00:00
proc raftNodeIdGet*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} =
  ## Get Raft Node ID. The field is read while holding the node's state lock.
  withRLock node.raftStateMutex:
    result = node.id
2023-10-13 04:24:35 +00:00
proc raftNodeStateGet*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): RaftNodeState =
  ## Get Raft Node State (the `state` field of the node, not the state
  ## machine's state). The field is read while holding the node's state lock.
  withRLock node.raftStateMutex:
    result = node.state
2023-10-13 04:24:35 +00:00
proc raftNodeTermGet*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm =
  ## Get Raft Node Term (`currentTerm`), read while holding the node's state lock.
  withRLock node.raftStateMutex:
    result = node.currentTerm
2023-10-13 04:24:35 +00:00
func raftNodePeersGet*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers =
  ## Get Raft Node Peers, read while holding the node's state lock.
  withRLock node.raftStateMutex:
    result = node.peers
2023-10-13 04:24:35 +00:00
func raftNodeIsLeader*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): bool =
  ## Check if Raft Node is Leader: true exactly when the node's `state` is
  ## `rnsLeader`. The field is read while holding the node's state lock.
  withRLock node.raftStateMutex:
    result = (node.state == rnsLeader)
2023-08-14 20:49:21 +00:00
# Deliver Raft Message to the Raft Node and dispatch it
2023-10-13 04:24:35 +00:00
proc raftNodeMessageDeliver*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType],
    raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
    Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
  ## Deliver a Raft message to `node` and dispatch it on its operation code.
  ##
  ## * `rmoRequestVote` goes to `raftNodeHandleRequestVote`.
  ## * `rmoAppendLogEntry` goes to `raftNodeHandleAppendEntries` when the
  ##   message carries log entries, otherwise to `raftNodeHandleHeartBeat`.
  ## * Any other operation is ignored and the default `result` is returned.
  var
    rm = RaftMessage[SmCommandType, SmStateType](raftMessage)

  case rm.op
  of rmoRequestVote:
    result = raftNodeHandleRequestVote(node, rm)
  of rmoAppendLogEntry:
    # An append message without entries is treated as a heart-beat.
    if rm.logEntries.isNone:
      result = raftNodeHandleHeartBeat(node, rm)
    else:
      result = raftNodeHandleAppendEntries(node, rm)
  else:
    discard
# Process Raft Node Client Requests
2023-10-13 04:24:35 +00:00
proc raftNodeServeClientRequest*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType],
    req: RaftNodeClientRequest[SmCommandType]):
    Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
  ## Process a Raft Node client request.
  ##
  ## A node that is not the leader answers `rncreNotLeader` together with its
  ## `currentLeaderId`. On the leader, `rncroExecSmCommand` replicates the
  ## command through `raftNodeReplicateSmCommand` and answers `rncreSuccess`
  ## or `rncreFail` depending on the outcome; `rncroRequestSmState` answers
  ## `rncreSuccess`. Any other operation trips an assertion.
  ##
  ## NOTE(review): the `state` field is filled from `raftNodeStateGet`, which
  ## returns the node's `RaftNodeState`, while the response type is
  ## parameterised on `SmStateType`; `raftNodeSmStateGet` may have been
  ## intended - confirm against the `RaftNodeClientResponse` declaration.
  ## NOTE(review): `.read` is applied to the already awaited value - confirm
  ## the return type of `raftNodeReplicateSmCommand`.
  withRLock(node.raftStateMutex):
    if not raftNodeIsLeader(node):
      return RaftNodeClientResponse(nodeId: node.id, error: rncreNotLeader, currentLeaderId: node.currentLeaderId)

    case req.op
    of rncroRequestSmState:
      return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: raftNodeStateGet(node))
    of rncroExecSmCommand:
      let resFut = await raftNodeReplicateSmCommand(node, req.smCommand)
      let outcome = if resFut.read: rncreSuccess else: rncreFail
      return RaftNodeClientResponse(nodeId: node.id, error: outcome, state: raftNodeStateGet(node))
    else:
      raiseAssert "Unknown client request operation."
# Abstract State Machine Ops
2023-10-13 04:24:35 +00:00
func raftNodeSmStateGet*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]): SmStateType =
  ## Get the current state of the node's abstract state machine
  ## (`node.stateMachine.state`), read while holding the node's state lock.
  withRLock(node.raftStateMutex):
    # Assign to `result` explicitly, as the other getters in this file do:
    # `withRLock` is a statement template, so a bare trailing expression
    # inside its block is not propagated as the function's value.
    result = node.stateMachine.state
2023-10-13 06:03:42 +00:00
proc raftNodeSmInit*[SmCommandType, SmStateType](
    stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
  ## Initialise the abstract state machine by delegating to `raftSmInit`.
  ## `raftSmInit` is declared `mixin`, so it is looked up where this generic
  ## is instantiated.
  mixin raftSmInit
  stateMachine.raftSmInit()
2023-10-13 04:24:35 +00:00
2023-10-13 06:03:42 +00:00
proc raftNodeSmApply*[SmCommandType, SmStateType](
    stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType],
    command: SmCommandType) =
  ## Apply `command` to the abstract state machine by delegating to
  ## `raftSmApply`. `raftSmApply` is declared `mixin`, so it is looked up
  ## where this generic is instantiated.
  mixin raftSmApply
  stateMachine.raftSmApply(command)
2023-09-06 16:18:02 +00:00
# Private Abstract Timer creation
2023-10-13 04:24:35 +00:00
template raftTimerCreate*(
    timerInterval: int,
    timerCallback: RaftTimerCallback): Future[void] =
  ## Create a timer by forwarding both arguments to
  ## `raftTimerCreateCustomImpl`. That symbol is declared `mixin`, so it is
  ## resolved at the call site of this template.
  mixin raftTimerCreateCustomImpl
  raftTimerCreateCustomImpl(timerInterval, timerCallback)
# Timers scheduling stuff etc.
2023-10-13 04:24:35 +00:00
proc raftNodeScheduleHeartBeat*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]) =
  ## Arm the heart-beat timer: after `node.heartBeatTimeout` the timer
  ## callback spawns `raftNodeSendHeartBeat` for this node. The timer future
  ## is stored in `node.heartBeatTimer` under the node's state lock.
  withRLock(node.raftStateMutex):
    node.heartBeatTimer = raftTimerCreate(
      node.heartBeatTimeout,
      proc() = asyncSpawn raftNodeSendHeartBeat(node)
    )
2023-10-13 04:24:35 +00:00
proc raftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
  ## Send one round of heart-beats to every peer and schedule the next round.
  ##
  ## A heart-beat is an `rmoAppendLogEntry` message without log entries. The
  ## responses are awaited until either all sends finish or the
  ## `node.heartBeatRespTimeout` timer fires, whichever comes first.
  ## `node.hrtBtSuccess` is set to true when at least
  ## `peers.len div 2 + peers.len mod 2` peers answered with `success`.
  ## The flag is never cleared here.
  ##
  ## NOTE(review): `prevLogIndex` is computed as `raftNodeLogIndexGet(node) - 1`
  ## even when the log index is 0 - confirm this cannot underflow for the
  ## index type in use.
  debug "Raft Node sending Heart-Beat to peers", node_id=node.id
  withRLock(node.raftStateMutex):
    var hrtBtFuts: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
    for raftPeer in node.peers:
      let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
        op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
        senderTerm: raftNodeTermGet(node), commitIndex: node.commitIndex,
        prevLogIndex: raftNodeLogIndexGet(node) - 1, prevLogTerm: if raftNodeLogIndexGet(node) > 0: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term else: 0
      )
      hrtBtFuts.add(node.msgSendCallback(msgHrtBt))

    # Wait for all responses, but no longer than the response timeout.
    let allHrtBtFuts = allFutures(hrtBtFuts)
    await allHrtBtFuts or raftTimerCreate(node.heartBeatRespTimeout, proc()=discard)

    var successCnt = 0
    for fut in hrtBtFuts:
      # Only futures that completed with a value may be read. A failed or
      # cancelled future is also "finished", and reading it would raise and
      # prevent the next heart-beat from ever being scheduled.
      if fut.completed:
        let resp = RaftMessageResponse[SmCommandType, SmStateType](fut.read)
        if resp.success:
          successCnt.inc

    if successCnt >= (node.peers.len div 2 + node.peers.len mod 2):
      node.hrtBtSuccess = true

  raftNodeScheduleHeartBeat(node)
2023-09-06 16:18:02 +00:00
2023-10-13 04:24:35 +00:00
proc raftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]) =
  ## Arm the election timer and clear the heart-beat success flag.
  ##
  ## The delay is `node.electionTimeout` plus a random amount in
  ## `0..node.electionTimeout`; when it elapses the callback spawns
  ## `raftNodeStartElection` for this node. Runs under the node's state lock.
  withRLock(node.raftStateMutex):
    let randomizedTimeout = node.electionTimeout + rand(node.electionTimeout)
    node.electionTimeoutTimer = raftTimerCreate(
      randomizedTimeout,
      proc() = asyncSpawn raftNodeStartElection(node)
    )
    node.hrtBtSuccess = false
# Raft Node Control
2023-10-13 04:24:35 +00:00
proc raftNodeCancelTimers*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]) =
  ## Request cancellation of the heart-beat and election timers, if they
  ## exist. Each cancellation is spawned with `asyncSpawn` and is not awaited
  ## here. Runs under the node's state lock.
  withRLock(node.raftStateMutex):
    if not node.heartBeatTimer.isNil:
      asyncSpawn cancelAndWait(node.heartBeatTimer)
    if not node.electionTimeoutTimer.isNil:
      asyncSpawn cancelAndWait(node.electionTimeoutTimer)
2023-09-03 00:53:48 +00:00
2023-10-13 04:24:35 +00:00
proc raftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
  ## Try to stop the node gracefully: abort a running election when the node
  ## is a candidate, then cancel any pending timers.
  withRLock(node.raftStateMutex):
    # Abort election if in election
    if node.state == rnsCandidate:
      raftNodeAbortElection(node)
  # Cancel pending timers (if any)
  raftNodeCancelTimers(node)
2023-10-13 04:24:35 +00:00
proc raftNodeStart*[SmCommandType, SmStateType](
    node: RaftNode[SmCommandType, SmStateType]) =
  ## Start the node: seed the random number generator, put the node into
  ## follower state, log the start and arm the election timeout.
  randomize()
  withRLock node.raftStateMutex:
    node.state = rnsFollower
    debug "Start Raft Node", node_id=node.id, state=node.state
  raftNodeScheduleElectionTimeout(node)