Merge pull request #1 from NDobrev/new_impl

New impl
This commit is contained in:
Martin Dobrev 2024-02-09 14:34:13 +02:00 committed by GitHub
commit 4186bdae2d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
30 changed files with 1875 additions and 267 deletions

171
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,171 @@
name: CI
on:
push:
branches:
- master
pull_request:
workflow_dispatch:
concurrency: # Cancel stale PR builds (but not push builds)
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }}
cancel-in-progress: true
jobs:
build:
strategy:
fail-fast: false
matrix:
target:
- os: linux
cpu: amd64
- os: linux
cpu: i386
- os: macos
cpu: amd64
- os: windows
cpu: amd64
#- os: windows
#cpu: i386
branch: [version-1-6, version-2-0, devel]
include:
- target:
os: linux
builder: ubuntu-20.04
shell: bash
- target:
os: macos
builder: macos-12
shell: bash
- target:
os: windows
builder: windows-2019
shell: msys2 {0}
defaults:
run:
shell: ${{ matrix.shell }}
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
continue-on-error: ${{ matrix.branch == 'devel' }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
sudo dpkg --add-architecture i386
sudo apt-fast update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \
--no-install-recommends -yq gcc-multilib g++-multilib \
libssl-dev:i386
mkdir -p external/bin
cat << EOF > external/bin/gcc
#!/bin/bash
exec $(which gcc) -m32 "\$@"
EOF
cat << EOF > external/bin/g++
#!/bin/bash
exec $(which g++) -m32 "\$@"
EOF
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: 'Install dependencies (macOS)'
if: runner.os == 'macOS' && matrix.branch == 'devel'
run: |
brew install openssl@1.1
ln -s $(brew --prefix)/opt/openssl/lib/libcrypto.1.1.dylib /usr/local/lib
ln -s $(brew --prefix)/opt/openssl/lib/libssl.1.1.dylib /usr/local/lib/
- name: MSYS2 (Windows i386)
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
msystem: MINGW32
install: >-
base-devel
git
mingw-w64-i686-toolchain
- name: MSYS2 (Windows amd64)
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
install: >-
base-devel
git
mingw-w64-x86_64-toolchain
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v3
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
- name: Install DLLs dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
run: |
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
- name: Derive environment variables
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
else
PLATFORM=x86
fi
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Build Nim and Nimble
run: |
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} \
NIM_COMMIT=${{ matrix.branch }} \
NIMBLE_COMMIT=a4fc798838ee753f5485dd19afab22e9367eb0e7 \
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
scripts/ci/build_nim.sh nim csources dist/nimble-latest NimBinaries
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Run tests
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
# https://github.com/status-im/nimbus-eth2/issues/3121
export NIMFLAGS="-d:nimRawSetjmp"
fi
nim --version
nimble --version
nimble build
nimble test

4
.gitignore vendored
View File

@ -5,4 +5,6 @@
*.exe *.exe
*.out *.out
nimcache/ nimcache/
build/ build/
nimbledeps/
.VSCodeCounter

7
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,7 @@
{
"nim.projectMapping": [{
"projectFile": "raft.nim",
"fileRegex": ".*\\.nim"
}
]
}

View File

@ -5,3 +5,19 @@ This project aims to develop an implementation of the Raft consensus protocol th
We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here: We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here:
https://github.com/status-im/nimbus-eth2/issues/3416 https://github.com/status-im/nimbus-eth2/issues/3416
This project is heavily inspired by Raft implementation in ScyllaDB
https://github.com/scylladb/scylladb/tree/master/raft
# Design goals
The main goal is to separate implementation of the raft state machin from the other implementation details like (storage, rpc etc)
In order to achive this we want to keep the State machine absolutly deterministic every interaction the the world like
networking, logging, acquiring current time, random number generation, disc operation etc must happened trough the state machine interface.
It will ensure better testability and integrability.
# Run test
`./run_all_tests.sh`

3
all_test.md Normal file
View File

@ -0,0 +1,3 @@
* testbasictimers - 504 milliseconds, 149 microseconds, and 657 nanoseconds
* testbasicstatemachine - 1 millisecond, 831 microseconds, and 189 nanoseconds
* testbasicclusterelection - 2 minutes, 80 milliseconds, 340 microseconds, and 536 nanoseconds

Binary file not shown.

Binary file not shown.

103
misc/test_macro.nim Normal file
View File

@ -0,0 +1,103 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import std/times
import vm_compile_info
import macros, strutils, os, unittest2, osproc
import threadpool
export strutils, os, unittest2, osproc, threadpool
# AppVeyor may go out of memory with the default of 4
setMinPoolSize(2)
proc executeMyself(numModules: int, names: openArray[string]): int =
let appName = getAppFilename()
var elpdList = newSeq[Duration](numModules)
for i in 0..<numModules:
let start = getTime()
let execResult = execCmd(appName & " " & $i)
let elpd = getTime() - start
elpdList[i] = elpd
if execResult != 0:
stderr.writeLine("subtest no: " & $i & " failed: " & names[i])
result = result or execResult
var f = open("all_test.md", fmWrite)
for i in 0..<numModules:
f.write("* " & names[i])
f.write(" - " & $elpdList[i])
f.write("\n")
f.close()
proc getImportStmt(stmtList: NimNode): NimNode =
result = stmtList[0]
result.expectKind nnkImportStmt
proc ofStmt(idx: int, singleModule: NimNode): NimNode =
# remove the "test_" prefix
let moduleName = normalize(singleModule.toStrLit.strVal).substr(4)
let moduleMain = newIdentNode(moduleName & "Main")
# construct `of` branch
# of idx: moduleMain()
result = nnkOfBranch.newTree(
newLit(idx),
newCall(moduleMain)
)
proc toModuleNames(importStmt: NimNode): NimNode =
result = nnkBracket.newTree
for singleModule in importStmt:
let x = normalize(singleModule.toStrLit.strVal)
result.add newLit(x)
macro cliBuilder*(stmtList: typed): untyped =
let importStmt = stmtList.getImportStmt
let moduleCount = importStmt.len
let moduleNames = importStmt.toModuleNames
# case paramStr(1).parseInt
var caseStmt = nnkCaseStmt.newTree(
quote do: paramStr(1).parseInt
)
# of 0: codeStreamMain()
# of 1: gasMeterMain()
# of 2: memoryMain()
# ...
for idx, singleModule in importStmt:
caseStmt.add ofStmt(idx, singleModule)
# else:
# echo "invalid argument"
caseStmt.add nnkElse.newTree(
quote do: echo "invalid argument"
)
result = quote do:
if paramCount() == 0:
const names = `moduleNames`
quit(executeMyself(`moduleCount`, names))
else:
`caseStmt`
# if you want to add new test module(s)
# make sure you define an entry poin
# e.g.
# proc mytestMain*() =
# # put anything you want here
# and then give it a name `test_mytest.nim`
# the `mytest` part should match between
# the proc name and the module name
# if this executable called without any params
# it will execute each of the test by executing itself
# repeatedly until all sub-tests are executed.
# you can execute the sub-test by a number start from zero.

31
misc/vm_compile_info.nim Normal file
View File

@ -0,0 +1,31 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
func vmName(): string =
when defined(evmc_enabled):
"evmc"
else:
"nimvm"
const
VmName* = vmName()
warningMsg = block:
var rc = "*** Compiling with " & VmName
when defined(legacy_eth66_enabled):
rc &= ", legacy-eth/66"
when defined(chunked_rlpx_enabled):
rc &= ", chunked-rlpx"
when defined(boehmgc):
rc &= ", boehm/gc"
rc &= " enabled"
rc
{.warning: warningMsg.}
{.used.}

View File

@ -7,8 +7,7 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import import raft/consensus_state_machine
raft/raft_api import raft/types
export consensus_state_machine
export export types
raft_api, types, protocol

View File

@ -16,12 +16,42 @@ description = "raft consensus in nim"
license = "Apache License 2.0" license = "Apache License 2.0"
skipDirs = @["tests"] skipDirs = @["tests"]
requires "nim >= 1.6.0" requires "nim >= 1.6.14"
requires "stew >= 0.1.0" requires "stew >= 0.1.0"
requires "nimcrypto >= 0.5.4"
requires "unittest2 >= 0.0.4" requires "unittest2 >= 0.0.4"
requires "chronicles >= 0.10.2" requires "uuids >= 0.1.11"
requires "eth >= 1.0.0" requires "chronicles >= 0.10.3"
requires "chronos >= 3.2.0" requires "chronos >= 3.0.11"
requires "nimdbx >= 0.4.1"
requires "nimterop >= 0.6.13"
proc buildBinary(name: string, srcDir = "./", params = "", lang = "c") =
if not dirExists "build":
mkDir "build"
# allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims"
var extra_params = params
for i in 2..<paramCount():
extra_params &= " " & paramStr(i)
exec "nim " & lang & " --threads:on " & extra_params & " --out:build/" & name & " " & srcDir & name & ".nim"
proc test(path: string, name: string, params = "", lang = "c") =
# Verify stack usage is kept low by setting 750k stack limit in tests.
const stackLimitKiB = 750
when not defined(windows):
const (buildOption, runPrefix) = ("", "ulimit -s " & $stackLimitKiB & " && ")
else:
# No `ulimit` in Windows. `ulimit -s` in Bash is accepted but has no effect.
# See https://public-inbox.org/git/alpine.DEB.2.21.1.1709131448390.4132@virtualbox/
# Also, the command passed to NimScript `exec` on Windows is not a shell script.
# Instead, we can set stack size at link time.
const (buildOption, runPrefix) =
(" -d:windowsNoSetStack --passL:-Wl,--stack," & $(stackLimitKiB * 2048), "")
buildBinary name, (path & "/"), params & buildOption
exec runPrefix & "build/" & name
task test, "Run tests":
test "tests", "all_tests", "-d:chronicles_sinks=textlines -d:chronicles_log_level=ERROR -d:unittest2DisableParamFiltering"
# Helper functions # Helper functions

1
raft.nims Symbolic link
View File

@ -0,0 +1 @@
raft.nimble

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

59
raft/async_util.nim Normal file
View File

@ -0,0 +1,59 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import chronos
template awaitWithTimeout*[T](operation: Future[T],
deadline: Future[void],
body: untyped) =
let f {.inject.} = operation
await f or deadline
if not f.finished:
# If we don't wait for for the cancellation here, it's possible that
# the "next" operation will run concurrently to this one, messing up
# the order of operations (since await/async is not fair)
await cancelAndWait(f)
else:
body
# template awaitWithTimeout*[T](operation: Future[T],
# deadline: Future[void],
# onTimeout: untyped): T =
# let f = operation
# await f or deadline
# if not f.finished:
# # If we don't wait for for the cancellation here, it's possible that
# # the "next" operation will run concurrently to this one, messing up
# # the order of operations (since await/async is not fair)
# await cancelAndWait(f)
# onTimeout
# else:
# f.read
# template awaitWithTimeout*[T](operation: Future[T],
# timeout: Duration,
# onTimeout: untyped): T =
# awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)
# template awaitWithTimeout*(operation: Future[void],
# deadline: Future[void],
# onTimeout: untyped) =
# let f = operation
# await f or deadline
# if not f.finished:
# # If we don't wait for for the cancellation here, it's possible that
# # the "next" operation will run concurrently to this one, messing up
# # the order of operations (since await/async is not fair)
# await cancelAndWait(f)
# onTimeout
# template awaitWithTimeout*(operation: Future[void],
# timeout: Duration,
# onTimeout: untyped) =
# awaitWithTimeout(operation, sleepAsync(timeout), onTimeout)

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -0,0 +1,452 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import log
import tracker
import state
import std/[times]
import std/sequtils
import std/random
randomize()
type
RaftRpcMessageType* = enum
VoteRequest = 0,
VoteReplay = 1,
AppendRequest = 2,
AppendReplay = 3
RaftRpcCode* = enum
Rejected = 0,
Accepted = 1
RaftRpcAppendRequest* = object
previousTerm*: RaftNodeTerm
previousLogIndex*: RaftLogIndex
commitIndex*: RaftLogIndex
entries*: seq[LogEntry]
RaftRpcAppendReplayRejected* = object
nonMatchingIndex: RaftLogIndex
lastIdx: RaftLogIndex
RaftRpcAppendReplayAccepted* = object
lastNewIndex: RaftLogIndex
RaftRpcAppendReplay* = object
commitIndex: RaftLogIndex
term: RaftNodeTerm
case result: RaftRpcCode:
of Accepted: accepted: RaftRpcAppendReplayAccepted
of Rejected: rejected: RaftRpcAppendReplayRejected
RaftRpcVoteRequest* = object
currentTerm*: RaftNodeTerm
lastLogIndex*: RaftLogIndex
lastLogTerm*: RaftNodeTerm
force*: bool
RaftRpcVoteReplay* = object
currentTerm*: RaftNodeTerm
voteGranted*: bool
RaftRpcMessage* = object
currentTerm*: RaftNodeTerm
sender*: RaftNodeId
receiver*: RaftNodeId
case kind*: RaftRpcMessageType
of VoteRequest: voteRequest*: RaftRpcVoteRequest
of VoteReplay: voteReplay*: RaftRpcVoteReplay
of AppendRequest: appendRequest*: RaftRpcAppendRequest
of AppendReplay: appendReplay*: RaftRpcAppendReplay
RaftStateMachineOutput* = object
logEntries*: seq[LogEntry]
# Entries that should be applyed to the "User" State machine
committed*: seq[LogEntry]
messages*: seq[RaftRpcMessage]
debugLogs*: seq[string]
term*: RaftNodeTerm
votedFor*: Option[RaftNodeId]
stateChange*: bool
RaftStateMachine* = object
myId*: RaftNodeId
term*: RaftNodeTerm
commitIndex: RaftLogIndex
toCommit: RaftLogIndex
log: RaftLog
output: RaftStateMachineOutput
lastUpdate: times.Time
votedFor: RaftNodeId
currentLeader: RaftNodeId
pingLeader: bool
config: RaftConfig
lastElectionTime: times.DateTime
randomizedElectionTime: times.Duration
heartbeatTime: times.Duration
timeNow: times.DateTime
startTime: times.DateTime
electionTimeout: times.Duration
state*: RaftStateMachineState
func leader*(sm: var RaftStateMachine): var LeaderState =
return sm.state.leader
func follower*(sm: var RaftStateMachine): var FollowerState =
return sm.state.follower
func candidate*(sm: var RaftStateMachine): var CandidateState =
return sm.state.candidate
func debug*(sm: var RaftStateMachine, log: string) =
sm.output.debugLogs.add("[" & $(sm.timeNow - sm.startTime).inMilliseconds & "ms] [" & (($sm.myId)[0..7]) & "...] [" & $sm.state & "]: " & log)
proc resetElectionTimeout*(sm: var RaftStateMachine) =
# TODO actually pick random time
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + rand(200))
proc initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime): RaftStateMachine =
var sm = RaftStateMachine()
sm.term = currentTerm
sm.log = log
sm.commitIndex = commitIndex
sm.state = initFollower(RaftNodeId())
sm.config = config
sm.lastElectionTime = now
sm.timeNow = now
sm.startTime = now
sm.myId = id
sm.electionTimeout = times.initDuration(milliseconds = 100)
sm.heartbeatTime = times.initDuration(milliseconds = 50)
sm.resetElectionTimeout()
return sm
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
return sm.leader.tracker.find(id)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReplay) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReplay, appendReplay: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReplay) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReplay, voteReplay: request))
func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) =
sm.debug "Sent to" & $id & $request
if sm.state.isLeader:
var follower = sm.findFollowerProggressById(id)
if follower.isSome:
follower.get().lastMessageAt = sm.timeNow
else:
sm.debug "Follower not found: " & $id
sm.debug $sm.leader
sm.sendToImpl(id, request)
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: VoteRequest, voteRequest: RaftRpcVoteRequest())
func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) =
if follower.nextIndex > sm.log.lastIndex:
return
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
sm.debug "replicate to " & $follower[]
if previousTerm.isSome:
let request = RaftRpcAppendRequest(
previousTerm: previousTerm.get(),
previousLogIndex: follower.nextIndex - 1,
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
else:
# TODO: we add support for snapshots
let request = RaftRpcAppendRequest(
previousTerm: 0,
previousLogIndex: 1,
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
sm.debug "exit" & $follower[]
func replicate*(sm: var RaftStateMachine) =
if sm.state.isLeader:
for followerIndex in 0..<sm.leader.tracker.progress.len:
if sm.myId != sm.leader.tracker.progress[followerIndex].id:
sm.replicateTo(sm.leader.tracker.progress[followerIndex])
func addEntry(sm: var RaftStateMachine, entry: LogEntry) =
if not sm.state.isLeader:
sm.debug "Error: only the leader can handle new entries"
sm.log.appendAsLeader(entry)
func addEntry*(sm: var RaftStateMachine, command: Command) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletCommand, command: command))
func addEntry*(sm: var RaftStateMachine, config: Config) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletConfig, config: config))
func addEntry*(sm: var RaftStateMachine, dummy: Empty) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletEmpty, empty: true))
func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) =
if sm.myId == leaderId:
sm.debug "Can't be follower of itself"
sm.output.stateChange = not sm.state.isFollower
sm.state = initFollower(leaderId)
if leaderId != RaftnodeId():
sm.pingLeader = false
# TODO: Update last election time
func becomeLeader*(sm: var RaftStateMachine) =
if sm.state.isLeader:
sm.debug "The leader can't become leader second time"
return
sm.output.stateChange = true
sm.addEntry(Empty())
sm.state = initLeader(sm.config, sm.log.lastIndex, sm.timeNow)
sm.pingLeader = false
#TODO: Update last election time
return
func becomeCandidate*(sm: var RaftStateMachine) =
#TODO: implement
if not sm.state.isCandidate:
sm.output.stateChange = true
sm.state = initCandidate(sm.config)
sm.lastElectionTime = sm.timeNow
# TODO: Add configuration change logic
sm.term += 1
for nodeId in sm.candidate.votes.voters:
if nodeId == sm.myId:
sm.debug "reguster vote for it self "
discard sm.candidate.votes.registerVote(nodeId, true)
sm.votedFor = nodeId
continue
let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, force: false)
sm.sendTo(nodeId, request)
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
if sm.candidate.votes.tallyVote == RaftElectionResult.Won:
sm.becomeLeader()
return
func hearthbeat(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
sm.debug "hearthbear" & $follower.nextIndex
sm.addEntry(Empty())
func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
sm.timeNow = now
# if sm.lastElectionTime - sm.timeNow > sm.electionTimeout:
# sm.becomeFollower(RaftnodeId())
# return
sm.lastElectionTime = now
if not sm.state.isLeader:
sm.debug "tickLeader can be called only on the leader"
return
for followerIndex in 0..<sm.leader.tracker.progress.len:
var follower = sm.leader.tracker.progress[followerIndex]
if sm.myId != follower.id:
if follower.matchIndex < sm.log.lastIndex or follower.commitIndex < sm.commitIndex:
sm.replicateTo(follower)
#sm.debug "replicate to" & $follower
#sm.debug $(now - follower.lastMessageAt)
if now - follower.lastMessageAt > sm.heartbeatTime:
sm.debug "heartbeat"
sm.hearthbeat(follower)
# TODO: implement step down logic
func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.debug "Term: " & $sm.term & " commit idx " & $sm.commitIndex & " Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms"
sm.timeNow = now
if sm.state.isLeader:
sm.tickLeader(now);
elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime:
sm.debug "Become candidate"
sm.becomeCandidate()
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
# Should initiate replication if we have new entries
if sm.state.isLeader:
sm.replicate()
sm.output.term = sm.term
if sm.votedFor != RaftnodeId():
sm.output.votedFor = some(sm.votedFor)
let output = sm.output
sm.output = RaftStateMachineOutput()
return output
func commit*(sm: var RaftStateMachine) =
if not sm.state.isLeader:
return
var newIndex = sm.commitIndex
var nextIndex = sm.commitIndex + 1
while nextIndex < sm.log.lastIndex:
var replicationCnt = 0
for p in sm.leader.tracker.progress:
if p.matchIndex > newIndex:
replicationCnt += 1
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
sm.output.committed.add(sm.log.getEntryByIndex(nextIndex))
sm.commitIndex += nextIndex;
nextIndex += 1
else:
break
func appendEntryReplay*(sm: var RaftStateMachine, fromId: RaftNodeId, replay: RaftRpcAppendReplay) =
if not sm.state.isLeader:
sm.debug "You can't append append replay to the follower"
return
var follower = sm.findFollowerProggressById(fromId)
if not follower.isSome:
sm.debug "Can't find the follower"
return
follower.get().commitIndex = max(follower.get().commitIndex, replay.commitIndex)
case replay.result:
of RaftRpcCode.Accepted:
let lastIndex = replay.accepted.lastNewIndex
sm.debug "Accpeted" & $fromId & " " & $lastIndex
follower.get().accepted(lastIndex)
# TODO: add leader stepping down logic here
sm.commit()
if not sm.state.isLeader:
return
of RaftRpcCode.Rejected:
if replay.rejected.nonMatchingIndex == 0 and replay.rejected.lastIdx == 0:
sm.replicateTo(follower.get())
follower.get().nextIndex = min(replay.rejected.nonMatchingIndex, replay.rejected.lastIdx + 1)
# if commit apply configuration that removes current follower
# we should take it again
var follower2 = sm.findFollowerProggressById(fromId)
if follower2.isSome:
sm.replicateTo(follower2.get())
func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
let newIdx = min(leaderIdx, sm.log.lastIndex)
if newIdx > sm.commitIndex:
sm.commitIndex = newIdx
# TODO: signal the output for the update
func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) =
if not sm.state.isFollower:
sm.debug "You can't append append request to the non follower"
return
let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm)
if not match:
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
sm.sendTo(fromId, responce)
sm.debug "Reject to apply the entry"
for entry in request.entries:
sm.log.appendAsFollower(entry)
sm.advanceCommitIdx(request.commitIndex)
let accepted = RaftRpcAppendReplayAccepted(lastNewIndex: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
sm.sendTo(fromId, responce)
func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) =
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId())
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
let responce = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: true)
sm.sendTo(fromId, responce)
else:
let responce: RaftRpcVoteReplay = RaftRpcVoteReplay(currentTerm: sm.term, voteGranted: false)
sm.sendTo(fromId, responce)
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReplay) =
if not sm.state.isCandidate:
sm.debug "Non candidate can't handle votes"
return
discard sm.candidate.votes.registerVote(fromId, request.voteGranted)
case sm.candidate.votes.tallyVote:
of RaftElectionResult.Unknown:
return
of RaftElectionResult.Won:
sm.debug "Win election"
sm.becomeLeader()
of RaftElectionResult.Lost:
sm.debug "Lost election"
sm.becomeFollower(RaftNodeId())
func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) =
#sm.debug $msg
if msg.currentTerm > sm.term:
sm.debug "Current node is behind"
var leaderId = RaftnodeId()
if msg.kind == RaftRpcMessageType.AppendRequest:
leaderId = msg.sender
sm.becomeFollower(leaderId)
# TODO: implement pre vote
sm.term = msg.currentTerm
sm.votedFor = RaftnodeId()
elif msg.currentTerm < sm.term:
if msg.kind == RaftRpcMessageType.AppendRequest:
# Instruct leader to step down
let rejected = RaftRpcAppendReplayRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReplay(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
sm.sendTo(msg.sender, responce)
sm.debug "Ignore message with lower term"
else:
# TODO: add also snapshot
if msg.kind == RaftRpcMessageType.AppendRequest:
if sm.state.isCandidate:
sm.becomeFollower(msg.sender)
elif sm.state.isFollower:
sm.follower.leader = msg.sender
# TODO: fix time
if sm.state.isCandidate:
if msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
elif msg.kind == RaftRpcMessageType.VoteReplay:
sm.debug "Apply vote"
sm.requestVoteReply(msg.sender, msg.voteReplay)
else:
sm.debug "Candidate ignore message"
elif sm.state.isFollower:
if msg.sender == sm.follower.leader:
sm.lastElectionTime = now
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.appendEntry(msg.sender, msg.appendRequest)
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.debug "Follower ignore message" & $msg
# TODO: imelement the rest of the state transitions
elif sm.state.isLeader:
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.debug "Ignore message leader append his entries directly"
elif msg.kind == RaftRpcMessageType.AppendReplay:
sm.appendEntryReplay(msg.sender, msg.appendReplay)
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.debug "Leader ignore message"

103
raft/log.nim Normal file
View File

@ -0,0 +1,103 @@
import types
import std/sequtils
type
RaftLogEntryType* = enum
rletCommand = 0,
rletConfig = 1,
rletEmpty = 2
Command* = object
data: seq[byte]
Config* = object
Empty* = object
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
# TODO: Add configuration too
case kind*: RaftLogEntryType:
of rletCommand: command*: Command
of rletConfig: config*: Config
of rletEmpty: empty*: bool
RaftLog* = object
logEntries: seq[LogEntry]
firstIndex: RaftLogIndex
func initRaftLog*(firstIndex: RaftLogIndex): RaftLog =
var log = RaftLog()
assert firstIndex > 0
log.firstIndex = firstIndex
return log
func lastTerm*(rf: RaftLog): RaftNodeTerm =
# Not sure if it's ok, maybe we should return optional value
let size = rf.logEntries.len
if size == 0:
return 0
return rf.logEntries[size - 1].term
func entriesCount*(rf: RaftLog): int =
return rf.logEntries.len
func lastIndex*(rf: RaftLog): RaftNodeTerm =
return rf.logEntries.len + rf.firstIndex - 1
func nextIndex*(rf: RaftLog): int =
return rf.lastIndex + 1
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
# TODO: We should add support for configurations and snapshots
if rf.logEntries.len == 0:
return
rf.logEntries.delete((index - rf.firstIndex)..<len(rf.logEntries))
func isUpToDate*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
return term > rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex)
func getEntryByIndex*(rf: RaftLog, index: RaftLogIndex): LogEntry =
return rf.logEntries[index - rf.firstIndex]
func appendAsLeader*(rf: var RaftLog, entry: LogEntry) =
rf.logEntries.add(entry)
func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
assert entry.index > 0
let currentIdx = rf.lastIndex
if entry.index <= currentIdx:
# TODO: The indexing hold only if we keep all entries in memory
# we should change it when we add support for snapshots
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true))
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) =
if len(rf.logEntries) == 0:
return (true, 0)
# TODO: We should add support for snapshots
if index > len(rf.logEntries):
# The follower doesn't have all etries
return (false, 0)
let i = index - rf.firstIndex
if rf.logEntries[i].term == term:
return (true, 0)
else:
return (false, rf.logEntries[i].term)
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
assert rf.logEntries.len > index
if rf.logEntries.len > 0 and index >= rf.firstIndex:
return some(rf.logEntries[index].term)
return none(RaftNodeTerm)

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,48 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# #
# Raft Messages Protocol definition #
# #
import types
import options
type
# Raft Node Messages OPs
RaftMessageOps* = enum
rmoRequestVote = 0,
rmoAppendLogEntry = 1,
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes
RaftMessagePayloadChecksum* = object # Checksum probably will be a SHA3 hash not sure about this at this point
RaftMessagePayload*[LogEntryDataType] = ref object
data*: RaftNodeLogEntry[LogEntryDataType]
checksum*: RaftMessagePayloadChecksum
RaftMessage*[LogEntryDataType] = ref object of RaftMessageBase
op*: RaftMessageOps # Message Op - Ask For Votes, Append Entry(ies), Install Snapshot etc.
payload*: Option[seq[RaftMessagePayload[LogEntryDataType]]] # Optional Message Payload(s) - e.g. log entry(ies). Will be empty for a Heart-Beat # Heart-Beat will be a message with Append Entry(ies) Op and empty payload
RaftMessageResponse*[SmStateType] = ref object of RaftMessageBase
success*: bool # Indicates success/failure
state*: Option[SmStateType] # Raft Abstract State Machine State
# Raft Node Client Request/Response definitions
RaftNodeClientRequestOps = enum
rncroRequestState = 0,
rncroAppendNewEntry = 1
RaftNodeClientRequest*[LogEntryDataType] = ref object
op*: RaftNodeClientRequestOps
payload*: Option[RaftMessagePayload[LogEntryDataType]] # Optional RaftMessagePayload carrying a Log Entry
RaftNodeClientResponse*[SmStateType] = ref object
success*: bool # Indicate succcess
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure

View File

@ -1,70 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import protocol
export types, protocol
# Raft Node Public API procedures / functions
proc RaftNodeCreateNew*[LogEntryDataType, SmStateType]( # Create New Raft Node
id: RaftNodeId, peers: RaftNodePeers,
persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback): RaftNode[LogEntryDataType, SmStateType] =
discard
proc RaftNodeLoad*[LogEntryDataType, SmStateType](
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[LogEntryDataType, SmStateType], string] =
discard
proc RaftNodeStop*(node: RaftNode) =
discard
proc RaftNodeStart*(node: RaftNode) =
discard
func RaftNodeIdGet*(node: RaftNode): RaftNodeId = # Get Raft Node ID
discard
func RaftNodeStateGet*(node: RaftNode): RaftNodeState = # Get Raft Node State
discard
func RaftNodeTermGet*(node: RaftNode): RaftNodeTerm = # Get Raft Node Term
discard
func RaftNodePeersGet*(node: RaftNode): RaftNodePeers = # Get Raft Node Peers
discard
func RaftNodeIsLeader*(node: RaftNode): bool = # Check if Raft Node is Leader
discard
proc RaftNodeMessageDeliver*(node: RaftNode, raftMessage: RaftMessageBase): RaftMessageResponse {.discardable.} = # Deliver Raft Message to the Raft Node
discard
proc RaftNodeRequest*(node: RaftNode, req: RaftNodeClientRequest): RaftNodeClientResponse = # Process RaftNodeClientRequest
discard
proc RaftNodeLogIndexGet*(node: RaftNode): RaftLogIndex =
discard
proc RaftNodeLogEntryGet*(node: RaftNode, logIndex: RaftLogIndex): Result[RaftNodeLogEntry, string] =
discard
# Abstract State Machine Ops
func RaftNodeSmStateGet*[LogEntryDataType, SmStateType](node: RaftNode[LogEntryDataType, SmStateType]): SmStateType =
node.stateMachine.state
proc RaftNodeSmInit[LogEntryDataType, SmStateType](stateMachine: var RaftNodeStateMachine[LogEntryDataType, SmStateType]) =
mixin RaftSmInit
RaftSmInit(stateMachine)
proc RaftNodeSmApply[LogEntryDataType, SmStateType](stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType], logEntry: LogEntryDataType) =
mixin RaftSmApply
RaftSmApply(stateMachine, logEntry)

4
raft/rlock.nim Normal file
View File

@ -0,0 +1,4 @@
import types
template withRlockAlt(l: RLock, body: untyped) =

57
raft/state.nim Normal file
View File

@ -0,0 +1,57 @@
import types
import tracker
import std/[times]
type
RaftNodeState = enum
rnsFollower = 0, # Follower state
rnsCandidate = 1 # Candidate state
rnsLeader = 2 # Leader state
RaftStateMachineState* = object
case state: RaftNodeState
of rnsFollower: follower: FollowerState
of rnsCandidate: candidate: CandidateState
of rnsLeader: leader: LeaderState
LeaderState* = object
tracker*: RaftTracker
CandidateState* = object
votes*: RaftVotes
FollowerState* = object
leader*: RaftNodeId
func `$`*(s: RaftStateMachineState): string =
return $s.state
func initLeader*(cfg: RaftConfig, index: RaftLogIndex, now: times.DateTime): RaftStateMachineState =
var state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState())
state.leader.tracker = initTracker(cfg, index, now)
return state
func initFollower*(leaderId: RaftNodeId): RaftStateMachineState =
return RaftStateMachineState(state: RaftNodeState.rnsFollower, follower: FollowerState(leader: leaderId))
func initCandidate*(cfg: RaftConfig): RaftStateMachineState =
return RaftStateMachineState(state: RaftnodeState.rnsCandidate, candidate: CandidateState(votes: initVotes(cfg)))
func isLeader*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsLeader
func isFollower*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsFollower
func isCandidate*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsCandidate
func leader*(s: var RaftStateMachineState): var LeaderState =
return s.leader
func follower*(s: var RaftStateMachineState): var FollowerState =
return s.follower
func candidate*(s: var RaftStateMachineState): var CandidateState =
return s.candidate

98
raft/tracker.nim Normal file
View File

@ -0,0 +1,98 @@
import types
import std/sequtils
import std/[times]
type
RaftElectionResult* = enum
Unknown = 0,
Won = 1,
Lost = 2
RaftElectionTracker* = object
all: seq[RaftNodeId]
responded: seq[RaftNodeId]
granted: int
RaftVotes* = object
voters*: seq[RaftNodeId]
current: RaftElectionTracker
RaftFollowerProgress = seq[RaftFollowerProgressTracker]
RaftTracker* = object
progress*: RaftFollowerProgress
current: seq[RaftNodeId]
RaftFollowerProgressTracker* = ref object
id*: RaftNodeId
nextIndex*: RaftLogIndex
# Index of the highest log entry known to be replicated to this server.
matchIndex*: RaftLogIndex
commitIndex*: RaftLogIndex
replayedIndex: RaftLogIndex
lastMessageAt*: times.DateTime
func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker =
var r = RaftElectionTracker()
r.all = nodes
r.granted = 0
return r
func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool =
if not ret.all.contains nodeId:
return false
if not ret.responded.contains nodeId:
ret.responded.add(nodeId)
if granted:
ret.granted += 1
return true
func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult =
let quorym = int(len(ret.all) / 2) + 1
if ret.granted >= quorym:
return RaftElectionResult.Won
let unkown = len(ret.all) - len(ret.responded)
if ret.granted + unkown >= quorym:
return RaftElectionResult.Unknown
else:
return RaftElectionResult.Lost
func initVotes*(nodes: seq[RaftNodeId]): RaftVotes =
var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes))
return r
func initVotes*(config: RaftConfig): RaftVotes =
var r = RaftVotes(voters: config.currentSet, current: initElectionTracker(config.currentSet))
return r
func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool =
# TODO: Add support for configuration
return rv.current.registerVote(nodeId, granted)
func tallyVote*(rv: var RaftVotes): RaftElectionResult =
# TODO: Add support for configuration
return rv.current.tallyVote()
func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] =
for follower in ls.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex, now: times.DateTime): RaftFollowerProgressTracker =
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0, lastMessageAt: now)
func initTracker*(config: RaftConfig, nextIndex: RaftLogIndex, now: times.DateTime): RaftTracker =
var tracker = RaftTracker()
for node in config.currentSet:
tracker.progress.add(initFollowerProgressTracker(node, nextIndex, now))
tracker.current.add(node)
return tracker
func accepted*(fpt: var RaftFollowerProgressTracker, index: RaftLogIndex)=
fpt.matchIndex = max(fpt.matchIndex, index)
fpt.nextIndex = max(fpt.nextIndex, index)

View File

@ -7,114 +7,27 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
# Raft Node Public Types. # Raft Node Public Types
# I guess that at some point these can be moved to a separate file called raft_consensus_types.nim for example
import std/locks import std/rlocks
import options
import stew/results import stew/results
import eth/keyfile import uuids
import chronos
export results export
results,
options,
rlocks,
uuids,
chronos
const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
type type
# Raft Node basic definitions RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
Blob* = seq[byte] RaftNodeTerm* = int # Raft Node Term Type
RaftLogIndex* = int # Raft Node Log Index Type
RaftNodeState* = enum RaftConfig* = object
UNKNOWN = 0, currentSet*: seq[RaftNodeId]
FOLLOWER = 1,
LEADER = 2
RaftNodeId* = UUID # UUID uniquely identifying every Raft Node
RaftNodePeers* = seq[RaftNodeId] # List of Raft Node Peers IDs
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
# Raft Node Abstract State Machine type
RaftNodeStateMachine*[LogEntryDataType, SmStateType] = ref object # Some probably opaque State Machine Impelementation to be used by the Raft Node
# providing at minimum operations for initialization, querying the current state
# and RaftNodeLogEntry application
state: SmStateType
# Raft Node Persistent Storage basic definition
RaftNodePersistentStorage* = ref object # Should be some kind of Persistent Transactional Store Wrapper
# Basic modules (algos) definitions
RaftNodeAccessCallback[LogEntryDataType] = proc: RaftNode[LogEntryDataType] {.nimcall, gcsafe.} # This should be implementes as a closure holding the RaftNode
RaftConsensusModule*[LogEntryDataType] = object of RootObj
stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
RaftLogCompactionModule*[LogEntryDataType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
RaftMembershipChangeModule*[LogEntryDataType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
# Callback for sending messages out of this Raft Node
RaftMessageId* = UUID # UUID assigned to every Raft Node Message,
# so it can be matched with it's corresponding response etc.
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages
# out of this Raft Node. Can be used for broadcasting
# (a Heart-Beat for example)
# Raft Node basic Log definitions
RaftNodeLogEntry*[LogEntryDataType] = ref object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
data*: LogEntryDataType
RaftNodeLog*[LogEntryDataType] = ref object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[LogEntryDataType]] # Raft Node Log Data
# Base type for Raft message objects
RaftMessageBase* = ref object of RootObj # Base Type for Raft Node Messages
msgId*: RaftMessageId # Message UUID
senderId*: RaftNodeId # Sender Raft Node ID
senderTerm*: RaftNodeTerm # Sender Raft Node Term
peers*: RaftNodePeers # List of Raft Node IDs, which should receive this message
# Raft Node Object type
RaftNode*[LogEntryDataType, SmStateType] = ref object
# Timers
votingTimout: uint64
heartBeatTimeout: uint64
# etc. timers
# Mtx definitions go here
raftStateMutex: Lock
raftLogMutex: Lock
raftCommMutexReceiveMsg: Lock
raftCommMutexClientResponse: Lock
# Modules (Algos)
consensusModule: RaftConsensusModule[LogEntryDataType]
logCompactionModule: RaftLogCompactionModule[LogEntryDataType]
membershipChangeModule: RaftMembershipChangeModule[LogEntryDataType]
# Misc
msgSendCallback: RaftMessageSendCallback
persistentStorage: RaftNodePersistentStorage
# Persistent state
id: RaftNodeId # This Raft Node ID
state: RaftNodeState # This Raft Node State
currentTerm: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
log: RaftNodeLog[LogEntryDataType] # This Raft Node Log
votedFor: RaftNodeId # Candidate RaftNodeId that received vote in current term (or nil/zero if none),
# also used to redirect Client Requests in case this Raft Node is not the leader
peers: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
# makes sense for the moment
stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
# Volatile state
commitIndex: RaftLogIndex # Index of highest log entry known to be committed (initialized to 0, increases monotonically)
lastApplied: RaftLogIndex # Index of highest log entry applied to state machine (initialized to 0, increases monotonically)
# Volatile state on leaders
nextIndex: seq[RaftLogIndex] # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
matchIndex: seq[RaftLogIndex] # For each peer Raft Node, index of highest log entry known to be replicated on Node
# (initialized to 0, increases monotonically)

34
raft_node_config.json Normal file
View File

@ -0,0 +1,34 @@
{
"raftPeers":[
{
"id": "f9695ea4-4f37-11ee-8e75-8ff5a48faa42",
"host": "127.0.0.1",
"port": 7771,
"friendly_name": "Raft Node 1"
},
{
"id": "04fd202a-4f38-11ee-9ee2-23aba7dde7d3",
"host": "127.0.0.1",
"port": 7772,
"friendly_name": "Raft Node 2 etc."
},
{
"id": "09f3e6b8-4f38-11ee-a221-4b348f8bbde7",
"host": "127.0.0.1",
"port": 7773,
"friendly_name": "Raft Node 3 etc."
},
{
"id": "0edc0976-4f38-11ee-b1ad-5b3b0f690e65",
"host": "127.0.0.1",
"port": 7774,
"friendly_name": "Raft Node 4 etc."
},
{
"id": "130d0662-4f38-11ee-a6ed-2746aae5bc0b",
"host": "127.0.0.1",
"port": 7775,
"friendly_name": "Raft Node 5 etc."
}
]
}

2
run_all_tests.sh Executable file
View File

@ -0,0 +1,2 @@
#!/bin/bash
nim test raft.nims

297
scripts/ci/build_nim.sh Normal file
View File

@ -0,0 +1,297 @@
#!/usr/bin/env bash
# used in Travis CI and AppVeyor scripts
# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under
# either of:
# - Apache License, version 2.0
# - MIT license
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
set -e
# NIM_COMMIT could be a (partial) commit hash, a tag, a branch name, etc. Empty by default.
NIM_COMMIT_HASH="" # full hash for NIM_COMMIT, retrieved in "nim_needs_rebuilding()"
# script arguments
[[ $# -ne 4 ]] && { echo "Usage: $0 nim_dir csources_dir nimble_dir ci_cache_dir"; exit 1; }
NIM_DIR="$1"
CSOURCES_DIR="$2" # can be relative to NIM_DIR; only used when `skipIntegrityCheck` unsupported
NIMBLE_DIR="$3" # can be relative to NIM_DIR; only used when `skipIntegrityCheck` unsupported
CI_CACHE="$4"
## env vars
# verbosity level
[[ -z "$V" ]] && V=0
[[ -z "$CC" ]] && CC="gcc"
# to build csources in parallel, set MAKE="make -jN"
[[ -z "$MAKE" ]] && MAKE="make"
# for 32-bit binaries on a 64-bit host
UCPU=""
[[ "$ARCH_OVERRIDE" == "x86" ]] && UCPU="ucpu=i686"
[[ -z "$NIM_BUILD_MSG" ]] && NIM_BUILD_MSG="Building the Nim compiler"
[[ -z "$QUICK_AND_DIRTY_COMPILER" ]] && QUICK_AND_DIRTY_COMPILER=0
[[ -z "$QUICK_AND_DIRTY_NIMBLE" ]] && QUICK_AND_DIRTY_NIMBLE=0
# Windows detection
if uname | grep -qiE "mingw|msys"; then
ON_WINDOWS=1
EXE_SUFFIX=".exe"
# otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495
GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4
else
ON_WINDOWS=0
EXE_SUFFIX=""
GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0
fi
NIM_BINARY="${NIM_DIR}/bin/nim${EXE_SUFFIX}"
MAX_NIM_BINARIES="10" # Old ones get deleted.
nim_needs_rebuilding() {
REBUILD=0
NO_REBUILD=1
if [[ ! -e "$NIM_DIR" ]]; then
# Shallow clone, optimised for the default NIM_COMMIT value.
git clone -q --depth=1 https://github.com/status-im/Nim.git "$NIM_DIR"
fi
pushd "${NIM_DIR}" >/dev/null
if [[ -n "${NIM_COMMIT}" ]]; then
# support old Git versions, like the one from Ubuntu-18.04
git restore . 2>/dev/null || git reset --hard
if ! git checkout -q ${NIM_COMMIT} 2>/dev/null; then
# Pay the price for a non-default NIM_COMMIT here, by fetching everything.
# (This includes upstream branches and tags that might be missing from our fork.)
git remote add upstream https://github.com/nim-lang/Nim
git fetch --all --tags --quiet
git checkout -q ${NIM_COMMIT}
fi
# In case the local branch diverged and a fast-forward merge is not possible.
git fetch || true
git reset -q --hard origin/${NIM_COMMIT} 2>/dev/null || true
# In case NIM_COMMIT is a local branch that's behind the remote one it's tracking.
git pull -q 2>/dev/null || true
git checkout -q ${NIM_COMMIT}
# We can't use "rev-parse" here, because it would return the tag object's
# hash instead of the commit hash, when NIM_COMMIT is a tag.
NIM_COMMIT_HASH="$(git rev-list -n 1 ${NIM_COMMIT})"
else
# NIM_COMMIT is empty, so assume the commit we need is already checked out
NIM_COMMIT_HASH="$(git rev-list -n 1 HEAD)"
fi
popd >/dev/null
if [[ -n "$CI_CACHE" && -d "$CI_CACHE" ]]; then
cp -a "$CI_CACHE"/* "$NIM_DIR"/bin/ || true # let this one fail with an empty cache dir
fi
# Delete old Nim binaries, to put a limit on how much storage we use.
for F in "$(ls -t "${NIM_DIR}"/bin/nim_commit_* 2>/dev/null | tail -n +$((MAX_NIM_BINARIES + 1)))"; do
if [[ -e "${F}" ]]; then
rm "${F}"
fi
done
# Compare the last built commit to the one requested.
# Handle the scenario where our symlink is manually deleted by the user.
if [[ -e "${NIM_DIR}/bin/last_built_commit" && \
-e "${NIM_DIR}/bin/nim${EXE_SUFFIX}" && \
"$(cat "${NIM_DIR}/bin/last_built_commit")" == "${NIM_COMMIT_HASH}" ]]; then
return $NO_REBUILD
elif [[ -e "${NIM_DIR}/bin/nim_commit_${NIM_COMMIT_HASH}" ]]; then
# we built the requested commit in the past, so we simply reuse it
rm -f "${NIM_DIR}/bin/nim${EXE_SUFFIX}"
ln -s "nim_commit_${NIM_COMMIT_HASH}" "${NIM_DIR}/bin/nim${EXE_SUFFIX}"
echo ${NIM_COMMIT_HASH} > "${NIM_DIR}/bin/last_built_commit"
return $NO_REBUILD
else
return $REBUILD
fi
}
build_nim() {
echo -e "$NIM_BUILD_MSG"
# [[ "$V" == "0" ]] && exec &>/dev/null
# working directory
pushd "$NIM_DIR"
if grep -q "skipIntegrityCheck" koch.nim && [ "${NIM_COMMIT}" != "version-1-6" ]; then
echo "in if"
# Run Nim buildchain, with matching dependency versions
# - CSOURCES_REPO from Nim/config/build_config.txt (nim_csourcesUrl)
# - CSOURCES_COMMIT from Nim/config/build_config.txt (nim_csourcesHash)
# - NIMBLE_REPO from Nim/koch.nim (bundleNimbleExe)
# - NIMBLE_COMMIT from Nim/koch.nim (NimbleStableCommit)
. ci/funs.sh
NIMCORES=1 nimBuildCsourcesIfNeeded $UCPU
bin/nim c --noNimblePath --skipUserCfg --skipParentCfg --warnings:off --hints:off koch
./koch --skipIntegrityCheck boot -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
if [[ "${QUICK_AND_DIRTY_COMPILER}" == "0" ]]; then
# We want tools
./koch tools -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
elif [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
# We just want nimble
./koch nimble -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
fi
else
# Git commits
echo "in else"
: ${CSOURCES_V1_COMMIT:=561b417c65791cd8356b5f73620914ceff845d10}
: ${CSOURCES_V2_COMMIT:=86742fb02c6606ab01a532a0085784effb2e753e}
: ${CSOURCES_V1_REPO:=https://github.com/nim-lang/csources_v1.git}
: ${CSOURCES_V2_REPO:=https://github.com/nim-lang/csources_v2.git}
# After this Nim commit, use csources v2
: ${CSOURCES_V2_START_COMMIT:=f7c203fb6c89b5cef83c4f326aeb23ef8c4a2c40}
: ${NIMBLE_REPO:=https://github.com/nim-lang/nimble.git}
: ${NIMBLE_COMMIT:=a4fc798838ee753f5485dd19afab22e9367eb0e7} # 0.13.1
# Custom buildchain for older versions
# TODO Remove this once the default NIM_COMMIT supports `--skipIntegrityCheck`
# We will still be able to compile older versions by removing the flag,
# which will just waste a bit of CPU
# Git repos for csources and Nimble
if [[ ! -d "$CSOURCES_DIR" ]]; then
if git merge-base --is-ancestor $CSOURCES_V2_START_COMMIT $NIM_COMMIT_HASH; then
CSOURCES_REPO=$CSOURCES_V2_REPO
CSOURCES_COMMIT=$CSOURCES_V2_COMMIT
else
CSOURCES_REPO=$CSOURCES_V1_REPO
CSOURCES_COMMIT=$CSOURCES_V1_COMMIT
fi
mkdir -p "$CSOURCES_DIR"
pushd "$CSOURCES_DIR"
git clone $CSOURCES_REPO .
git checkout $CSOURCES_COMMIT
popd
fi
if [[ "$CSOURCES_DIR" != "csources" ]]; then
rm -rf csources
ln -s "$CSOURCES_DIR" csources
fi
# bootstrap the Nim compiler and build the tools
rm -f bin/{nim,nim_csources}
pushd csources
if [[ "$ON_WINDOWS" == "0" ]]; then
$MAKE $UCPU clean
$MAKE $UCPU LD=$CC
else
$MAKE myos=windows $UCPU clean
$MAKE myos=windows $UCPU CC=gcc LD=gcc
fi
popd
if [[ -e csources/bin ]]; then
rm -f bin/nim bin/nim_csources
cp -a csources/bin/nim bin/nim
cp -a csources/bin/nim bin/nim_csources
rm -rf csources/bin
else
cp -a bin/nim bin/nim_csources
fi
if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" ]]; then
sed \
-e 's/koch$/--warnings:off --hints:off koch/' \
-e 's/koch boot/koch boot --warnings:off --hints:off/' \
-e '/nimBuildCsourcesIfNeeded/d' \
build_all.sh > build_all_custom.sh
sh build_all_custom.sh
rm build_all_custom.sh
else
# Don't re-build it multiple times until we get identical
# binaries, like "build_all.sh" does. Don't build any tools
# either. This is all about build speed, not developer comfort.
bin/nim_csources \
c \
--compileOnly \
--nimcache:nimcache \
-d:release \
--skipUserCfg \
--skipParentCfg \
--warnings:off \
--hints:off \
compiler/nim.nim
bin/nim_csources \
jsonscript \
--nimcache:nimcache \
--skipUserCfg \
--skipParentCfg \
compiler/nim.nim
cp -a compiler/nim bin/nim1
# If we stop here, we risk ending up with a buggy compiler:
# https://github.com/status-im/nimbus-eth2/pull/2220
# https://github.com/status-im/nimbus-eth2/issues/2310
bin/nim1 \
c \
--compileOnly \
--nimcache:nimcache \
-d:release \
--skipUserCfg \
--skipParentCfg \
--warnings:off \
--hints:off \
compiler/nim.nim
bin/nim1 \
jsonscript \
--nimcache:nimcache \
--skipUserCfg \
--skipParentCfg \
compiler/nim.nim
rm -f bin/nim
cp -a compiler/nim bin/nim
rm bin/nim1
if [[ ! -d "$NIMBLE_DIR" ]]; then
mkdir -p "$NIMBLE_DIR"
pushd "$NIMBLE_DIR"
git clone $NIMBLE_REPO .
git checkout $NIMBLE_COMMIT
pwd
../../bin/nim r src/nimblepkg/private/clone.nim
# we have to delete .git or koch.nim will checkout a branch tip, overriding our target commit
rm -rf .git
popd
fi
if [[ "$NIMBLE_DIR" != "dist/nimble" ]]; then
mkdir -p dist
rm -rf dist/nimble
ln -s ../"$NIMBLE_DIR" dist/nimble
fi
# Do we want Nimble in this quick build?
if [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
bin/nim c -d:release --noNimblePath --skipUserCfg --skipParentCfg dist/nimble/src/nimble.nim
mv dist/nimble/src/nimble bin/
fi
fi
fi
if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" || "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
# Nimble needs a CA cert
rm -f bin/cacert.pem
curl -LsS -o bin/cacert.pem https://curl.se/ca/cacert.pem || echo "Warning: 'curl' failed to download a CA cert needed by Nimble. Ignoring it."
fi
# record the built commit
echo ${NIM_COMMIT_HASH} > bin/last_built_commit
# create the symlink
mv bin/nim bin/nim_commit_${NIM_COMMIT_HASH}
ln -s nim_commit_${NIM_COMMIT_HASH} bin/nim${EXE_SUFFIX}
# update the CI cache
popd # we were in $NIM_DIR
if [[ -n "$CI_CACHE" ]]; then
rm -rf "$CI_CACHE"
mkdir "$CI_CACHE"
cp "$NIM_DIR"/bin/* "$CI_CACHE"/
fi
}
if nim_needs_rebuilding; then
build_nim
fi

View File

@ -6,3 +6,10 @@
# at your option. # at your option.
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import ../misc/test_macro
{. warning[UnusedImport]:off .}
cliBuilder:
import ./test_consensus_state_machine

View File

@ -0,0 +1,369 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import ../raft/types
import ../raft/consensus_state_machine
import ../raft/log
import ../raft/tracker
import ../raft/state
import std/[times, sequtils]
import uuids
import tables
type
TestCluster* = object
nodes: Table[RaftnodeId, RaftStateMachine]
var test_ids_3 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
RaftnodeId(parseUUID("2a98fc33-6559-44c0-b130-fc3e9df80a69")),
RaftnodeId(parseUUID("9156756d-697f-4ffa-9b82-0c86720344bd"))
]
var test_ids_1 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
]
func createConfigFromIds(ids: seq[RaftnodeId]): RaftConfig =
var config = RaftConfig()
for id in ids:
config.currentSet.add(id)
return config
proc createCluster(ids: seq[RaftnodeId], now: times.DateTime) : TestCluster =
var config = createConfigFromIds(ids)
var cluster = TestCluster()
cluster.nodes = initTable[RaftnodeId, RaftStateMachine]()
for i in 0..<config.currentSet.len:
let id = config.currentSet[i]
var log = initRaftLog(1)
var node = initRaftStateMachine(id, 0, log, 0, config, now)
cluster.nodes[id] = node
return cluster
proc advance(tc: var TestCluster, now: times.DateTime) =
for id, node in tc.nodes:
tc.nodes[id].tick(now)
var output = tc.nodes[id].poll()
# for msg in output.debugLogs:
# echo $msg
for msg in output.messages:
#echo "rpc:" & $msg
tc.nodes[msg.receiver].advance(msg, now)
func getLeader(tc: TestCluster): Option[RaftStateMachine] =
for id, node in tc.nodes:
if node.state.isLeader:
return some(node)
return none(RaftStateMachine)
proc consensusstatemachineMain*() =
suite "Basic state machine tests":
test "create state machine":
var cluster = createCluster(test_ids_1, times.now())
echo cluster
test "tick empty state machine":
var timeNow = times.now()
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
sm.tick(times.now())
echo sm.poll()
echo sm.poll()
echo getTime()
suite "Entry log tests":
test "append entry as leadeer":
var log = initRaftLog(1)
log.appendAsLeader(0, 1, Command())
log.appendAsLeader(0, 2, Command())
check log.lastTerm() == 0
log.appendAsLeader(1, 2, Command())
check log.lastTerm() == 1
test "append entry as follower":
var log = initRaftLog(1)
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
discard log.matchTerm(1, 1)
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(1, 3, Command())
check log.lastTerm() == 1
check log.lastIndex() == 3
check log.entriesCount == 3
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(2, 1, Command())
check log.lastTerm() == 2
check log.lastIndex() == 1
check log.entriesCount == 1
suite "3 node cluster":
var timeNow = times.now()
var cluster = createCluster(test_ids_3, timeNow)
var t = now()
suite "Single node election tracker":
test "unknown":
var votes = initVotes(test_ids_1)
check votes.tallyVote == RaftElectionResult.Unknown
test "win election":
var votes = initVotes(test_ids_1)
discard votes.registerVote(test_ids_1[0], true)
check votes.tallyVote == RaftElectionResult.Won
test "lost election":
var votes = initVotes(test_ids_1)
discard votes.registerVote(test_ids_1[0], false)
echo votes.tallyVote
check votes.tallyVote == RaftElectionResult.Lost
suite "3 nodes election tracker":
test "win election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], true)
check votes.tallyVote == RaftElectionResult.Won
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[2], true)
check votes.tallyVote == RaftElectionResult.Won
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], false)
check votes.tallyVote == RaftElectionResult.Lost
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[2], false)
check votes.tallyVote == RaftElectionResult.Lost
suite "Single node cluster":
test "election":
var timeNow = times.now()
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 99.milliseconds
sm.tick(timeNow)
var output = sm.poll()
check output.logEntries.len == 0
check output.committed.len == 0
check output.messages.len == 0
check sm.state.isFollower
timeNow += 500.milliseconds
sm.tick(timeNow)
output = sm.poll()
check output.logEntries.len == 0
check output.committed.len == 0
check output.messages.len == 0
check sm.state.isLeader
check sm.term == 1
test "append entry":
var timeNow = times.now()
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 1000.milliseconds
sm.tick(timeNow)
var output = sm.poll()
check output.logEntries.len == 0
check output.committed.len == 0
check output.messages.len == 0
check sm.state.isLeader
sm.addEntry(Empty())
check sm.poll().messages.len == 0
timeNow += 250.milliseconds
sm.tick(timeNow)
check sm.poll().messages.len == 0
suite "Two nodes cluster":
test "election":
let id1 = test_ids_3[0]
let id2 = test_ids_3[1]
var config = createConfigFromIds(@[id1, id2])
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 601.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == id1
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isLeader
timeNow += 1.milliseconds
# Older messages should be ignored
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: (output.term - 1), voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isLeader
block:
output = sm.poll()
timeNow += 100.milliseconds
sm.tick(timeNow)
output = sm.poll()
# if the leader get a message with higher term it should become follower
block:
timeNow += 201.milliseconds
sm.tick(timeNow)
output = sm.poll()
let entry = LogEntry(term: (output.term + 1), index: 101, kind: RaftLogEntryType.rletEmpty, empty: true)
let appendRequest = RaftRpcAppendRequest(previousTerm: (output.term + 1), previousLogIndex: 100, commitIndex: 99, entries: @[entry])
let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.AppendRequest, appendRequest: appendRequest)
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
suite "3 nodes cluster":
test "election failed":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
timeNow += 1.milliseconds
test "election":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = times.now()
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow)
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReplay(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReplay, voteReplay: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isLeader
timeNow += 1.milliseconds
suite "3 nodes cluester":
test "election":
var cluster = createCluster(test_ids_3, times.now())
var timeNow = times.now()
var leader: RaftnodeId
for i in 0..<105:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if leader == RaftnodeId():
if maybeLeader.isSome:
leader = maybeLeader.get().myId
else:
if maybeLeader.isSome:
check leader == maybeLeader.get().myId
else:
check false
if isMainModule:
consensusstatemachineMain()