Code refactoring

This commit is contained in:
Marto 2024-02-07 15:29:53 +02:00
parent b4f8fd9999
commit 2ecc344aa1
19 changed files with 228 additions and 1273 deletions

7
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,7 @@
{
"nim.projectMapping": [{
"projectFile": "raft.nim",
"fileRegex": ".*\\.nim"
}
]
}

View File

@ -7,9 +7,7 @@
# This file may not be copied, modified, or distributed except according to
# those terms.
import
raft/raft_api
import raft/consensus_state_machine
export
raft_api,
consensus_state_machine
import raft/types
export consensus_state_machine
export types

View File

@ -1,227 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import protocol
import log_ops
import chronicles
import async_util
proc raftNodeQuorumMin[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool =
result = false
var cnt = 0
for peer in node.peers:
if peer.hasVoted:
cnt.inc
if cnt >= (node.peers.len div 2 + node.peers.len mod 2):
result = true
proc raftNodeCheckCommitIndex*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
if msg.commitIndex > node.commitIndex:
let newcommitIndex = min(msg.commitIndex, raftNodeLogIndexGet(node))
while node.commitIndex < newcommitIndex:
node.commitIndex.inc
raftNodeApplyLogEntry(node, node.commitIndex)
proc raftNodeHandleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
debug "Received heart-beat", node_id=node.id, sender_id=msg.sender_id, node_current_term=node.currentTerm, sender_term=msg.senderTerm
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
withRLock(node.raftStateMutex):
if msg.senderTerm >= node.currentTerm:
raftNodeCancelTimers(node)
if node.state == rnsCandidate:
raftNodeAbortElection(node)
result.success = true
node.state = rnsFollower
node.currentTerm = msg.senderTerm
node.votedFor = DefaultUUID
node.currentLeaderId = msg.senderId
raftNodeCheckCommitIndex(node, msg)
raftNodeScheduleElectionTimeout(node)
proc raftNodeHandleRequestVote*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoRequestVote, msgId: msg.msgId, senderId: node.id, receiverId: msg.senderId, granted: false)
withRLock(node.raftStateMutex):
if msg.senderTerm > node.currentTerm and node.votedFor == DefaultUUID:
if msg.lastLogTerm > raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term or
(msg.lastLogTerm == raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term and msg.lastLogIndex >= raftNodeLogIndexGet(node)):
if node.electionTimeoutTimer != nil:
asyncSpawn cancelAndWait(node.electionTimeoutTimer)
node.votedFor = msg.senderId
node.currentLeaderId = DefaultUUID
result.granted = true
raftNodeScheduleElectionTimeout(node)
proc raftNodeAbortElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.state = rnsFollower
for fut in node.votesFuts:
asyncSpawn cancelAndWait(fut)
proc raftNodeStartElection*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
mixin raftNodeScheduleElectionTimeout, raftTimerCreate
withRLock(node.raftStateMutex):
if node.state == rnsLeader and node.hrtBtSuccess:
raftNodeScheduleElectionTimeout(node)
return
if node.state == rnsLeader and not node.hrtBtSuccess:
raftNodeCancelTimers(node)
debug "Raft Node transition to follower - unsuccsessful heart beat rounds", node_id=node.id
node.state = rnsFollower
node.currentLeaderId = DefaultUUID
node.votedFor = DefaultUUID
raftNodeScheduleElectionTimeout(node)
return
raftNodeScheduleElectionTimeout(node)
while node.votesFuts.len > 0:
discard node.votesFuts.pop
node.currentTerm.inc
node.state = rnsCandidate
node.votedFor = node.id
debug "Raft Node started election", node_id=node.id, state=node.state, voted_for=node.votedFor
for peer in node.peers:
peer.hasVoted = false
node.votesFuts.add(node.msgSendCallback(
RaftMessage[SmCommandType, SmStateType](
op: rmoRequestVote, msgId: genUUID(), senderId: node.id,
receiverId: peer.id, lastLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term,
lastLogIndex: raftNodeLogIndexGet(node), senderTerm: node.currentTerm)
)
)
# Wait for votes or voting timeout
let all = allFutures(node.votesFuts)
await all or raftTimerCreate(node.votingRespTimeout, proc()=discard)
if not all.finished:
debug "Raft Node Voting timeout", node_id=node.id
# Process votes (if any)
for voteFut in node.votesFuts:
if voteFut.finished and not voteFut.cancelled:
let respVote = RaftMessageResponse[SmCommandType, SmStateType](voteFut.read)
debug "Received vote", node_id=node.id, sender_id=respVote.senderId, granted=respVote.granted
for p in node.peers:
if p.id == respVote.senderId:
p.hasVoted = respVote.granted
else:
await cancelAndWait(voteFut)
withRLock(node.raftStateMutex):
if node.state == rnsCandidate:
if raftNodeQuorumMin(node):
await cancelAndWait(node.electionTimeoutTimer)
raftNodeScheduleElectionTimeout(node)
debug "Raft Node transition to leader", node_id=node.id
node.state = rnsLeader # Transition to leader state and send Heart-Beat to establish this node as the cluster leader
asyncSpawn raftNodeSendHeartBeat(node)
else:
node.state = rnsFollower
node.currentLeaderId = DefaultUUID
node.votedFor = DefaultUUID
proc raftNodeHandleAppendEntries*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], msg: RaftMessage[SmCommandType, SmStateType]):
RaftMessageResponse[SmCommandType, SmStateType] =
result = RaftMessageResponse[SmCommandType, SmStateType](op: rmoAppendLogEntry, senderId: node.id, receiverId: msg.senderId, msgId: msg.msgId, success: false)
withRLock(node.raftStateMutex):
if msg.senderTerm >= node.currentTerm:
raftNodeCancelTimers(node)
if node.state == rnsCandidate:
raftNodeAbortElection(node)
node.currentTerm = msg.senderTerm
node.votedFor = DefaultUUID
node.state = rnsFollower
node.currentLeaderId = msg.senderId
raftNodeScheduleElectionTimeout(node)
if msg.senderTerm < node.currentTerm:
return
if msg.prevLogIndex > raftNodeLogIndexGet(node):
return
if msg.prevLogIndex == raftNodeLogIndexGet(node):
if msg.prevLogTerm != raftNodeLogEntryGet(node, raftNodeLogIndexGet(node)).term:
return
if msg.prevLogIndex < raftNodeLogIndexGet(node):
if msg.prevLogTerm != raftNodeLogEntryGet(node, msg.prevLogIndex).term:
raftNodeLogTruncate(node, msg.prevLogIndex)
return
if msg.logEntries.isSome:
for entry in msg.logEntries.get:
raftNodeLogAppend(node, entry)
raftNodeCheckCommitIndex(node, msg)
result.success = true
proc raftNodeReplicateSmCommand*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], cmd: SmCommandType): Future[bool] {.async.} =
mixin RaftLogEntry, raftTimerCreate
result = false
withRLock(node.raftStateMutex):
var
logEntry: RaftLogEntry[SmCommandType](term: node.currentTerm, data: cmd, entryType: etData)
raftNodeLogAppend(node, logEntry)
for fut in node.replicateFuts:
discard fut
node.replicateFuts.clear
for peer in node.peers:
var
msg: RaftMessage[SmCommandType, SmStateType] = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, msgId: genUUID(), senderId: node.id, receiverId: peer.id,
senderTerm: node.currentTerm, prevLogIndex: raftNodeLogIndexGet(node) - 1,
prevLogTerm: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term,
commitIndex: node.commitIndex, entries: @[logEntry]
)
node.replicateFuts.add(node.msgSendCallback(msg))
let allReplicateFuts = allFutures(node.replicateFuts)
await allReplicateFuts or raftTimerCreate(node.appendEntriesRespTimeout, proc()=discard)
if not allReplicateFuts.finished:
debug "Raft Node Replication timeout", node_id=node.id
var replicateCnt = 0
for fut in node.replicateFuts:
if fut.finished and not fut.cancelled:
let resp = RaftMessageResponse[SmCommandType, SmStateType](fut.read)
if resp.success:
replicateCnt.inc
info "Raft Node Replication success", node_id=node.id, sender_id=resp.senderId
else:
info "Raft Node Replication failed", node_id=node.id, sender_id=resp.senderId
if replicateCnt >= (node.peers.len div 2 + node.peers.len mod 2):
node.commitIndex = raftNodeLogIndexGet(node)
raftNodeApplyLogEntry(node, node.commitIndex) # Apply to state machine
result = true

View File

@ -8,10 +8,14 @@
# those terms.
import types
import log
import tracker
import std/[times]
import std/sequtils
import std/random
randomize()
type
@ -25,17 +29,6 @@ type
Rejected = 0,
Accepted = 1
RaftElectionResult* = enum
Unknown = 0,
Won = 1,
Lost = 2
RaftLogEntryType* = enum
rletCommand = 0,
rletConfig = 1,
rletEmpty = 2
RaftRpcAppendRequest* = object
previousTerm*: RaftNodeTerm
previousLogIndex*: RaftLogIndex
@ -72,15 +65,6 @@ type
LeaderState* = object
tracker: RaftTracker
RaftElectionTracker* = object
all: seq[RaftNodeId]
responded: seq[RaftNodeId]
granted: int
RaftVotes* = object
voters: seq[RaftNodeId]
current: RaftElectionTracker
CandidateState* = object
votes: RaftVotes
isPrevote: bool
@ -98,24 +82,6 @@ type
of Append: appendRequest*: RaftRpcAppendRequest
of AppendReplay: appendReplay*: RaftRpcAppendReplay
Command* = object
data: seq[byte]
Config* = object
Empty* = object
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
# TODO: Add configuration too
case kind*: RaftLogEntryType:
of rletCommand: command*: Command
of rletConfig: config*: Config
of rletEmpty: empty*: bool
RaftLog* = object
logEntries: seq[LogEntry]
firstIndex: RaftLogIndex
RaftStateMachineOutput* = object
logEntries*: seq[LogEntry]
# Entries that should be applyed to the "User" State machine
@ -126,9 +92,6 @@ type
votedFor*: Option[RaftNodeId]
stateChange*: bool
RaftConfig* = object
currentSet*: seq[RaftNodeId]
RaftStateMachineState* = object
case state: RaftNodeState
of rnsFollower: follower : FollowerState
@ -157,20 +120,6 @@ type
state*: RaftStateMachineState
RaftFollowerProgress = seq[RaftFollowerProgressTracker]
RaftTracker* = object
progress: RaftFollowerProgress
current: seq[RaftNodeId]
RaftFollowerProgressTracker* = ref object
id: RaftNodeId
nextIndex: RaftLogIndex
# Index of the highest log entry known to be replicated to this server.
matchIndex: RaftLogIndex
commitIndex: RaftLogIndex
replayedIndex: RaftLogIndex
lastMessageAt: times.DateTime
func isLeader*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsLeader
@ -190,28 +139,6 @@ func follower*(s: var RaftStateMachineState): var FollowerState =
func candidate*(s: var RaftStateMachineState): var CandidateState =
return s.candidate
func find(ls: RaftTracker, id: RaftnodeId): RaftFollowerProgressTracker =
for follower in ls.progress:
if follower.id == id:
return follower
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex, now: times.DateTime): RaftFollowerProgressTracker =
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0, lastMessageAt: now)
func accepted*(fpt: var RaftFollowerProgressTracker, index: RaftLogIndex)=
fpt.matchIndex = max(fpt.matchIndex, index)
fpt.nextIndex = max(fpt.nextIndex, index)
func initTracker(config: RaftConfig, nextIndex: RaftLogIndex, now: times.DateTime): RaftTracker =
var tracker = RaftTracker()
for node in config.currentSet:
tracker.progress.add(initFollowerProgressTracker(node, nextIndex, now))
tracker.current.add(node)
return tracker
func leader*(sm: var RaftStateMachine): var LeaderState =
return sm.state.leader
@ -221,137 +148,6 @@ func follower*(sm: var RaftStateMachine): var FollowerState =
func candidate*(sm: var RaftStateMachine): var CandidateState =
return sm.state.candidate
func contains(a: seq[RaftNodeId], id: RaftNodeId): bool =
var found = false
for n in a:
if n == id:
found = true
break
return found
func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker =
var r = RaftElectionTracker()
r.all = nodes
r.granted = 0
return r
func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool =
if not ret.all.contains nodeId:
return false
if not ret.responded.contains nodeId:
ret.responded.add(nodeId)
if granted:
ret.granted += 1
return true
func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult =
let quorym = int(len(ret.all) / 2) + 1
if ret.granted >= quorym:
return RaftElectionResult.Won
let unkown = len(ret.all) - len(ret.responded)
if ret.granted + unkown >= quorym:
return RaftElectionResult.Unknown
else:
return RaftElectionResult.Lost
func initVotes*(nodes: seq[RaftNodeId]): RaftVotes =
var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes))
return r
func initVotes*(config: RaftConfig): RaftVotes =
var r = RaftVotes(voters: config.currentSet, current: initElectionTracker(config.currentSet))
return r
func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool =
# TODO: Add support for configuration
return rv.current.registerVote(nodeId, granted)
func tallyVote*(rv: var RaftVotes): RaftElectionResult =
# TODO: Add support for configuration
return rv.current.tallyVote()
func initRaftLog*(firstIndex: RaftLogIndex): RaftLog =
var log = RaftLog()
assert firstIndex > 0
log.firstIndex = firstIndex
return log
func lastTerm*(rf: RaftLog): RaftNodeTerm =
# Not sure if it's ok, maybe we should return optional value
let size = rf.logEntries.len
if size == 0:
return 0
return rf.logEntries[size - 1].term
func entriesCount*(rf: RaftLog): int =
return rf.logEntries.len
func lastIndex*(rf: RaftLog): RaftNodeTerm =
return rf.logEntries.len + rf.firstIndex - 1
func nextIndex*(rf: RaftLog): int =
return rf.lastIndex + 1
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
# TODO: We should add support for configurations and snapshots
if rf.logEntries.len == 0:
return
rf.logEntries.delete((index - rf.firstIndex)..<len(rf.logEntries))
func isUpToDate(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
return term > rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex)
func getEntryByIndex(rf: RaftLog, index: RaftLogIndex): LogEntry =
return rf.logEntries[index - rf.firstIndex]
func appendAsLeader(rf: var RaftLog, entry: LogEntry) =
rf.logEntries.add(entry)
func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
assert entry.index > 0
let currentIdx = rf.lastIndex
if entry.index <= currentIdx:
# TODO: The indexing hold only if we keep all entries in memory
# we should change it when we add support for snapshots
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true))
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) =
if len(rf.logEntries) == 0:
return (true, 0)
# TODO: We should add support for snapshots
if index > len(rf.logEntries):
# The follower doesn't have all etries
return (false, 0)
let i = index - rf.firstIndex
if rf.logEntries[i].term == term:
return (true, 0)
else:
return (false, rf.logEntries[i].term)
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
assert rf.logEntries.len > index
if rf.logEntries.len > 0 and index >= rf.firstIndex:
return some(rf.logEntries[index].term)
return none(RaftNodeTerm)
func debug*(sm: var RaftStateMachine, log: string) =
sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state.state & "]: " & log)
@ -377,10 +173,7 @@ proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftL
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
for follower in sm.leader.tracker.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
return sm.leader.tracker.find(id)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.Append, appendRequest: request))
@ -570,15 +363,13 @@ func commit*(sm: var RaftStateMachine) =
sm.commitIndex += next_index;
next_index += 1
else:
break
break
func appendEntryReplay*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: RaftRpcAppendReplay) =
func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: RaftRpcAppendReplay) =
if not sm.state.isLeader:
sm.debug "You can't append append replay to the follower"
return
var follower = sm.findFollowerProggressById(from_id)
var follower = sm.findFollowerProggressById(fromId)
if not follower.isSome:
sm.debug "Can't find the follower"
return
@ -586,7 +377,7 @@ func appendEntryReplay*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: R
case replay.result:
of RaftRpcCode.Accepted:
let lastIndex = replay.accepted.lastNewIndex
sm.debug "Accpeted" & $from_id & " " & $lastIndex
sm.debug "Accpeted" & $fromId & " " & $lastIndex
follower.get().accepted(lastIndex)
# TODO: add leader stepping down logic here
sm.commit()
@ -598,7 +389,7 @@ func appendEntryReplay*(sm: var RaftStateMachine, from_id: RaftNodeId, replay: R
follower.get().next_index = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
# if commit apply configuration that removes current follower
# we should take it again
var follower2 = sm.findFollowerProggressById(from_id)
var follower2 = sm.findFollowerProggressById(fromId)
if follower2.isSome:
sm.replicateTo(follower2.get())

104
raft/log.nim Normal file
View File

@ -0,0 +1,104 @@
import types
import std/sequtils
type
RaftLogEntryType* = enum
rletCommand = 0,
rletConfig = 1,
rletEmpty = 2
Command* = object
data: seq[byte]
Config* = object
Empty* = object
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
# TODO: Add configuration too
case kind*: RaftLogEntryType:
of rletCommand: command*: Command
of rletConfig: config*: Config
of rletEmpty: empty*: bool
RaftLog* = object
logEntries: seq[LogEntry]
firstIndex: RaftLogIndex
func initRaftLog*(firstIndex: RaftLogIndex): RaftLog =
var log = RaftLog()
assert firstIndex > 0
log.firstIndex = firstIndex
return log
func lastTerm*(rf: RaftLog): RaftNodeTerm =
# Not sure if it's ok, maybe we should return optional value
let size = rf.logEntries.len
if size == 0:
return 0
return rf.logEntries[size - 1].term
func entriesCount*(rf: RaftLog): int =
return rf.logEntries.len
func lastIndex*(rf: RaftLog): RaftNodeTerm =
return rf.logEntries.len + rf.firstIndex - 1
func nextIndex*(rf: RaftLog): int =
return rf.lastIndex + 1
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
# TODO: We should add support for configurations and snapshots
if rf.logEntries.len == 0:
return
rf.logEntries.delete((index - rf.firstIndex)..<len(rf.logEntries))
func isUpToDate*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
return term > rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex)
func getEntryByIndex*(rf: RaftLog, index: RaftLogIndex): LogEntry =
return rf.logEntries[index - rf.firstIndex]
func appendAsLeader*(rf: var RaftLog, entry: LogEntry) =
rf.logEntries.add(entry)
func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
assert entry.index > 0
let currentIdx = rf.lastIndex
if entry.index <= currentIdx:
# TODO: The indexing hold only if we keep all entries in memory
# we should change it when we add support for snapshots
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true))
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) =
if len(rf.logEntries) == 0:
return (true, 0)
# TODO: We should add support for snapshots
if index > len(rf.logEntries):
# The follower doesn't have all etries
return (false, 0)
let i = index - rf.firstIndex
if rf.logEntries[i].term == term:
return (true, 0)
else:
return (false, rf.logEntries[i].term)
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
assert rf.logEntries.len > index
if rf.logEntries.len > 0 and index >= rf.firstIndex:
return some(rf.logEntries[index].term)
return none(RaftNodeTerm)

View File

@ -1,39 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import chronicles
# Private Log Ops
proc raftNodeLogIndexGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftLogIndex =
len(node.log.logData)
proc raftNodeLogEntryGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logIndex: RaftLogIndex): RaftNodeLogEntry[SmCommandType] =
if logIndex > 0:
result = node.log.logData[logIndex - 1]
proc raftNodeLogAppend*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], logEntry: RaftNodeLogEntry[SmCommandType]) =
node.log.logData.add(logEntry)
proc raftNodeLogTruncate*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], truncateIndex: RaftLogIndex) =
debug "Truncating log to index: ", truncateIndex=truncateIndex, ld=repr(node.log.logData)
# node.log.logData = node.log.logData[:truncateIndex]
proc raftNodeApplyLogEntry*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], entryIndex: RaftLogIndex) =
mixin raftNodeSmApply
let logEntry = raftNodeLogEntryGet(node, entryIndex)
if entryIndex > node.lastApplied:
debug "Applying log entry: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry)
raftNodeSmApply(node.stateMachine, raftNodeLogEntryGet(node, entryIndex).data.get)
node.lastApplied = entryIndex
else:
debug "Log entry already applied: ", node_id=node.id, entryIndex=entryIndex, entry=repr(logEntry)

View File

@ -1,61 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# **************************** #
# Raft Protocol definition #
# **************************** #
import types
type
RaftMessage*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType]
senderTerm*: RaftNodeTerm # Sender Raft Node Term
case op*: RaftMessageOps
of rmoRequestVote:
lastLogTerm*: RaftNodeTerm
lastLogIndex*: RaftLogIndex
of rmoAppendLogEntry:
prevLogIndex*: RaftLogIndex
prevLogTerm*: RaftNodeTerm
commitIndex*: RaftLogIndex
logEntries*: Option[seq[RaftNodeLogEntry[SmCommandType]]] # Optional log entry(ies). Will be empty for a Heart-Beat
of rmoInstallSnapshot:
discard
RaftMessageResponse*[SmCommandType, SmStateType] = ref object of RaftMessageResponseBase[SmCommandType, SmStateType]
case op*: RaftMessageOps
of rmoRequestVote:
granted*: bool # Is vote granted by the Raft node, from we requested vote?
of rmoAppendLogEntry:
success*: bool
lastLogIndex*: RaftLogIndex
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
of rmoInstallSnapshot:
discard
# Raft Node Client Request/Response definitions
RaftNodeClientRequestOps* = enum
rncroRequestSmState = 0,
rncroExecSmCommand = 1
RaftNodeClientResponseError* = enum
rncreSuccess = 0,
rncreFail = 1,
rncreNotLeader = 2,
rncreStopped = 3
RaftNodeClientRequest*[SmCommandType] = ref object
op*: RaftNodeClientRequestOps
nodeId*: RaftNodeId
payload*: Option[SmCommandType] # Optional RaftMessagePayload carrying a Log Entry
RaftNodeClientResponse*[SmStateType] = ref object
nodeId*: RaftNodeId
error*: RaftNodeClientResponseError
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure

View File

@ -1,208 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import protocol
import consensus_module
import log_ops
import chronicles
import std/random
export
types,
protocol,
consensus_module,
log_ops,
chronicles,
random
# Forward declarations
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType])
# Raft Node Public API
proc new*[SmCommandType, SmStateType](T: type RaftNode[SmCommandType, SmStateType];
id: RaftNodeId; peersIds: seq[RaftNodeId];
# persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback;
electionTimeout: int=150;
heartBeatTimeout: int=150;
appendEntriesRespTimeout: int=20;
votingRespTimeout: int=20;
heartBeatRespTimeout: int=10
): T =
var
peers: RaftNodePeers
for peerId in peersIds:
peers.add(RaftNodePeer(id: peerId, nextIndex: 1, matchIndex: 0, hasVoted: false, canVote: true))
result = T(
id: id, state: rnsFollower, currentTerm: 0, peers: peers, commitIndex: 0, lastApplied: 0,
msgSendCallback: msgSendCallback, votedFor: DefaultUUID, currentLeaderId: DefaultUUID,
electionTimeout: electionTimeout, heartBeatTimeout: heartBeatTimeout, appendEntriesRespTimeout: appendEntriesRespTimeout,
heartBeatRespTimeout: heartBeatRespTimeout, votingRespTimeout: votingRespTimeout, hrtBtSuccess: false
)
raftNodeSmInit(result.stateMachine)
initRLock(result.raftStateMutex)
proc raftNodeLoad*[SmCommandType, SmStateType](
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[SmCommandType, SmStateType], string] =
discard
proc raftNodeIdGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeId {.gcsafe.} = # Get Raft Node ID
withRLock(node.raftStateMutex):
result = node.id
proc raftNodeStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeState = # Get Raft Node State
withRLock(node.raftStateMutex):
result = node.state
proc raftNodeTermGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodeTerm = # Get Raft Node Term
withRLock(node.raftStateMutex):
result = node.currentTerm
func raftNodePeersGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): RaftNodePeers = # Get Raft Node Peers
withRLock(node.raftStateMutex):
result = node.peers
func raftNodeIsLeader*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): bool = # Check if Raft Node is Leader
withRLock(node.raftStateMutex):
result = node.state == rnsLeader
# Deliver Raft Message to the Raft Node and dispatch it
proc raftNodeMessageDeliver*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
var
rm = RaftMessage[SmCommandType, SmStateType](raftMessage)
case rm.op # Dispatch different Raft Message types based on the operation code
of rmoRequestVote:
result = raftNodeHandleRequestVote(node, rm)
of rmoAppendLogEntry:
if rm.logEntries.isSome:
result = raftNodeHandleAppendEntries(node, rm)
else:
result = raftNodeHandleHeartBeat(node, rm)
else: discard
# Process Raft Node Client Requests
proc raftNodeServeClientRequest*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType], req: RaftNodeClientRequest[SmCommandType]):
Future[RaftNodeClientResponse[SmStateType]] {.async, gcsafe.} =
withRLock(node.raftStateMutex):
if not raftNodeIsLeader(node):
return RaftNodeClientResponse(nodeId: node.id, error: rncreNotLeader, currentLeaderId: node.currentLeaderId)
case req.op
of rncroExecSmCommand:
let resFut = await raftNodeReplicateSmCommand(node, req.smCommand)
if resFut.read:
return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: raftNodeStateGet(node))
else:
return RaftNodeClientResponse(nodeId: node.id, error: rncreFail, state: raftNodeStateGet(node))
of rncroRequestSmState:
return RaftNodeClientResponse(nodeId: node.id, error: rncreSuccess, state: raftNodeStateGet(node))
else:
raiseAssert "Unknown client request operation."
# Abstract State Machine Ops
func raftNodeSmStateGet*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]): SmStateType =
withRLock(node.raftStateMutex):
node.stateMachine.state
proc raftNodeSmInit*[SmCommandType, SmStateType](stateMachine: var RaftNodeStateMachine[SmCommandType, SmStateType]) =
mixin raftSmInit
raftSmInit(stateMachine)
proc raftNodeSmApply*[SmCommandType, SmStateType](stateMachine: RaftNodeStateMachine[SmCommandType, SmStateType], command: SmCommandType) =
mixin raftSmApply
raftSmApply(stateMachine, command)
# Private Abstract Timer creation
template raftTimerCreate*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] =
mixin raftTimerCreateCustomImpl
raftTimerCreateCustomImpl(timerInterval, timerCallback)
# Timers scheduling stuff etc.
proc raftNodeScheduleHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.heartBeatTimer = raftTimerCreate(node.heartBeatTimeout, proc() = asyncSpawn raftNodeSendHeartBeat(node))
proc raftNodeSendHeartBeat*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) {.async.} =
debug "Raft Node sending Heart-Beat to peers", node_id=node.id
withRLock(node.raftStateMutex):
var hrtBtFuts: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
for raftPeer in node.peers:
let msgHrtBt = RaftMessage[SmCommandType, SmStateType](
op: rmoAppendLogEntry, senderId: node.id, receiverId: raftPeer.id,
senderTerm: raftNodeTermGet(node), commitIndex: node.commitIndex,
prevLogIndex: raftNodeLogIndexGet(node) - 1, prevLogTerm: if raftNodeLogIndexGet(node) > 0: raftNodeLogEntryGet(node, raftNodeLogIndexGet(node) - 1).term else: 0
)
hrtBtFuts.add(node.msgSendCallback(msgHrtBt))
let allHrtBtFuts = allFutures(hrtBtFuts)
await allHrtBtFuts or raftTimerCreate(node.heartBeatRespTimeout, proc()=discard)
var successCnt = 0
for fut in hrtBtFuts:
if fut.finished:
let resp = RaftMessageResponse[SmCommandType, SmStateType](fut.read)
if resp.success:
successCnt.inc
else:
await cancelAndWait(fut)
if successCnt >= (node.peers.len div 2 + node.peers.len mod 2):
node.hrtBtSuccess = true
else:
node.hrtBtSuccess = false
raftNodeScheduleHeartBeat(node)
proc raftNodeScheduleElectionTimeout*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
node.electionTimeoutTimer = raftTimerCreate(node.electionTimeout + rand(node.electionTimeout), proc =
asyncSpawn raftNodeStartElection(node)
)
node.hrtBtSuccess = false
# Raft Node Control
proc raftNodeCancelTimers*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
withRLock(node.raftStateMutex):
if node.heartBeatTimer != nil:
asyncSpawn cancelAndWait(node.heartBeatTimer)
if node.electionTimeoutTimer != nil:
asyncSpawn cancelAndWait(node.electionTimeoutTimer )
proc raftNodeStop*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
# Try to stop gracefully
withRLock(node.raftStateMutex):
# Abort election if in election
if node.state == rnsCandidate:
raftNodeAbortElection(node)s
# Cancel pending timers (if any)
raftNodeCancelTimers(node)
proc raftNodeStart*[SmCommandType, SmStateType](node: RaftNode[SmCommandType, SmStateType]) =
randomize()
withRLock(node.raftStateMutex):
node.state = rnsFollower
debug "Start Raft Node", node_id=node.id, state=node.state
raftNodeScheduleElectionTimeout(node)

99
raft/tracker.nim Normal file
View File

@ -0,0 +1,99 @@
import types
import std/sequtils
import std/[times]
type
RaftElectionResult* = enum
Unknown = 0,
Won = 1,
Lost = 2
RaftElectionTracker* = object
all: seq[RaftNodeId]
responded: seq[RaftNodeId]
granted: int
RaftVotes* = object
voters*: seq[RaftNodeId]
current: RaftElectionTracker
RaftFollowerProgress = seq[RaftFollowerProgressTracker]
RaftTracker* = object
progress*: RaftFollowerProgress
current: seq[RaftNodeId]
RaftFollowerProgressTracker* = ref object
id*: RaftNodeId
nextIndex*: RaftLogIndex
# Index of the highest log entry known to be replicated to this server.
matchIndex*: RaftLogIndex
commitIndex*: RaftLogIndex
replayedIndex: RaftLogIndex
lastMessageAt*: times.DateTime
func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker =
var r = RaftElectionTracker()
r.all = nodes
r.granted = 0
return r
func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool =
if not ret.all.contains nodeId:
return false
if not ret.responded.contains nodeId:
ret.responded.add(nodeId)
if granted:
ret.granted += 1
return true
func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult =
let quorym = int(len(ret.all) / 2) + 1
if ret.granted >= quorym:
return RaftElectionResult.Won
let unkown = len(ret.all) - len(ret.responded)
if ret.granted + unkown >= quorym:
return RaftElectionResult.Unknown
else:
return RaftElectionResult.Lost
func initVotes*(nodes: seq[RaftNodeId]): RaftVotes =
var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes))
return r
func initVotes*(config: RaftConfig): RaftVotes =
var r = RaftVotes(voters: config.currentSet, current: initElectionTracker(config.currentSet))
return r
func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool =
# TODO: Add support for configuration
return rv.current.registerVote(nodeId, granted)
func tallyVote*(rv: var RaftVotes): RaftElectionResult =
# TODO: Add support for configuration
return rv.current.tallyVote()
func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] =
for follower in ls.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex, now: times.DateTime): RaftFollowerProgressTracker =
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0, lastMessageAt: now)
func initTracker*(config: RaftConfig, nextIndex: RaftLogIndex, now: times.DateTime): RaftTracker =
var tracker = RaftTracker()
for node in config.currentSet:
tracker.progress.add(initFollowerProgressTracker(node, nextIndex, now))
tracker.current.add(node)
return tracker
func accepted*(fpt: var RaftFollowerProgressTracker, index: RaftLogIndex)=
fpt.matchIndex = max(fpt.matchIndex, index)
fpt.nextIndex = max(fpt.nextIndex, index)

View File

@ -34,126 +34,5 @@ type
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = int # Raft Node Term Type
RaftLogIndex* = int # Raft Node Log Index Type
RaftNodePeer* = ref object # Raft Node Peer object
id*: RaftNodeId
nextIndex*: RaftLogIndex # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
matchIndex*: RaftLogIndex # For each peer Raft Node, index of highest log entry known to be replicated on Node
# (initialized to 0, increases monotonically)
hasVoted*: bool # Indicates if this peer have voted for this Raft Node During Election
canVote*: bool # Indicates if this peer can vote
appendEntriesTimer*: Future[void]
RaftNodePeers* = seq[RaftNodePeer] # List of Raft Node Peers
# Raft Node Abstract State Machine type
RaftNodeStateMachine*[SmCommandType, SmStateType] = ref object # Some opaque State Machine Impelementation to be used by the Raft Node
# providing at minimum operations for initialization, querying the current state
# and RaftNodeLogEntry (SmCommandType) application
state*: ref SmStateType
# Raft Node Persistent Storage basic definition
RaftNodePersistentStorage*[SmCommandType, SmStateType] = object # Should be some kind of Persistent Transactional Store Wrapper
# Basic modules (algos) definitions
RaftNodeAccessCallback[SmCommandType, SmStateType] = proc: RaftNode[SmCommandType, SmStateType] {.nimcall, gcsafe.} # This should be implementes as a closure holding the RaftNode
RaftConsensusModule*[SmCommandType, SmStateType] = object of RootObj
stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim
gatheredVotesCount: int
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
RaftLogCompactionModule*[SmCommandType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
RaftMembershipChangeModule*[SmCommandType, SmStateType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[SmCommandType, SmStateType]
# Callback for sending messages out of this Raft Node
RaftMessageId* = UUID # UUID assigned to every Raft Node Message,
# so it can be matched with it's corresponding response etc.
# Raft Node Messages OPs
RaftMessageOps* = enum
rmoRequestVote = 0, # Request Raft Node vote during election.
rmoAppendLogEntry = 1, # Append log entry (when replicating) or represent a Heart-Beat
# if log entries are missing.
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes to speed up the new nodes
# when they have to catch-up to the currently replicated log.
RaftMessageBase*[SmCommandType, SmStateType] = ref object of RootObj # Base Type for Raft Protocol Messages.
msgId*: RaftMessageId # Message UUID.
senderId*: RaftNodeId # Sender Raft Node ID.
receiverId*: RaftNodeId # Receiver Raft Node ID.
RaftMessageResponseBase*[SmCommandType, SmStateType] = ref object of RaftMessageBase[SmCommandType, SmStateType]
# Callback for Sending Raft Node Messages out of this Raft Node.
RaftMessageSendCallback*[SmCommandType, SmStateType] = proc (raftMessage: RaftMessageBase[SmCommandType, SmStateType]):
Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.}
# For later use when adding/removing new nodes (dynamic configuration chganges)
RaftNodeConfiguration* = ref object
peers*: RaftNodePeers
# Raft Node Log definition
LogEntryType* = enum
etUnknown = 0,
etConfiguration = 1,
etData = 2,
etNoOp = 3
RaftNodeLogEntry*[SmCommandType] = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
entryType*: LogEntryType # Type of entry - data to append, configuration or no op etc.
data*: Option[SmCommandType] # Entry data (State Machine Command) - this is mutually exclusive with configuration
# depending on entryType field
configuration*: Option[RaftNodeConfiguration] # Node configuration
RaftNodeLog*[SmCommandType] = object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[SmCommandType]] # Raft Node Log Data
RaftTimerCallback* = proc () {.gcsafe.} # Pass any function wrapped in a closure
# Raft Node Object type
RaftNode*[SmCommandType, SmStateType] = ref object
# Timers
votesFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
replicateFuts*: seq[Future[RaftMessageResponseBase[SmCommandType, SmStateType]]]
electionTimeout*: int
heartBeatTimeout*: int
appendEntriesRespTimeout*: int
votingRespTimeout*: int
heartBeatRespTimeout*: int
heartBeatTimer*: Future[void]
electionTimeoutTimer*: Future[void]
# Mtx definition(s) go here
raftStateMutex*: RLock
# Misc
msgSendCallback*: RaftMessageSendCallback[SmCommandType, SmStateType]
persistentStorage: RaftNodePersistentStorage[SmCommandType, SmStateType]
hrtBtSuccess*: bool
# Persistent state
id*: RaftNodeId # This Raft Node ID
state*: RaftNodeState # This Raft Node State
currentTerm*: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
votedFor*: RaftNodeId # Candidate RaftNodeId that received vote in current term (or DefaultUUID if none),
# also used to redirect Client Requests in case this Raft Node is not the leader
log*: RaftNodeLog[SmCommandType] # This Raft Node Log
stateMachine*: RaftNodeStateMachine[SmCommandType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
peers*: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
# makes sense for the moment
# Volatile state
commitIndex*: RaftLogIndex # Index of highest log entry known to be committed (initialized to 0, increases monotonically)
lastApplied*: RaftLogIndex # Index of highest log entry applied to state machine (initialized to 0, increases monotonically)
currentLeaderId*: RaftNodeId # The ID of the current leader Raft Node or DefaultUUID if None is leader (election is in progress etc.)
RaftConfig* = object
currentSet*: seq[RaftNodeId]

View File

@ -12,7 +12,4 @@ import ../misc/test_macro
{. warning[UnusedImport]:off .}
cliBuilder:
import ./test_basic_timers,
./test_basic_state_machine,
#./test_basic_cluster_election,
./test_consensus_state_machine
import ./test_consensus_state_machine

View File

@ -1,67 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import basic_timers
import basic_state_machine
import std/tables
export raft_api
type
BasicRaftNode* = RaftNode[SmCommand, SmState]
BasicRaftCluster* = ref object
nodes*: Table[RaftNodeId, BasicRaftNode]
nodesLock*: RLock
networkDelayJitter*: int
proc basicRaftClusterRaftMessageSendCallbackCreateWithNetDelay[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
await raftTimerCreate(rand(cluster.networkDelayJitter), proc()=discard) # Simulate network delay
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc basicRaftClusterRaftMessageSendCallbackCreate[SmCommandType, SmStateType](cluster: BasicRaftCluster): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
result = await cluster.nodes[msg.receiverId].raftNodeMessageDeliver(msg)
proc basicRaftClusterStart*(cluster: BasicRaftCluster) =
for id, node in cluster.nodes:
raftNodeStart(node)
proc basicRaftClusterGetLeaderId*(cluster: BasicRaftCluster): UUID =
result = DefaultUUID
withRLock(cluster.nodesLock):
for id, node in cluster.nodes:
if raftNodeIsLeader(node):
return raftNodeIdGet(node)
proc basicRaftClusterClientRequest*(cluster: BasicRaftCluster, req: RaftNodeClientRequest): Future[RaftNodeClientResponse] {.async.} =
case req.op:
of rncroRequestSmState:
var
nodeId = cluster.nodesIds[basicRaftClusterGetLeaderId(cluster)]
result = await cluster.nodes[nodeId].raftNodeServeClientRequest(req)
of rncroExecSmCommand:
discard
proc basicRaftClusterInit*(nodesIds: seq[RaftNodeId], networkDelayJitter: int=10, electionTimeout: int=150, heartBeatTimeout: int=75, appendEntriesRespTimeout: int=20, votingRespTimeout: int=10,
heartBeatRespTimeout: int=10): BasicRaftCluster =
new(result)
for nodeId in nodesIds:
var
peersIds = nodesIds
peersIds.del(peersIds.find(nodeId))
result.networkDelayJitter = networkDelayJitter
result.nodes[nodeId] = BasicRaftNode.new(nodeId, peersIds,
basicRaftClusterRaftMessageSendCallbackCreate[SmCommand, SmState](result),
electionTimeout, heartBeatTimeout, appendEntriesRespTimeout, votingRespTimeout, heartBeatRespTimeout)

View File

@ -1,38 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/tables
import ../raft/raft_api
export tables, raft_api
type
SmState* = Table[string, string]
SmCommands* = enum
scSet = 0,
scDel = 1
SmCommand* = object
cmd*: SmCommands
key*: string
val*: string
RaftBasicSm* = RaftNodeStateMachine[SmCommand, SmState]
proc raftSmInit*(stateMachine: var RaftBasicSm) =
new(stateMachine)
new(stateMachine.state)
proc raftSmApply*(stateMachine: RaftBasicSm, command: SmCommand) =
case command.cmd:
of scSet:
stateMachine.state[command.key] = command.val
of scDel:
stateMachine.state.del(command.key)

View File

@ -1,18 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import ../raft/raft_api
export raft_api
proc raftTimerCreateCustomImpl*(timerInterval: int, timerCallback: RaftTimerCallback): Future[void] {.async.} =
var f = sleepAsync(milliseconds(timerInterval))
await f
if f.finished and not f.cancelled:
timerCallback()

View File

@ -1,104 +0,0 @@
import basic_state_machine
import basic_cluster
import std/json
import msgpack4nim
import strutils
import std/strformat
import httpclient
import os
import std/threadpool
type
RaftPeerConf* = object
id*: UUID
host*: string
port*: int
RaftPeersConfContainer* = seq[RaftPeerConf]
proc loadConfig(): RaftPeersConfContainer =
var
conf: RaftPeersConfContainer
let jsonFile = "raft_node_config.json"
# read and parse file
let jsConf = parseFile(jsonFile)
for n in jsConf["raftPeers"]:
conf.add(RaftPeerConf(id: parseUUID(n["id"].getStr), host: n["host"].getStr, port: n["port"].getInt))
result = conf
proc raftPipesRead[SmCommandType, SmStateType](node: BasicRaftNode, port: int) =
var
fifoRead = fmt"RAFTNODERECEIVEMSGPIPE{port}"
fifoWrite = fmt"RAFTNODESENDMSGRESPPIPE{port}"
frFD = open(fifoRead, fmRead)
fwFD = open(fifoWrite, fmAppend)
var
ss = MsgStream.init(frFD.readAll)
xx: RaftMessage[SmCommandType, SmStateType]
ss.unpack(xx) #and here too
debug "Received Req: ", req=repr(xx)
var
r = waitFor raftNodeMessageDeliver(node, xx)
resp = RaftMessageResponse[SmCommandType, SmStateType](r)
rs = MsgStream.init()
rs.pack(resp)
fwFD.write(rs.data)
proc testRaftMessageSendCallbackCreate[SmCommandType, SmStateType](conf: RaftPeersConfContainer): RaftMessageSendCallback[SmCommandType, SmStateType] =
proc (msg: RaftMessageBase[SmCommandType, SmStateType]): Future[RaftMessageResponseBase[SmCommandType, SmStateType]] {.async, gcsafe.} =
var
host: string
port: int
resp: Response
xx: RaftMessageResponse[SmCommandType, SmStateType]
client = newHttpClient(timeout=50)
m = RaftMessage[SmCommandType, SmStateType](msg)
s = MsgStream.init() # besides MsgStream, you can also use Nim StringStream or FileStream
for c in conf:
if c.id == msg.receiverId:
host = c.host
port = c.port
s.pack(m) #here the magic happened
debug "Sending Req: ", req=fmt"http://{host}:{port}", data=s.data
resp = client.post(fmt"http://{host}:{port}", s.data)
s = MsgStream.init(resp.body)
s.unpack(xx) #and here too
result = xx
proc main() =
var conf = loadConfig()
var
nodesIds: seq[UUID]
node: BasicRaftNode
for c in conf:
debug "single conf", single_conf=c
nodesIds.add(c.id)
var
nodeId = parseUUID(paramStr(1))
peersIds = nodesIds
port: int
idx = peersIds.find(nodeId)
port = conf[idx].port
peersIds.del(idx)
node = BasicRaftNode.new(nodeId, peersIds, testRaftMessageSendCallbackCreate[SmCommand, SmState](conf))
raftNodeStart(node)
spawn raftPipesRead[SmCommand, SmState](node, port)
runForever()
if isMainModule:
main()

View File

@ -1,43 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import basic_cluster
proc basicClusterElectionMain*() =
var
cluster: BasicRaftCluster
nodesIds = newSeq[RaftNodeId](5)
suite "Basic Raft Cluster Election Tests":
test "Basic Raft Cluster Init (5 nodes)":
for i in 0..4:
nodesIds[i] = genUUID()
cluster = basicRaftClusterInit(nodesIds, 15, 150, 75, 10, 10, 10)
check cluster != nil
test "Start Basic Raft Cluster and wait it to converge for a 2 seconds interval (Elect a Leader)":
basicRaftClusterStart(cluster)
let dur = seconds(2)
waitFor sleepAsync(dur)
let
leaderId = basicRaftClusterGetLeaderId(cluster)
check leaderId != DefaultUUID
test "Check for leader every second for a 10 second interval":
let dur = seconds(1)
for i in 0..117:
waitFor sleepAsync(dur)
let
leaderId = basicRaftClusterGetLeaderId(cluster)
check leaderId != DefaultUUID
if isMainModule:
basicClusterElectionMain()

View File

@ -1,49 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import basic_state_machine
proc basicStateMachineMain*() =
var
sm: RaftBasicSm
smCommandsLog: seq[SmCommand]
suite "Test Basic State Machine Implementation ":
test "Test Init":
raftSmInit(sm)
check sm != nil and sm.state != nil and sm.state.len == 0
test "Init commands Log":
smCommandsLog.add(SmCommand(cmd: scSet, key: "a", val: "a"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "b", val: "b"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "c", val: "c"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "d", val: "d"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "e", val: "e"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "f", val: "f"))
smCommandsLog.add(SmCommand(cmd: scDel, key: "a"))
smCommandsLog.add(SmCommand(cmd: scDel, key: "a"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "a", val: "a"))
smCommandsLog.add(SmCommand(cmd: scDel, key: "a"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "g", val: "g"))
smCommandsLog.add(SmCommand(cmd: scDel, key: "d"))
smCommandsLog.add(SmCommand(cmd: scSet, key: "h", val: "h"))
check smCommandsLog.len == 13
test "Apply commands from the Log and check result":
for c in smCommandsLog:
raftSmApply(sm, c)
check sm.state[] == {"b": "b", "c": "c", "e": "e", "f": "f", "g": "g", "h": "h"}.toTable
if isMainModule:
basicStateMachineMain()

View File

@ -1,68 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import basic_timers
import random
const
MAX_TIMERS = 50
SLOW_TIMERS_MIN = 300
SLOW_TIMERS_MAX = 400
FAST_TIMERS_MIN = 10
FAST_TIMERS_MAX = 100
WAIT_FOR_SLOW_TIMERS = 200
FINAL_WAIT = 300
proc basicTimersMain*() =
var
slowTimers: array[0..MAX_TIMERS, Future[void]]
fastTimers: array[0..MAX_TIMERS, Future[void]]
var
slowCnt: ref int
RaftDummyTimerCallback = proc () {.nimcall, gcsafe.} = discard
RaftTimerCallbackCnt = proc (cnt: ref int): RaftTimerCallback =
proc () {.gcsafe.} = cnt[].inc
slowCnt = new(int)
slowCnt[] = 0
suite "Create and test basic timers":
test "Create 'slow' and 'fast' timers":
for i in 0..MAX_TIMERS:
slowTimers[i] = raftTimerCreateCustomImpl(max(SLOW_TIMERS_MIN, rand(SLOW_TIMERS_MAX)), RaftTimerCallbackCnt(slowCnt))
for i in 0..MAX_TIMERS:
fastTimers[i] = raftTimerCreateCustomImpl(max(FAST_TIMERS_MIN, rand(FAST_TIMERS_MAX)), RaftDummyTimerCallback)
test "Wait for and cancel 'slow' timers":
waitFor sleepAsync(milliseconds(WAIT_FOR_SLOW_TIMERS))
for i in 0..MAX_TIMERS:
if not slowTimers[i].finished:
asyncSpawn cancelAndWait(slowTimers[i])
test "Final wait timers and check consistency":
var
pass = true
waitFor sleepAsync(milliseconds(FINAL_WAIT))
for i in 0..MAX_TIMERS:
if not fastTimers[i].finished:
debugEcho repr(fastTimers[i])
pass = false
break
check pass
check slowCnt[] == 0
if isMainModule:
basicTimersMain()

View File

@ -10,6 +10,8 @@
import unittest2
import ../raft/types
import ../raft/consensus_state_machine
import ../raft/log
import ../raft/tracker
import std/[times, sequtils]
import uuids
import tables