Merge pull request #8 from NDobrev/master

Raft Initial implementation
This commit is contained in:
zah 2024-03-12 11:32:54 +02:00 committed by GitHub
commit 4bec310615
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
28 changed files with 2471 additions and 282 deletions

170
.github/workflows/ci.yml vendored Normal file
View File

@ -0,0 +1,170 @@
name: CI
on:
push:
branches:
- master
pull_request:
workflow_dispatch:
concurrency: # Cancel stale PR builds (but not push builds)
group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }}
cancel-in-progress: true
jobs:
build:
strategy:
fail-fast: false
matrix:
target:
- os: linux
cpu: amd64
- os: linux
cpu: i386
- os: macos
cpu: amd64
- os: windows
cpu: amd64
#- os: windows
#cpu: i386
branch: [version-1-6, version-2-0, devel]
include:
- target:
os: linux
builder: ubuntu-20.04
shell: bash
- target:
os: macos
builder: macos-12
shell: bash
- target:
os: windows
builder: windows-2019
shell: msys2 {0}
defaults:
run:
shell: ${{ matrix.shell }}
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
continue-on-error: ${{ matrix.branch == 'devel' }}
steps:
- name: Checkout
uses: actions/checkout@v3
- name: Install build dependencies (Linux i386)
if: runner.os == 'Linux' && matrix.target.cpu == 'i386'
run: |
sudo dpkg --add-architecture i386
sudo apt-fast update -qq
sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \
--no-install-recommends -yq gcc-multilib g++-multilib \
libssl-dev:i386
mkdir -p external/bin
cat << EOF > external/bin/gcc
#!/bin/bash
exec $(which gcc) -m32 "\$@"
EOF
cat << EOF > external/bin/g++
#!/bin/bash
exec $(which g++) -m32 "\$@"
EOF
chmod 755 external/bin/gcc external/bin/g++
echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH
- name: 'Install dependencies (macOS)'
if: runner.os == 'macOS' && matrix.branch == 'devel'
run: |
brew install openssl@1.1
ln -s $(brew --prefix)/opt/openssl/lib/libcrypto.1.1.dylib /usr/local/lib
ln -s $(brew --prefix)/opt/openssl/lib/libssl.1.1.dylib /usr/local/lib/
- name: MSYS2 (Windows i386)
if: runner.os == 'Windows' && matrix.target.cpu == 'i386'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
msystem: MINGW32
install: >-
base-devel
git
mingw-w64-i686-toolchain
- name: MSYS2 (Windows amd64)
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
install: >-
base-devel
git
mingw-w64-x86_64-toolchain
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v3
with:
path: external/dlls-${{ matrix.target.cpu }}
key: 'dlls-${{ matrix.target.cpu }}'
- name: Install DLLs dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir -p external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }}
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
run: |
echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH
- name: Derive environment variables
run: |
if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then
PLATFORM=x64
else
PLATFORM=x86
fi
echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV
ncpu=
MAKE_CMD="make"
case '${{ runner.os }}' in
'Linux')
ncpu=$(nproc)
;;
'macOS')
ncpu=$(sysctl -n hw.ncpu)
;;
'Windows')
ncpu=$NUMBER_OF_PROCESSORS
MAKE_CMD="mingw32-make"
;;
esac
[[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1
echo "ncpu=$ncpu" >> $GITHUB_ENV
echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV
- name: Build Nim and Nimble
run: |
env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} \
NIM_COMMIT=${{ matrix.branch }} \
NIMBLE_COMMIT=a4fc798838ee753f5485dd19afab22e9367eb0e7 \
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \
scripts/ci/build_nim.sh nim csources dist/nimble-latest NimBinaries
echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH
- name: Run tests
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
# https://github.com/status-im/nimbus-eth2/issues/3121
export NIMFLAGS="-d:nimRawSetjmp"
fi
nim --version
nimble --version
nimble test

2
.gitignore vendored
View File

@ -6,3 +6,5 @@
*.out *.out
nimcache/ nimcache/
build/ build/
nimbledeps/
.VSCodeCounter

7
.vscode/settings.json vendored Normal file
View File

@ -0,0 +1,7 @@
{
"nim.projectMapping": [{
"projectFile": "raft.nim",
"fileRegex": ".*\\.nim"
}
]
}

View File

@ -5,3 +5,19 @@ This project aims to develop an implementation of the Raft consensus protocol th
We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here: We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. Further details can be found in our roadmap here:
https://github.com/status-im/nimbus-eth2/issues/3416 https://github.com/status-im/nimbus-eth2/issues/3416
This project is heavily inspired by Raft implementation in ScyllaDB
https://github.com/scylladb/scylladb/tree/master/raft
# Design goals
The main goal is to separate implementation of the raft state machin from the other implementation details like (storage, rpc etc)
In order to achive this we want to keep the State machine absolutly deterministic every interaction the the world like
networking, logging, acquiring current time, random number generation, disc operation etc must happened trough the state machine interface.
It will ensure better testability and integrability.
# Run test
`nimble test`

109
config.nim Normal file
View File

@ -0,0 +1,109 @@
# Set up paths
--noNimblePath
when withDir(thisDir(), system.fileExists("nimble.paths")):
include "nimble.paths"
--path:"src"
# Turn off `libbacktrace`
--define:disable_libbacktrace
# Configuration synced with nwaku's - https://github.com/waku-org/nwaku/blob/master/config.nims
# ---------------------------------------------------- nwaku config ----------------------------------------------------
if defined(release):
switch("nimcache", thisDir() & "/nimcache/release/$projectName")
else:
switch("nimcache", thisDir() & "/nimcache/debug/$projectName")
if defined(windows):
# disable timestamps in Windows PE headers - https://wiki.debian.org/ReproducibleBuilds/TimestampsInPEBinaries
switch("passL", "-Wl,--no-insert-timestamp")
# increase stack size
switch("passL", "-Wl,--stack,8388608")
# https://github.com/nim-lang/Nim/issues/4057
--tlsEmulation:off
if defined(i386):
# set the IMAGE_FILE_LARGE_ADDRESS_AWARE flag so we can use PAE, if enabled, and access more than 2 GiB of RAM
switch("passL", "-Wl,--large-address-aware")
# The dynamic Chronicles output currently prevents us from using colors on Windows
# because these require direct manipulations of the stdout File object.
switch("define", "chronicles_colors=off")
# https://github.com/status-im/nimbus-eth2/blob/stable/docs/cpu_features.md#ssse3-supplemental-sse3
# suggests that SHA256 hashing with SSSE3 is 20% faster than without SSSE3, so
# given its near-ubiquity in the x86 installed base, it renders a distribution
# build more viable on an overall broader range of hardware.
#
if defined(disableMarchNative):
if defined(i386) or defined(amd64):
if defined(macosx):
# macOS Catalina is EOL as of 2022-09
# https://support.apple.com/kb/sp833
# "macOS Big Sur - Technical Specifications" lists current oldest
# supported models: MacBook (2015 or later), MacBook Air (2013 or later),
# MacBook Pro (Late 2013 or later), Mac mini (2014 or later), iMac (2014
# or later), iMac Pro (2017 or later), Mac Pro (2013 or later).
#
# These all have Haswell or newer CPUs.
#
# This ensures AVX2, AES-NI, PCLMUL, BMI1, and BMI2 instruction set support.
switch("passC", "-march=haswell -mtune=generic")
switch("passL", "-march=haswell -mtune=generic")
else:
if defined(marchOptimized):
# https://github.com/status-im/nimbus-eth2/blob/stable/docs/cpu_features.md#bmi2--adx
switch("passC", "-march=broadwell -mtune=generic")
switch("passL", "-march=broadwell -mtune=generic")
else:
switch("passC", "-mssse3")
switch("passL", "-mssse3")
elif defined(macosx) and defined(arm64):
# Apple's Clang can't handle "-march=native" on M1: https://github.com/status-im/nimbus-eth2/issues/2758
switch("passC", "-mcpu=apple-m1")
switch("passL", "-mcpu=apple-m1")
else:
switch("passC", "-march=native")
switch("passL", "-march=native")
if defined(windows):
# https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65782
# ("-fno-asynchronous-unwind-tables" breaks Nim's exception raising, sometimes)
switch("passC", "-mno-avx512f")
switch("passL", "-mno-avx512f")
--threads:on
--opt:speed
--excessiveStackTrace:on
# enable metric collection
--define:metrics
# for heap-usage-by-instance-type metrics and object base-type strings
--define:nimTypeNames
switch("define", "withoutPCRE")
# the default open files limit is too low on macOS (512), breaking the
# "--debugger:native" build. It can be increased with `ulimit -n 1024`.
if not defined(macosx):
# add debugging symbols and original files and line numbers
--debugger:native
--define:nimOldCaseObjects # https://github.com/status-im/nim-confutils/issues/9
# `switch("warning[CaseTransition]", "off")` fails with "Error: invalid command line option: '--warning[CaseTransition]'"
switch("warning", "CaseTransition:off")
# The compiler doth protest too much, methinks, about all these cases where it can't
# do its (N)RVO pass: https://github.com/nim-lang/RFCs/issues/230
switch("warning", "ObservableStores:off")
# Too many false positives for "Warning: method has lock level <unknown>, but another method has 0 [LockLevel]"
switch("warning", "LockLevel:off")
# ----------------------------------------------------------------------------------------------------------------------
# Discovery configuration
switch("define", "discv5_protocol_id=d5waku")
# Logging configuration
--define:chronicles_line_numbers
switch("define", "chronicles_log_level=DEBUG")
switch("define", "chronicles_runtime_filtering=on")

Binary file not shown.

Binary file not shown.

8
nim.projectMapping Normal file
View File

@ -0,0 +1,8 @@
{
"nim.provider": "lsp",
"nim.projectMapping": [{
// everything else - use main.nim as root.
"projectFile": "raft.nim",
"fileRegex": ".*\\.nim"
}]
}

155
nimble.lock Normal file
View File

@ -0,0 +1,155 @@
{
"version": 2,
"packages": {
"unittest2": {
"version": "0.2.1",
"vcsRevision": "262b697f38d6b6f1e7462d3b3ab81d79b894e336",
"url": "https://github.com/status-im/nim-unittest2",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "1bac3a8355441edeed1ef3134e7436d6fb5d4498"
}
},
"stew": {
"version": "0.1.0",
"vcsRevision": "9958aac68a7613a3312fa96dd2f3b29caf17772e",
"url": "https://github.com/status-im/nim-stew",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "4eb2b0c4b0fe9817ee19202e8723d46c284f2875"
}
},
"isaac": {
"version": "0.1.3",
"vcsRevision": "45a5cbbd54ff59ba3ed94242620c818b9aad1b5b",
"url": "https://github.com/pragmagic/isaac/",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "05c3583a954715d84b0bf1be97f9a503249e9cdf"
}
},
"faststreams": {
"version": "0.3.0",
"vcsRevision": "422971502bd641703bf78a27cb20429e77fcfb8b",
"url": "https://github.com/status-im/nim-faststreams",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2"
],
"checksums": {
"sha1": "64045de53dade90c36ba5a75f51603725c5b0f30"
}
},
"serialization": {
"version": "0.2.2",
"vcsRevision": "4d541ec43454809904fc4c3c0a7436410ad597d2",
"url": "https://github.com/status-im/nim-serialization.git",
"downloadMethod": "git",
"dependencies": [
"faststreams",
"unittest2",
"stew"
],
"checksums": {
"sha1": "1dcdb29f17d0aff295e7e57edf530b1e16fb6c59"
}
},
"json_serialization": {
"version": "0.2.2",
"vcsRevision": "3f1ce24ee116daedbc9c8be525e63ec03e185a28",
"url": "https://github.com/status-im/nim-json-serialization.git",
"downloadMethod": "git",
"dependencies": [
"serialization",
"stew"
],
"checksums": {
"sha1": "da0d38b775f222703784b273225fe89267430482"
}
},
"httputils": {
"version": "0.3.0",
"vcsRevision": "3b491a40c60aad9e8d3407443f46f62511e63b18",
"url": "https://github.com/status-im/nim-http-utils",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2"
],
"checksums": {
"sha1": "1331f33585eda05d1e50385fa7871c3bf2a449d7"
}
},
"uuids": {
"version": "0.1.12",
"vcsRevision": "42052ba362a9cd4685463edb3781beeb9b8e547e",
"url": "https://github.com/pragmagic/uuids/",
"downloadMethod": "git",
"dependencies": [
"isaac"
],
"checksums": {
"sha1": "154a31d6f5428c2863c48a057b7143ff9a6e4613"
}
},
"testutils": {
"version": "0.5.0",
"vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34",
"url": "https://github.com/status-im/nim-testutils",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897"
}
},
"bearssl": {
"version": "0.2.1",
"vcsRevision": "d55d3a86d7ec3ad11b244e17b3bad490bfbd076d",
"url": "https://github.com/status-im/nim-bearssl.git",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "5327c983483c4dd465347c6b8a974239c7c6c612"
}
},
"chronicles": {
"version": "0.10.3",
"vcsRevision": "ccbb7566d1a06bfc1ec42dd8da74a47f1d3b3f4b",
"url": "https://github.com/status-im/nim-chronicles.git",
"downloadMethod": "git",
"dependencies": [
"testutils",
"json_serialization"
],
"checksums": {
"sha1": "09ae5c46be94aa60d2b0ca80c215a142f94e3603"
}
},
"chronos": {
"version": "3.2.0",
"vcsRevision": "ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c",
"url": "https://github.com/status-im/nim-chronos",
"downloadMethod": "git",
"dependencies": [
"stew",
"bearssl",
"httputils",
"unittest2"
],
"checksums": {
"sha1": "5783067584ac6812eb64b8454ea6f9c97ff1262a"
}
}
},
"tasks": {}
}

View File

@ -14,14 +14,17 @@ version = "0.0.1"
author = "Status Research & Development GmbH" author = "Status Research & Development GmbH"
description = "raft consensus in nim" description = "raft consensus in nim"
license = "Apache License 2.0" license = "Apache License 2.0"
srcDir = "src"
installExt = @["nim"]
skipDirs = @["tests"] skipDirs = @["tests"]
bin = @["raft"]
requires "nim >= 1.6.0"
requires "nim >= 1.6.14"
requires "stew >= 0.1.0" requires "stew >= 0.1.0"
requires "nimcrypto >= 0.5.4"
requires "unittest2 >= 0.0.4" requires "unittest2 >= 0.0.4"
requires "chronicles >= 0.10.2" requires "uuids >= 0.1.11"
requires "eth >= 1.0.0" requires "chronicles >= 0.10.3"
requires "chronos >= 3.2.0" requires "chronos >= 3.0.11"
# Helper functions

19
raft.nims Normal file
View File

@ -0,0 +1,19 @@
proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") =
if not dirExists "build":
mkDir "build"
# allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims"
var extra_params = params
for i in 2..<paramCount():
extra_params &= " " & paramStr(i)
if `type` == "static":
exec "nim c" & " --out:build/" & name & ".a --threads:on --app:staticlib --opt:size --noMain --header " & extra_params & " " & srcDir & name & ".nim"
else:
exec "nim c" & " --out:build/" & name & ".so --threads:on --app:lib --opt:size --noMain --header " & extra_params & " " & srcDir & name & ".nim"
task build, "Build static lib":
buildLibrary "raft", "src/"
# echo "pesho"
task test, "Run tests":
exec "nim c -r tests/test_consensus_state_machine.nim "

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,8 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.

View File

@ -1,48 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# #
# Raft Messages Protocol definition #
# #
import types
import options
type
# Raft Node Messages OPs
RaftMessageOps* = enum
rmoRequestVote = 0,
rmoAppendLogEntry = 1,
rmoInstallSnapshot = 2 # For dynamic adding of new Raft Nodes
RaftMessagePayloadChecksum* = object # Checksum probably will be a SHA3 hash not sure about this at this point
RaftMessagePayload*[LogEntryDataType] = ref object
data*: RaftNodeLogEntry[LogEntryDataType]
checksum*: RaftMessagePayloadChecksum
RaftMessage*[LogEntryDataType] = ref object of RaftMessageBase
op*: RaftMessageOps # Message Op - Ask For Votes, Append Entry(ies), Install Snapshot etc.
payload*: Option[seq[RaftMessagePayload[LogEntryDataType]]] # Optional Message Payload(s) - e.g. log entry(ies). Will be empty for a Heart-Beat # Heart-Beat will be a message with Append Entry(ies) Op and empty payload
RaftMessageResponse*[SmStateType] = ref object of RaftMessageBase
success*: bool # Indicates success/failure
state*: Option[SmStateType] # Raft Abstract State Machine State
# Raft Node Client Request/Response definitions
RaftNodeClientRequestOps = enum
rncroRequestState = 0,
rncroAppendNewEntry = 1
RaftNodeClientRequest*[LogEntryDataType] = ref object
op*: RaftNodeClientRequestOps
payload*: Option[RaftMessagePayload[LogEntryDataType]] # Optional RaftMessagePayload carrying a Log Entry
RaftNodeClientResponse*[SmStateType] = ref object
success*: bool # Indicate succcess
state*: Option[SmStateType] # Optional Raft Abstract State Machine State
raftNodeRedirectId*: Option[RaftNodeId] # Optional Raft Node ID to redirect the request to in case of failure

View File

@ -1,70 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import protocol
export types, protocol
# Raft Node Public API procedures / functions
proc RaftNodeCreateNew*[LogEntryDataType, SmStateType]( # Create New Raft Node
id: RaftNodeId, peers: RaftNodePeers,
persistentStorage: RaftNodePersistentStorage,
msgSendCallback: RaftMessageSendCallback): RaftNode[LogEntryDataType, SmStateType] =
discard
proc RaftNodeLoad*[LogEntryDataType, SmStateType](
persistentStorage: RaftNodePersistentStorage, # Load Raft Node From Storage
msgSendCallback: RaftMessageSendCallback): Result[RaftNode[LogEntryDataType, SmStateType], string] =
discard
proc RaftNodeStop*(node: RaftNode) =
discard
proc RaftNodeStart*(node: RaftNode) =
discard
func RaftNodeIdGet*(node: RaftNode): RaftNodeId = # Get Raft Node ID
discard
func RaftNodeStateGet*(node: RaftNode): RaftNodeState = # Get Raft Node State
discard
func RaftNodeTermGet*(node: RaftNode): RaftNodeTerm = # Get Raft Node Term
discard
func RaftNodePeersGet*(node: RaftNode): RaftNodePeers = # Get Raft Node Peers
discard
func RaftNodeIsLeader*(node: RaftNode): bool = # Check if Raft Node is Leader
discard
proc RaftNodeMessageDeliver*(node: RaftNode, raftMessage: RaftMessageBase): RaftMessageResponse {.discardable.} = # Deliver Raft Message to the Raft Node
discard
proc RaftNodeRequest*(node: RaftNode, req: RaftNodeClientRequest): RaftNodeClientResponse = # Process RaftNodeClientRequest
discard
proc RaftNodeLogIndexGet*(node: RaftNode): RaftLogIndex =
discard
proc RaftNodeLogEntryGet*(node: RaftNode, logIndex: RaftLogIndex): Result[RaftNodeLogEntry, string] =
discard
# Abstract State Machine Ops
func RaftNodeSmStateGet*[LogEntryDataType, SmStateType](node: RaftNode[LogEntryDataType, SmStateType]): SmStateType =
node.stateMachine.state
proc RaftNodeSmInit[LogEntryDataType, SmStateType](stateMachine: var RaftNodeStateMachine[LogEntryDataType, SmStateType]) =
mixin RaftSmInit
RaftSmInit(stateMachine)
proc RaftNodeSmApply[LogEntryDataType, SmStateType](stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType], logEntry: LogEntryDataType) =
mixin RaftSmApply
RaftSmApply(stateMachine, logEntry)

View File

@ -1,120 +0,0 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# Raft Node Public Types.
# I guess that at some point these can be moved to a separate file called raft_consensus_types.nim for example
import std/locks
import stew/results
import eth/keyfile
export results
type
# Raft Node basic definitions
Blob* = seq[byte]
RaftNodeState* = enum
UNKNOWN = 0,
FOLLOWER = 1,
LEADER = 2
RaftNodeId* = UUID # UUID uniquely identifying every Raft Node
RaftNodePeers* = seq[RaftNodeId] # List of Raft Node Peers IDs
RaftNodeTerm* = uint64 # Raft Node Term Type
RaftLogIndex* = uint64 # Raft Node Log Index Type
# Raft Node Abstract State Machine type
RaftNodeStateMachine*[LogEntryDataType, SmStateType] = ref object # Some probably opaque State Machine Impelementation to be used by the Raft Node
# providing at minimum operations for initialization, querying the current state
# and RaftNodeLogEntry application
state: SmStateType
# Raft Node Persistent Storage basic definition
RaftNodePersistentStorage* = ref object # Should be some kind of Persistent Transactional Store Wrapper
# Basic modules (algos) definitions
RaftNodeAccessCallback[LogEntryDataType] = proc: RaftNode[LogEntryDataType] {.nimcall, gcsafe.} # This should be implementes as a closure holding the RaftNode
RaftConsensusModule*[LogEntryDataType] = object of RootObj
stateTransitionsFsm: seq[byte] # I plan to use nim.fsm https://github.com/ba0f3/fsm.nim
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
RaftLogCompactionModule*[LogEntryDataType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
RaftMembershipChangeModule*[LogEntryDataType] = object of RootObj
raftNodeAccessCallback: RaftNodeAccessCallback[LogEntryDataType]
# Callback for sending messages out of this Raft Node
RaftMessageId* = UUID # UUID assigned to every Raft Node Message,
# so it can be matched with it's corresponding response etc.
RaftMessageSendCallback* = proc (raftMessage: RaftMessageBase) {.nimcall, gcsafe.} # Callback for Sending Raft Node Messages
# out of this Raft Node. Can be used for broadcasting
# (a Heart-Beat for example)
# Raft Node basic Log definitions
RaftNodeLogEntry*[LogEntryDataType] = ref object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
data*: LogEntryDataType
RaftNodeLog*[LogEntryDataType] = ref object # Needs more elaborate definition.
# Probably this will be a RocksDB/MDBX/SQLite Store Wrapper etc.
logData*: seq[RaftNodeLogEntry[LogEntryDataType]] # Raft Node Log Data
# Base type for Raft message objects
RaftMessageBase* = ref object of RootObj # Base Type for Raft Node Messages
msgId*: RaftMessageId # Message UUID
senderId*: RaftNodeId # Sender Raft Node ID
senderTerm*: RaftNodeTerm # Sender Raft Node Term
peers*: RaftNodePeers # List of Raft Node IDs, which should receive this message
# Raft Node Object type
RaftNode*[LogEntryDataType, SmStateType] = ref object
# Timers
votingTimout: uint64
heartBeatTimeout: uint64
# etc. timers
# Mtx definitions go here
raftStateMutex: Lock
raftLogMutex: Lock
raftCommMutexReceiveMsg: Lock
raftCommMutexClientResponse: Lock
# Modules (Algos)
consensusModule: RaftConsensusModule[LogEntryDataType]
logCompactionModule: RaftLogCompactionModule[LogEntryDataType]
membershipChangeModule: RaftMembershipChangeModule[LogEntryDataType]
# Misc
msgSendCallback: RaftMessageSendCallback
persistentStorage: RaftNodePersistentStorage
# Persistent state
id: RaftNodeId # This Raft Node ID
state: RaftNodeState # This Raft Node State
currentTerm: RaftNodeTerm # Latest term this Raft Node has seen (initialized to 0 on first boot, increases monotonically)
log: RaftNodeLog[LogEntryDataType] # This Raft Node Log
votedFor: RaftNodeId # Candidate RaftNodeId that received vote in current term (or nil/zero if none),
# also used to redirect Client Requests in case this Raft Node is not the leader
peers: RaftNodePeers # This Raft Node Peers IDs. I am not sure if this must be persistent or volatile but making it persistent
# makes sense for the moment
stateMachine: RaftNodeStateMachine[LogEntryDataType, SmStateType] # Not sure for now putting it here. I assume that persisting the State Machine's
# state is enough to consider it 'persisted'
# Volatile state
commitIndex: RaftLogIndex # Index of highest log entry known to be committed (initialized to 0, increases monotonically)
lastApplied: RaftLogIndex # Index of highest log entry applied to state machine (initialized to 0, increases monotonically)
# Volatile state on leaders
nextIndex: seq[RaftLogIndex] # For each peer Raft Node, index of the next log entry to send to that Node
# (initialized to leader last log index + 1)
matchIndex: seq[RaftLogIndex] # For each peer Raft Node, index of highest log entry known to be replicated on Node
# (initialized to 0, increases monotonically)

297
scripts/ci/build_nim.sh Executable file
View File

@ -0,0 +1,297 @@
#!/usr/bin/env bash
# used in Travis CI and AppVeyor scripts
# Copyright (c) 2018-2020 Status Research & Development GmbH. Licensed under
# either of:
# - Apache License, version 2.0
# - MIT license
# at your option. This file may not be copied, modified, or distributed except
# according to those terms.
set -e
# NIM_COMMIT could be a (partial) commit hash, a tag, a branch name, etc. Empty by default.
NIM_COMMIT_HASH="" # full hash for NIM_COMMIT, retrieved in "nim_needs_rebuilding()"
# script arguments
[[ $# -ne 4 ]] && { echo "Usage: $0 nim_dir csources_dir nimble_dir ci_cache_dir"; exit 1; }
NIM_DIR="$1"
CSOURCES_DIR="$2" # can be relative to NIM_DIR; only used when `skipIntegrityCheck` unsupported
NIMBLE_DIR="$3" # can be relative to NIM_DIR; only used when `skipIntegrityCheck` unsupported
CI_CACHE="$4"
## env vars
# verbosity level
[[ -z "$V" ]] && V=0
[[ -z "$CC" ]] && CC="gcc"
# to build csources in parallel, set MAKE="make -jN"
[[ -z "$MAKE" ]] && MAKE="make"
# for 32-bit binaries on a 64-bit host
UCPU=""
[[ "$ARCH_OVERRIDE" == "x86" ]] && UCPU="ucpu=i686"
[[ -z "$NIM_BUILD_MSG" ]] && NIM_BUILD_MSG="Building the Nim compiler"
[[ -z "$QUICK_AND_DIRTY_COMPILER" ]] && QUICK_AND_DIRTY_COMPILER=0
[[ -z "$QUICK_AND_DIRTY_NIMBLE" ]] && QUICK_AND_DIRTY_NIMBLE=0
# Windows detection
if uname | grep -qiE "mingw|msys"; then
ON_WINDOWS=1
EXE_SUFFIX=".exe"
# otherwise it fails in AppVeyor due to https://github.com/git-for-windows/git/issues/2495
GIT_TIMESTAMP_ARG="--date=unix" # available since Git 2.9.4
else
ON_WINDOWS=0
EXE_SUFFIX=""
GIT_TIMESTAMP_ARG="--date=format-local:%s" # available since Git 2.7.0
fi
NIM_BINARY="${NIM_DIR}/bin/nim${EXE_SUFFIX}"
MAX_NIM_BINARIES="10" # Old ones get deleted.
nim_needs_rebuilding() {
REBUILD=0
NO_REBUILD=1
if [[ ! -e "$NIM_DIR" ]]; then
# Shallow clone, optimised for the default NIM_COMMIT value.
git clone -q --depth=1 https://github.com/status-im/Nim.git "$NIM_DIR"
fi
pushd "${NIM_DIR}" >/dev/null
if [[ -n "${NIM_COMMIT}" ]]; then
# support old Git versions, like the one from Ubuntu-18.04
git restore . 2>/dev/null || git reset --hard
if ! git checkout -q ${NIM_COMMIT} 2>/dev/null; then
# Pay the price for a non-default NIM_COMMIT here, by fetching everything.
# (This includes upstream branches and tags that might be missing from our fork.)
git remote add upstream https://github.com/nim-lang/Nim
git fetch --all --tags --quiet
git checkout -q ${NIM_COMMIT}
fi
# In case the local branch diverged and a fast-forward merge is not possible.
git fetch || true
git reset -q --hard origin/${NIM_COMMIT} 2>/dev/null || true
# In case NIM_COMMIT is a local branch that's behind the remote one it's tracking.
git pull -q 2>/dev/null || true
git checkout -q ${NIM_COMMIT}
# We can't use "rev-parse" here, because it would return the tag object's
# hash instead of the commit hash, when NIM_COMMIT is a tag.
NIM_COMMIT_HASH="$(git rev-list -n 1 ${NIM_COMMIT})"
else
# NIM_COMMIT is empty, so assume the commit we need is already checked out
NIM_COMMIT_HASH="$(git rev-list -n 1 HEAD)"
fi
popd >/dev/null
if [[ -n "$CI_CACHE" && -d "$CI_CACHE" ]]; then
cp -a "$CI_CACHE"/* "$NIM_DIR"/bin/ || true # let this one fail with an empty cache dir
fi
# Delete old Nim binaries, to put a limit on how much storage we use.
for F in "$(ls -t "${NIM_DIR}"/bin/nim_commit_* 2>/dev/null | tail -n +$((MAX_NIM_BINARIES + 1)))"; do
if [[ -e "${F}" ]]; then
rm "${F}"
fi
done
# Compare the last built commit to the one requested.
# Handle the scenario where our symlink is manually deleted by the user.
if [[ -e "${NIM_DIR}/bin/last_built_commit" && \
-e "${NIM_DIR}/bin/nim${EXE_SUFFIX}" && \
"$(cat "${NIM_DIR}/bin/last_built_commit")" == "${NIM_COMMIT_HASH}" ]]; then
return $NO_REBUILD
elif [[ -e "${NIM_DIR}/bin/nim_commit_${NIM_COMMIT_HASH}" ]]; then
# we built the requested commit in the past, so we simply reuse it
rm -f "${NIM_DIR}/bin/nim${EXE_SUFFIX}"
ln -s "nim_commit_${NIM_COMMIT_HASH}" "${NIM_DIR}/bin/nim${EXE_SUFFIX}"
echo ${NIM_COMMIT_HASH} > "${NIM_DIR}/bin/last_built_commit"
return $NO_REBUILD
else
return $REBUILD
fi
}
build_nim() {
echo -e "$NIM_BUILD_MSG"
# [[ "$V" == "0" ]] && exec &>/dev/null
# working directory
pushd "$NIM_DIR"
if grep -q "skipIntegrityCheck" koch.nim && [ "${NIM_COMMIT}" != "version-1-6" ]; then
echo "in if"
# Run Nim buildchain, with matching dependency versions
# - CSOURCES_REPO from Nim/config/build_config.txt (nim_csourcesUrl)
# - CSOURCES_COMMIT from Nim/config/build_config.txt (nim_csourcesHash)
# - NIMBLE_REPO from Nim/koch.nim (bundleNimbleExe)
# - NIMBLE_COMMIT from Nim/koch.nim (NimbleStableCommit)
. ci/funs.sh
NIMCORES=1 nimBuildCsourcesIfNeeded $UCPU
bin/nim c --noNimblePath --skipUserCfg --skipParentCfg --warnings:off --hints:off koch
./koch --skipIntegrityCheck boot -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
if [[ "${QUICK_AND_DIRTY_COMPILER}" == "0" ]]; then
# We want tools
./koch tools -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
elif [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
# We just want nimble
./koch nimble -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
fi
else
# Git commits
echo "in else"
: ${CSOURCES_V1_COMMIT:=561b417c65791cd8356b5f73620914ceff845d10}
: ${CSOURCES_V2_COMMIT:=86742fb02c6606ab01a532a0085784effb2e753e}
: ${CSOURCES_V1_REPO:=https://github.com/nim-lang/csources_v1.git}
: ${CSOURCES_V2_REPO:=https://github.com/nim-lang/csources_v2.git}
# After this Nim commit, use csources v2
: ${CSOURCES_V2_START_COMMIT:=f7c203fb6c89b5cef83c4f326aeb23ef8c4a2c40}
: ${NIMBLE_REPO:=https://github.com/nim-lang/nimble.git}
: ${NIMBLE_COMMIT:=a4fc798838ee753f5485dd19afab22e9367eb0e7} # 0.13.1
# Custom buildchain for older versions
# TODO Remove this once the default NIM_COMMIT supports `--skipIntegrityCheck`
# We will still be able to compile older versions by removing the flag,
# which will just waste a bit of CPU
# Git repos for csources and Nimble
if [[ ! -d "$CSOURCES_DIR" ]]; then
if git merge-base --is-ancestor $CSOURCES_V2_START_COMMIT $NIM_COMMIT_HASH; then
CSOURCES_REPO=$CSOURCES_V2_REPO
CSOURCES_COMMIT=$CSOURCES_V2_COMMIT
else
CSOURCES_REPO=$CSOURCES_V1_REPO
CSOURCES_COMMIT=$CSOURCES_V1_COMMIT
fi
mkdir -p "$CSOURCES_DIR"
pushd "$CSOURCES_DIR"
git clone $CSOURCES_REPO .
git checkout $CSOURCES_COMMIT
popd
fi
if [[ "$CSOURCES_DIR" != "csources" ]]; then
rm -rf csources
ln -s "$CSOURCES_DIR" csources
fi
# bootstrap the Nim compiler and build the tools
rm -f bin/{nim,nim_csources}
pushd csources
if [[ "$ON_WINDOWS" == "0" ]]; then
$MAKE $UCPU clean
$MAKE $UCPU LD=$CC
else
$MAKE myos=windows $UCPU clean
$MAKE myos=windows $UCPU CC=gcc LD=gcc
fi
popd
if [[ -e csources/bin ]]; then
rm -f bin/nim bin/nim_csources
cp -a csources/bin/nim bin/nim
cp -a csources/bin/nim bin/nim_csources
rm -rf csources/bin
else
cp -a bin/nim bin/nim_csources
fi
if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" ]]; then
sed \
-e 's/koch$/--warnings:off --hints:off koch/' \
-e 's/koch boot/koch boot --warnings:off --hints:off/' \
-e '/nimBuildCsourcesIfNeeded/d' \
build_all.sh > build_all_custom.sh
sh build_all_custom.sh
rm build_all_custom.sh
else
# Don't re-build it multiple times until we get identical
# binaries, like "build_all.sh" does. Don't build any tools
# either. This is all about build speed, not developer comfort.
bin/nim_csources \
c \
--compileOnly \
--nimcache:nimcache \
-d:release \
--skipUserCfg \
--skipParentCfg \
--warnings:off \
--hints:off \
compiler/nim.nim
bin/nim_csources \
jsonscript \
--nimcache:nimcache \
--skipUserCfg \
--skipParentCfg \
compiler/nim.nim
cp -a compiler/nim bin/nim1
# If we stop here, we risk ending up with a buggy compiler:
# https://github.com/status-im/nimbus-eth2/pull/2220
# https://github.com/status-im/nimbus-eth2/issues/2310
bin/nim1 \
c \
--compileOnly \
--nimcache:nimcache \
-d:release \
--skipUserCfg \
--skipParentCfg \
--warnings:off \
--hints:off \
compiler/nim.nim
bin/nim1 \
jsonscript \
--nimcache:nimcache \
--skipUserCfg \
--skipParentCfg \
compiler/nim.nim
rm -f bin/nim
cp -a compiler/nim bin/nim
rm bin/nim1
if [[ ! -d "$NIMBLE_DIR" ]]; then
mkdir -p "$NIMBLE_DIR"
pushd "$NIMBLE_DIR"
git clone $NIMBLE_REPO .
git checkout $NIMBLE_COMMIT
pwd
../../bin/nim r src/nimblepkg/private/clone.nim
# we have to delete .git or koch.nim will checkout a branch tip, overriding our target commit
rm -rf .git
popd
fi
if [[ "$NIMBLE_DIR" != "dist/nimble" ]]; then
mkdir -p dist
rm -rf dist/nimble
ln -s ../"$NIMBLE_DIR" dist/nimble
fi
# Do we want Nimble in this quick build?
if [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
bin/nim c -d:release --noNimblePath --skipUserCfg --skipParentCfg dist/nimble/src/nimble.nim
mv dist/nimble/src/nimble bin/
fi
fi
fi
if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" || "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
# Nimble needs a CA cert
rm -f bin/cacert.pem
curl -LsS -o bin/cacert.pem https://curl.se/ca/cacert.pem || echo "Warning: 'curl' failed to download a CA cert needed by Nimble. Ignoring it."
fi
# record the built commit
echo ${NIM_COMMIT_HASH} > bin/last_built_commit
# create the symlink
mv bin/nim bin/nim_commit_${NIM_COMMIT_HASH}
ln -s nim_commit_${NIM_COMMIT_HASH} bin/nim${EXE_SUFFIX}
# update the CI cache
popd # we were in $NIM_DIR
if [[ -n "$CI_CACHE" ]]; then
rm -rf "$CI_CACHE"
mkdir "$CI_CACHE"
cp "$NIM_DIR"/bin/* "$CI_CACHE"/
fi
}
if nim_needs_rebuilding; then
build_nim
fi

View File

@ -7,8 +7,7 @@
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import import raft/consensus_state_machine
raft/raft_api import raft/types
export consensus_state_machine
export export types
raft_api, types, protocol

View File

@ -0,0 +1,542 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import types
import log
import tracker
import state
import std/[times]
import std/random
type
RaftRpcMessageType* = enum
VoteRequest = 0,
VoteReply = 1,
AppendRequest = 2,
AppendReply = 3,
InstallSnapshot = 4,
SnapshotReply = 5
RaftRpcCode* = enum
Rejected = 0,
Accepted = 1
DebugLogLevel* = enum
None = 0
Critical = 1,
Error = 2,
Warning = 3,
Debug = 4,
Info = 5,
All = 6,
DebugLogEntry* = object
level*: DebugLogLevel
time*: times.DateTime
nodeId*: RaftnodeId
state*: RaftNodeState
msg*: string
RaftRpcAppendRequest* = object
previousTerm*: RaftNodeTerm
previousLogIndex*: RaftLogIndex
commitIndex*: RaftLogIndex
entries*: seq[LogEntry]
RaftRpcAppendReplyRejected* = object
nonMatchingIndex: RaftLogIndex
lastIdx: RaftLogIndex
RaftRpcAppendReplyAccepted* = object
lastNewIndex: RaftLogIndex
RaftRpcAppendReply* = object
commitIndex*: RaftLogIndex
term*: RaftNodeTerm
case result: RaftRpcCode:
of Accepted: accepted: RaftRpcAppendReplyAccepted
of Rejected: rejected: RaftRpcAppendReplyRejected
RaftRpcVoteRequest* = object
currentTerm*: RaftNodeTerm
lastLogIndex*: RaftLogIndex
lastLogTerm*: RaftNodeTerm
force*: bool
RaftRpcVoteReply* = object
currentTerm*: RaftNodeTerm
voteGranted*: bool
RaftSnapshot* = object
index: RaftLogIndex
term: RaftNodeTerm
config: RaftConfig
snapshotId: RaftSnapshotId
RaftInstallSnapshot* = object
term: RaftNodeTerm
snapshot: RaftSnapshot
RaftSnapshotReply* = object
term: RaftNodeTerm
success: bool
RaftRpcMessage* = object
currentTerm*: RaftNodeTerm
sender*: RaftNodeId
receiver*: RaftNodeId
case kind*: RaftRpcMessageType
of VoteRequest: voteRequest*: RaftRpcVoteRequest
of VoteReply: voteReply*: RaftRpcVoteReply
of AppendRequest: appendRequest*: RaftRpcAppendRequest
of AppendReply: appendReply*: RaftRpcAppendReply
of InstallSnapshot: installSnapshot*: RaftInstallSnapshot
of SnapshotReply: snapshotReply*: RaftSnapshotReply
RaftStateMachineOutput* = object
logEntries*: seq[LogEntry]
# Entries that should be applyed to the "User" State machine
committed*: seq[LogEntry]
messages*: seq[RaftRpcMessage]
debugLogs*: seq[DebugLogEntry]
term*: RaftNodeTerm
votedFor*: Option[RaftNodeId]
stateChange*: bool
RaftLastPollState* = object
term*: RaftNodeTerm
votedFor*: RaftNodeId
commitIndex: RaftLogIndex
RaftStateMachine* = object
myId*: RaftNodeId
term*: RaftNodeTerm
commitIndex: RaftLogIndex
toCommit: RaftLogIndex
log: RaftLog
output: RaftStateMachineOutput
lastUpdate: times.Time
votedFor: RaftNodeId
currentLeader: RaftNodeId
pingLeader: bool
config: RaftConfig
lastElectionTime: times.DateTime
randomizedElectionTime: times.Duration
heartbeatTime: times.Duration
timeNow: times.DateTime
startTime: times.DateTime
electionTimeout: times.Duration
randomGenerator: Rand
observedState: RaftLastPollState
state*: RaftStateMachineState
func observe(ps: var RaftLastPollState, sm: RaftStateMachine) =
ps.term = sm.term
ps.votedFor = sm.votedFor
ps.commitIndex = sm.commitIndex
func eq(ps: RaftLastPollState, sm: RaftStateMachine): bool =
return ps.term == sm.term and
ps.votedFor == sm.votedFor and
ps.commitIndex == sm.commitIndex
func leader*(sm: var RaftStateMachine): var LeaderState =
return sm.state.leader
func follower*(sm: var RaftStateMachine): var FollowerState =
return sm.state.follower
func candidate*(sm: var RaftStateMachine): var CandidateState =
return sm.state.candidate
func addDebugLogEntry(sm: var RaftStateMachine, level: DebugLogLevel, msg: string) =
sm.output.debugLogs.add(DebugLogEntry(time: sm.timeNow, state: sm.state.state, level: level, msg: msg, nodeId: sm.myId))
func debug*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Debug, log)
func warning*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Warning, log)
func error*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Error, log)
func info*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Info, log)
func critical*(sm: var RaftStateMachine, log: string) =
sm.addDebugLogEntry(DebugLogLevel.Critical, log)
func resetElectionTimeout*(sm: var RaftStateMachine) =
# TODO actually pick random time
sm.randomizedElectionTime = sm.electionTimeout + times.initDuration(milliseconds = 100 + sm.randomGenerator.rand(200))
func initRaftStateMachine*(id: RaftnodeId, currentTerm: RaftNodeTerm, log: RaftLog, commitIndex: RaftLogIndex, config: RaftConfig, now: times.DateTime, randomGenerator: Rand): RaftStateMachine =
var sm = RaftStateMachine()
sm.term = currentTerm
sm.log = log
sm.commitIndex = commitIndex
sm.state = initFollower(RaftNodeId())
sm.config = config
sm.lastElectionTime = now
sm.timeNow = now
sm.startTime = now
sm.myId = id
sm.electionTimeout = times.initDuration(milliseconds = 100)
sm.heartbeatTime = times.initDuration(milliseconds = 50)
sm.randomGenerator = randomGenerator
sm.resetElectionTimeout()
sm.observedState.observe(sm)
return sm
func findFollowerProggressById(sm: var RaftStateMachine, id: RaftNodeId): Option[RaftFollowerProgressTracker] =
return sm.leader.tracker.find(id)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendRequest, appendRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReply) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.AppendReply, appendReply: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteRequest, voteRequest: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReply) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.VoteReply, voteReply: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftSnapshotReply) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.SnapshotReply, snapshotReply: request))
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftInstallSnapshot) =
sm.output.messages.add(RaftRpcMessage(currentTerm: sm.term, receiver: id, sender: sm.myId, kind: RaftRpcMessageType.InstallSnapshot, installSnapshot: request))
func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) =
sm.debug "Send to " & $id & $request
if sm.state.isLeader:
var follower = sm.findFollowerProggressById(id)
if follower.isSome:
follower.get().lastMessageAt = sm.timeNow
else:
sm.warning "Follower not found: " & $id
sm.debug $sm.leader
sm.sendToImpl(id, request)
func createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage =
return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: VoteRequest, voteRequest: RaftRpcVoteRequest())
func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) =
if follower.nextIndex > sm.log.lastIndex:
return
var previousTerm = sm.log.termForIndex(follower.nextIndex - 1)
sm.debug "replicate to " & $follower[]
if previousTerm.isSome:
let request = RaftRpcAppendRequest(
previousTerm: previousTerm.get(),
previousLogIndex: follower.nextIndex - 1,
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
else:
# TODO: we add support for snapshots
let request = RaftRpcAppendRequest(
previousTerm: 0,
previousLogIndex: 1,
commitIndex: sm.commitIndex,
entries: @[sm.log.getEntryByIndex(follower.nextIndex)])
follower.nextIndex += 1
sm.sendTo(follower.id, request)
func replicate*(sm: var RaftStateMachine) =
if sm.state.isLeader:
for followerIndex in 0..<sm.leader.tracker.progress.len:
if sm.myId != sm.leader.tracker.progress[followerIndex].id:
sm.replicateTo(sm.leader.tracker.progress[followerIndex])
func addEntry(sm: var RaftStateMachine, entry: LogEntry) =
if not sm.state.isLeader:
sm.error "only the leader can handle new entries"
sm.log.appendAsLeader(entry)
func addEntry*(sm: var RaftStateMachine, command: Command) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletCommand, command: command))
func addEntry*(sm: var RaftStateMachine, config: Config) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletConfig, config: config))
func addEntry*(sm: var RaftStateMachine, dummy: Empty) =
sm.addEntry(LogEntry(term: sm.term, index: sm.log.nextIndex, kind: rletEmpty, empty: true))
func becomeFollower*(sm: var RaftStateMachine, leaderId: RaftNodeId) =
if sm.myId == leaderId:
sm.error "Can't be follower of itself"
sm.output.stateChange = not sm.state.isFollower
sm.state = initFollower(leaderId)
if leaderId != RaftnodeId():
sm.pingLeader = false
# TODO: Update last election time
func becomeLeader*(sm: var RaftStateMachine) =
if sm.state.isLeader:
sm.error "The leader can't become leader second time"
return
sm.output.stateChange = true
# Because we will add new entry on the next line
sm.state = initLeader(sm.config, sm.log.lastIndex + 1, sm.timeNow)
sm.addEntry(Empty())
sm.pingLeader = false
#TODO: Update last election time
return
func becomeCandidate*(sm: var RaftStateMachine) =
#TODO: implement
if not sm.state.isCandidate:
sm.output.stateChange = true
sm.state = initCandidate(sm.config)
sm.lastElectionTime = sm.timeNow
# TODO: Add configuration change logic
sm.term += 1
for nodeId in sm.candidate.votes.voters:
if nodeId == sm.myId:
sm.debug "register vote for it self "
discard sm.candidate.votes.registerVote(nodeId, true)
sm.votedFor = nodeId
continue
let request = RaftRpcVoteRequest(currentTerm: sm.term, lastLogIndex: sm.log.lastIndex, lastLogTerm: sm.log.lastTerm, force: false)
sm.sendTo(nodeId, request)
if sm.candidate.votes.tallyVote == RaftElectionResult.Won:
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
sm.becomeLeader()
return
func heartbeat(sm: var RaftStateMachine, follower: var RaftFollowerProgressTracker) =
sm.info "heartbeat " & $follower.nextIndex
var previousTerm = 0
if sm.log.lastIndex > 1:
previousTerm = sm.log.termForIndex(follower.nextIndex - 1).get()
let request = RaftRpcAppendRequest(
previousTerm: previousTerm,
previousLogIndex: follower.nextIndex - 1,
commitIndex: sm.commitIndex,
entries: @[])
sm.sendTo(follower.id, request)
func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) =
sm.timeNow = now
# if sm.lastElectionTime - sm.timeNow > sm.electionTimeout:
# sm.becomeFollower(RaftnodeId())
# return
sm.lastElectionTime = now
if not sm.state.isLeader:
sm.error "tickLeader can be called only on the leader"
return
for followerIndex in 0..<sm.leader.tracker.progress.len:
var follower = sm.leader.tracker.progress[followerIndex]
if sm.myId != follower.id:
if follower.matchIndex < sm.log.lastIndex or follower.commitIndex < sm.commitIndex:
sm.replicateTo(follower)
#sm.debug "replicate to" & $follower
#sm.debug $(now - follower.lastMessageAt)
if now - follower.lastMessageAt > sm.heartbeatTime:
sm.heartbeat(follower)
# TODO: implement step down logic
func tick*(sm: var RaftStateMachine, now: times.DateTime) =
sm.info "Term: " & $sm.term & " commit idx " & $sm.commitIndex & " Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms"
sm.timeNow = now
if sm.state.isLeader:
sm.tickLeader(now);
elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime:
sm.debug "Become candidate"
sm.becomeCandidate()
func commit(sm: var RaftStateMachine) =
if not sm.state.isLeader:
return
var newIndex = sm.commitIndex
var nextIndex = sm.commitIndex + 1
while nextIndex < sm.log.nextIndex:
var replicationCnt = 1
for p in sm.leader.tracker.progress:
if p.matchIndex > newIndex:
replicationCnt += 1
sm.debug "replication count: " & $replicationCnt & " for log index: " & $nextIndex
if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1):
sm.debug "Commit index: " & $nextIndex
sm.commitIndex = nextIndex;
nextIndex += 1
newIndex += 1
else:
break
func poll*(sm: var RaftStateMachine): RaftStateMachineOutput =
# Should initiate replication if we have new entries
if sm.state.isLeader:
sm.replicate()
sm.commit()
sm.output.term = sm.term
if sm.observedState.commitIndex < sm.commitIndex:
for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1):
sm.output.committed.add(sm.log.getEntryByIndex(i))
if sm.votedFor != RaftnodeId():
sm.output.votedFor = some(sm.votedFor)
sm.observedState.observe(sm)
let output = sm.output
sm.output = RaftStateMachineOutput()
return output
func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) =
if not sm.state.isLeader:
sm.debug "You can't append append reply to the follower"
return
var follower = sm.findFollowerProggressById(fromId)
if not follower.isSome:
sm.debug "Can't find the follower"
return
follower.get().commitIndex = max(follower.get().commitIndex, reply.commitIndex)
case reply.result:
of RaftRpcCode.Accepted:
let lastIndex = reply.accepted.lastNewIndex
sm.debug "Accpeted message from" & $fromId & " last log index: " & $lastIndex
follower.get().accepted(lastIndex)
# TODO: add leader stepping down logic here
if not sm.state.isLeader:
return
of RaftRpcCode.Rejected:
if reply.rejected.nonMatchingIndex == 0 and reply.rejected.lastIdx == 0:
sm.replicateTo(follower.get())
follower.get().nextIndex = min(reply.rejected.nonMatchingIndex, reply.rejected.lastIdx + 1)
# if commit apply configuration that removes current follower
# we should take it again
var follower2 = sm.findFollowerProggressById(fromId)
if follower2.isSome:
sm.replicateTo(follower2.get())
func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) =
let newIdx = min(leaderIdx, sm.log.lastIndex)
if newIdx > sm.commitIndex:
sm.debug "Commit index is changed. Old:" & $sm.commitIndex & " New:" & $newIdx
sm.commitIndex = newIdx
# TODO: signal the output for the update
func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) =
if not sm.state.isFollower:
sm.debug "You can't append append request to the non follower"
return
let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm)
if not match:
let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
sm.sendTo(fromId, responce)
sm.debug "Reject to apply the entry"
for entry in request.entries:
sm.log.appendAsFollower(entry)
sm.advanceCommitIdx(request.commitIndex)
let accepted = RaftRpcAppendReplyAccepted(lastNewIndex: sm.log.lastIndex)
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted)
sm.sendTo(fromId, responce)
func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) =
let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId())
if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm):
let responce = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: true)
sm.sendTo(fromId, responce)
else:
let responce: RaftRpcVoteReply = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: false)
sm.sendTo(fromId, responce)
func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReply) =
if not sm.state.isCandidate:
sm.debug "Non candidate can't handle votes"
return
discard sm.candidate.votes.registerVote(fromId, request.voteGranted)
case sm.candidate.votes.tallyVote:
of RaftElectionResult.Unknown:
return
of RaftElectionResult.Won:
sm.debug "Elecation won" & $(sm.candidate.votes) & $sm.myId
sm.becomeLeader()
of RaftElectionResult.Lost:
sm.debug "Lost election"
sm.becomeFollower(RaftNodeId())
func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) =
#sm.debug $msg
if msg.currentTerm > sm.term:
sm.debug "Current node is behind"
var leaderId = RaftnodeId()
if msg.kind == RaftRpcMessageType.AppendRequest:
leaderId = msg.sender
sm.becomeFollower(leaderId)
# TODO: implement pre vote
sm.term = msg.currentTerm
sm.votedFor = RaftnodeId()
elif msg.currentTerm < sm.term:
if msg.kind == RaftRpcMessageType.AppendRequest:
# Instruct leader to step down
let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex)
let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected)
sm.sendTo(msg.sender, responce)
sm.warning "Ignore message with lower term"
else:
# TODO: add also snapshot
if msg.kind == RaftRpcMessageType.AppendRequest:
if sm.state.isCandidate:
sm.becomeFollower(msg.sender)
elif sm.state.isFollower:
sm.follower.leader = msg.sender
# TODO: fix time
if sm.state.isCandidate:
if msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
elif msg.kind == RaftRpcMessageType.VoteReply:
sm.debug "Apply vote"
sm.requestVoteReply(msg.sender, msg.voteReply)
else:
sm.warning "Candidate ignore message"
elif sm.state.isFollower:
if msg.sender == sm.follower.leader:
sm.lastElectionTime = now
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.appendEntry(msg.sender, msg.appendRequest)
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.warning "Follower ignore message" & $msg
# TODO: imelement the rest of the state transitions
elif sm.state.isLeader:
if msg.kind == RaftRpcMessageType.AppendRequest:
sm.warning "Ignore message leader append his entries directly"
elif msg.kind == RaftRpcMessageType.AppendReply:
sm.appendEntryReply(msg.sender, msg.appendReply)
elif msg.kind == RaftRpcMessageType.VoteRequest:
sm.requestVote(msg.sender, msg.voteRequest)
else:
sm.warning "Leader ignore message"

103
src/raft/log.nim Normal file
View File

@ -0,0 +1,103 @@
import types
import std/sequtils
type
RaftLogEntryType* = enum
rletCommand = 0,
rletConfig = 1,
rletEmpty = 2
Command* = object
data*: seq[byte]
Config* = object
Empty* = object
LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.)
term*: RaftNodeTerm
index*: RaftLogIndex
# TODO: Add configuration too
case kind*: RaftLogEntryType:
of rletCommand: command*: Command
of rletConfig: config*: Config
of rletEmpty: empty*: bool
RaftLog* = object
logEntries: seq[LogEntry]
firstIndex: RaftLogIndex
func initRaftLog*(firstIndex: RaftLogIndex): RaftLog =
var log = RaftLog()
assert firstIndex > 0
log.firstIndex = firstIndex
return log
func lastTerm*(rf: RaftLog): RaftNodeTerm =
# Not sure if it's ok, maybe we should return optional value
let size = rf.logEntries.len
if size == 0:
return 0
return rf.logEntries[size - 1].term
func entriesCount*(rf: RaftLog): int =
return rf.logEntries.len
func lastIndex*(rf: RaftLog): RaftNodeTerm =
return rf.logEntries.len + rf.firstIndex - 1
func nextIndex*(rf: RaftLog): int =
return rf.lastIndex + 1
func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) =
# TODO: We should add support for configurations and snapshots
if rf.logEntries.len == 0:
return
rf.logEntries.delete((index - rf.firstIndex)..<len(rf.logEntries))
func isUpToDate*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): bool =
return term > rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex)
func getEntryByIndex*(rf: RaftLog, index: RaftLogIndex): LogEntry =
return rf.logEntries[index - rf.firstIndex]
func appendAsLeader*(rf: var RaftLog, entry: LogEntry) =
rf.logEntries.add(entry)
func appendAsFollower*(rf: var RaftLog, entry: LogEntry) =
assert entry.index > 0
let currentIdx = rf.lastIndex
if entry.index <= currentIdx:
# TODO: The indexing hold only if we keep all entries in memory
# we should change it when we add support for snapshots
if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term:
rf.truncateUncomitted(entry.index)
rf.logEntries.add(entry)
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) =
rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true))
func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) =
rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data))
func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) =
if len(rf.logEntries) == 0:
return (true, 0)
# TODO: We should add support for snapshots
if index > len(rf.logEntries):
# The follower doesn't have all etries
return (false, 0)
let i = index - rf.firstIndex
if rf.logEntries[i].term == term:
return (true, 0)
else:
return (false, rf.logEntries[i].term)
func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] =
# TODO: snapshot support
assert rf.logEntries.len > index - rf.firstIndex, $rf.logEntries.len & " " & $index & "" & $rf
if rf.logEntries.len > 0 and index >= rf.firstIndex:
return some(rf.logEntries[index - rf.firstIndex].term)
return none(RaftNodeTerm)

57
src/raft/state.nim Normal file
View File

@ -0,0 +1,57 @@
import types
import tracker
import std/[times]
type
RaftNodeState* = enum
rnsFollower = 0, # Follower state
rnsCandidate = 1 # Candidate state
rnsLeader = 2 # Leader state
RaftStateMachineState* = object
case state*: RaftNodeState
of rnsFollower: follower: FollowerState
of rnsCandidate: candidate: CandidateState
of rnsLeader: leader: LeaderState
LeaderState* = object
tracker*: RaftTracker
CandidateState* = object
votes*: RaftVotes
FollowerState* = object
leader*: RaftNodeId
func `$`*(s: RaftStateMachineState): string =
return $s.state
func initLeader*(cfg: RaftConfig, index: RaftLogIndex, now: times.DateTime): RaftStateMachineState =
var state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState())
state.leader.tracker = initTracker(cfg, index, now)
return state
func initFollower*(leaderId: RaftNodeId): RaftStateMachineState =
return RaftStateMachineState(state: RaftNodeState.rnsFollower, follower: FollowerState(leader: leaderId))
func initCandidate*(cfg: RaftConfig): RaftStateMachineState =
return RaftStateMachineState(state: RaftnodeState.rnsCandidate, candidate: CandidateState(votes: initVotes(cfg)))
func isLeader*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsLeader
func isFollower*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsFollower
func isCandidate*(s: RaftStateMachineState): bool =
return s.state == RaftNodeState.rnsCandidate
func leader*(s: var RaftStateMachineState): var LeaderState =
return s.leader
func follower*(s: var RaftStateMachineState): var FollowerState =
return s.follower
func candidate*(s: var RaftStateMachineState): var CandidateState =
return s.candidate

97
src/raft/tracker.nim Normal file
View File

@ -0,0 +1,97 @@
import types
import std/[times]
type
RaftElectionResult* = enum
Unknown = 0,
Won = 1,
Lost = 2
RaftElectionTracker* = object
all: seq[RaftNodeId]
responded: seq[RaftNodeId]
granted: int
RaftVotes* = object
voters*: seq[RaftNodeId]
current: RaftElectionTracker
RaftFollowerProgress = seq[RaftFollowerProgressTracker]
RaftTracker* = object
progress*: RaftFollowerProgress
current: seq[RaftNodeId]
RaftFollowerProgressTracker* = ref object
id*: RaftNodeId
nextIndex*: RaftLogIndex
# Index of the highest log entry known to be replicated to this server.
matchIndex*: RaftLogIndex
commitIndex*: RaftLogIndex
replayedIndex: RaftLogIndex
lastMessageAt*: times.DateTime
func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker =
var r = RaftElectionTracker()
r.all = nodes
r.granted = 0
return r
func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool =
if not ret.all.contains nodeId:
return false
if not ret.responded.contains nodeId:
ret.responded.add(nodeId)
if granted:
ret.granted += 1
return true
func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult =
let quorym = int(len(ret.all) / 2) + 1
if ret.granted >= quorym:
return RaftElectionResult.Won
let unkown = len(ret.all) - len(ret.responded)
if ret.granted + unkown >= quorym:
return RaftElectionResult.Unknown
else:
return RaftElectionResult.Lost
func initVotes*(nodes: seq[RaftNodeId]): RaftVotes =
var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes))
return r
func initVotes*(config: RaftConfig): RaftVotes =
var r = RaftVotes(voters: config.currentSet, current: initElectionTracker(config.currentSet))
return r
func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): bool =
# TODO: Add support for configuration
return rv.current.registerVote(nodeId, granted)
func tallyVote*(rv: var RaftVotes): RaftElectionResult =
# TODO: Add support for configuration
return rv.current.tallyVote()
func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] =
for follower in ls.progress:
if follower.id == id:
return some(follower)
return none(RaftFollowerProgressTracker)
func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex, now: times.DateTime): RaftFollowerProgressTracker =
return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0, lastMessageAt: now)
func initTracker*(config: RaftConfig, nextIndex: RaftLogIndex, now: times.DateTime): RaftTracker =
var tracker = RaftTracker()
for node in config.currentSet:
tracker.progress.add(initFollowerProgressTracker(node, nextIndex, now))
tracker.current.add(node)
return tracker
func accepted*(fpt: var RaftFollowerProgressTracker, index: RaftLogIndex)=
fpt.matchIndex = max(fpt.matchIndex, index)
fpt.nextIndex = max(fpt.nextIndex, index)

34
src/raft/types.nim Normal file
View File

@ -0,0 +1,34 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
# Raft Node Public Types
import std/rlocks
import options
import stew/results
import uuids
import chronos
export
results,
options,
rlocks,
uuids,
chronos
const
DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000
type
RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node
RaftNodeTerm* = int # Raft Node Term Type
RaftLogIndex* = int # Raft Node Log Index Type
RaftSnapshotId* = int
RaftConfig* = object
currentSet*: seq[RaftNodeId]

View File

@ -6,3 +6,7 @@
# at your option. # at your option.
# This file may not be copied, modified, or distributed except according to # This file may not be copied, modified, or distributed except according to
# those terms. # those terms.
import test_consensus_state_machine
export test_consensus_state_machine

345
tests/test_bls_cluester.nim Normal file
View File

@ -0,0 +1,345 @@
import ../src/raft/types
import ../src/raft/consensus_state_machine
import ../src/raft/log
import ../src/raft/tracker
import ../src/raft/state
import std/[times, sequtils, random]
import std/sugar
import std/sets
import std/json
import std/jsonutils
import std/options
import std/strutils
import stew/endians2
import stew/byteutils
import std/algorithm
import blscurve
import tables
import unittest2
type
UserStateMachine = object
Message* = object
fieldInt: int
Hash = int
UserState* = object
lastCommitedMsg: Message
SignedLogEntry = object
hash: Hash
logIndex: RaftLogIndex
signature: SignedShare
BLSTestNode* = ref object
stm: RaftStateMachine
keyShare: SecretShare
us: UserState
blockCommunication: bool
debugLogs: seq[DebugLogEntry]
messageSignatures: Table[Hash, seq[SignedShare]]
signEntries: seq[SignedLogEntry]
clusterPublicKey: PublicKey
BLSTestCluster* = object
nodes*: Table[RaftnodeId, BLSTestNode]
delayer*: MessageDelayer
SecretShare = object
secret: SecretKey
id: ID
DelayedMessage* = object
msg: SignedRpcMessage
executeAt: times.DateTime
MessageDelayer* = object
messages: seq[DelayedMessage]
randomGenerator: Rand
meanDelay: float
stdDelay: float
minDelayMs: int
SignedShare = object
sign: Signature
pubkey: PublicKey
id: ID
SignedRpcMessage* = object
raftMsg: RaftRpcMessage
signEntries: seq[SignedLogEntry]
var secretKey = "1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2"
var test_ids_3 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
RaftnodeId(parseUUID("2a98fc33-6559-44c0-b130-fc3e9df80a69")),
RaftnodeId(parseUUID("9156756d-697f-4ffa-9b82-0c86720344bd"))
]
var test_ids_1 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
]
proc initDelayer(mean: float, std: float, minInMs: int, generator: Rand): MessageDelayer =
var delayer = MessageDelayer()
delayer.meanDelay = mean
delayer.stdDelay = std
delayer.minDelayMs = minInMs
delayer.randomGenerator = generator
return delayer
proc getMessages(delayer: var MessageDelayer, now: times.DateTime): seq[SignedRpcMessage] =
result = delayer.messages.filter(m => m.executeAt <= now).map(m => m.msg)
delayer.messages = delayer.messages.filter(m => m.executeAt > now)
return result
proc add(delayer: var MessageDelayer, message: SignedRpcMessage, now: times.DateTime) =
let rndDelay = delayer.randomGenerator.gauss(delayer.meanDelay, delayer.stdDelay)
let at = now + times.initDuration(milliseconds = delayer.minDelayMs + rndDelay.int)
delayer.messages.add(DelayedMessage(msg: message, executeAt: at))
proc signs(shares: openArray[SignedShare]): seq[Signature] =
shares.mapIt(it.sign)
proc ids(shares: openArray[SignedShare]): seq[ID] =
shares.mapIt(it.id)
func createConfigFromIds*(ids: seq[RaftnodeId]): RaftConfig =
var config = RaftConfig()
for id in ids:
config.currentSet.add(id)
return config
proc toString(bytes: openarray[byte]): string =
result = newString(bytes.len)
copyMem(result[0].addr, bytes[0].unsafeAddr, bytes.len)
proc toCommand(msg: Message): Command =
var msgJson = $(msg.toJson)
return Command(data: msgJson.toBytes)
proc toMessage(cmd: Command): Message =
return to(parseJson(cmd.data.toString), Message)
proc toBytes(msg: Message): seq[byte] =
var msgJson = $(msg.toJson)
return msgJson.toBytes
proc toBytes(msg: RaftRpcMessage): seq[byte] =
var msgJson = $(msg.toJson)
return msgJson.toBytes
proc cmpLogs*(x, y: DebugLogEntry): int =
cmp(x.time, y.time)
func `$`*(de: DebugLogEntry): string =
return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg
proc sign(node: BLSTestNode, msg: Message): SignedShare =
var pk: PublicKey
discard pk.publicFromSecret(node.keyShare.secret)
echo "Produce signature from node: " & $node.stm.myId & " with public key: " & $pk.toHex & "over msg " & $msg.toJson
return SignedShare(
sign: node.keyShare.secret.sign(msg.toBytes),
pubkey: pk,
id: node.keyShare.id,
)
proc pollMessages(node: BLSTestNode): seq[SignedRpcMessage] =
var output = node.stm.poll()
var debugLogs = output.debugLogs
var msgs: seq[SignedRpcMessage]
var pk: PublicKey
discard pk.publicFromSecret(node.keyShare.secret)
for msg in output.messages:
if msg.kind == RaftRpcMessageType.AppendReply:
msgs.add(SignedRpcMessage(
raftMsg: msg,
signEntries: node.signEntries
))
let commitIndex = msg.appendReply.commitIndex
# remove the signature of all entries that are already commited
node.signEntries = node.signEntries.filter(x => x.logIndex > commitIndex)
else:
msgs.add(SignedRpcMessage(
raftMsg: msg,
signEntries: @[]
))
if node.stm.state.isLeader:
for commitedMsg in output.committed:
if commitedMsg.kind != rletCommand:
continue
var orgMsg = commitedMsg.command.toMessage
var shares = node.messageSignatures[orgMsg.fieldInt]
echo "Try to recover message" & $orgMsg.toBytes
echo "Shares: " & $shares.signs
var recoveredSignature = recover(shares.signs, shares.ids).expect("valid shares")
if not node.clusterPublicKey.verify(orgMsg.toBytes, recoveredSignature):
node.us.lastCommitedMsg = orgMsg
echo "State succesfuly changed"
else:
echo "Failed to reconstruct signature"
debugLogs.sort(cmpLogs)
for msg in debugLogs:
if msg.level <= DebugLogLevel.Debug:
echo $msg
return msgs
proc acceptMessage(node: var BLSTestNode, msg: SignedRpcMessage, now: times.DateTime) =
if msg.raftMsg.kind == RaftRpcMessageType.AppendRequest and node.stm.state.isFollower:
var pk: PublicKey
discard pk.publicFromSecret(node.keyShare.secret)
for entry in msg.raftMsg.appendRequest.entries:
if entry.kind == rletEmpty:
continue
var orgMsg = entry.command.toMessage
var share = SignedLogEntry(
hash: orgMsg.fieldInt,
logIndex: msg.raftMsg.appendRequest.previousLogIndex + 1,
signature: node.sign(orgMsg)
)
node.signEntries.add(share)
node.stm.advance(msg.raftMsg, now)
proc tick(node: BLSTestNode, now: times.DateTime) =
node.stm.tick(now)
proc keyGen(seed: uint64): tuple[pubkey: PublicKey, seckey: SecretKey] =
var ikm: array[32, byte]
ikm[0 ..< 8] = seed.toBytesLE
let ok = ikm.keyGen(result.pubkey, result.seckey)
doAssert ok
proc blsIdFromUint32(x: uint32) : ID =
var a: array[8, uint32] = [uint32 0, 0, 0, 0, 0, 0, 0, x]
ID.fromUint32(a)
proc generateSecretShares(sk: SecretKey, k: int, n: int): seq[SecretShare] =
doAssert k <= n
var originPts: seq[SecretKey]
originPts.add(sk)
for i in 1 ..< k:
originPts.add(keyGen(uint64(42 + i)).seckey)
for i in uint32(0) ..< uint32(n):
# id must not be zero
let id = blsIdFromUint32(i + 1)
let secret = genSecretShare(originPts, id)
result.add(SecretShare(secret: secret, id: id))
proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime, k: int, n: int, delayer: MessageDelayer) : BLSTestCluster =
var sk: SecretKey
discard sk.fromHex("1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2")
var pk: PublicKey
discard pk.publicFromSecret(sk)
var blsShares = generateSecretShares(sk, k, n)
var config = createConfigFromIds(ids)
var cluster = BLSTestCluster()
cluster.delayer = delayer
cluster.nodes = initTable[RaftnodeId, BLSTestNode]()
for i in 0..<config.currentSet.len:
let id = config.currentSet[i]
var log = initRaftLog(1)
cluster.nodes[id] = BLSTestNode(
stm: initRaftStateMachine(id, 0, log, 0, config, now, initRand(i + 42)),
keyShare: blsShares[i],
blockCommunication: false,
clusterPublicKey: pk,
)
return cluster
proc advance*(tc: var BLSTestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) =
for id, node in tc.nodes:
node.tick(now)
var msgs = node.pollMessages()
for msg in msgs:
tc.delayer.add(msg, now)
var msgs = tc.delayer.getMessages(now)
for msg in msgs:
echo "eloooooooooooooooooooooooooooooooo" & $ msg
tc.nodes[msg.raftMsg.receiver].acceptMessage(msg, now)
func getLeader*(tc: BLSTestCluster): Option[BLSTestNode] =
var leader = none(BLSTestNode)
for id, node in tc.nodes:
if node.stm.state.isLeader:
if not leader.isSome() or leader.get().stm.term < node.stm.term:
leader = some(node)
return leader
proc submitMessage(tc: var BLSTestCluster, msg: Message): bool =
var leader = tc.getLeader()
if leader.isSome():
var pk: PublicKey
discard pk.publicFromSecret(leader.get.keyShare.secret)
echo "Leader Sign message" & $msg.toBytes
var share = leader.get().sign(msg)
if not leader.get.messageSignatures.hasKey(msg.fieldInt):
leader.get.messageSignatures[msg.fieldInt] = @[]
leader.get.messageSignatures[msg.fieldInt].add(share)
leader.get().stm.addEntry(msg.toCommand())
proc blsconsensusMain*() =
suite "BLS consensus tests":
test "create single node cluster":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var delayer = initDelayer(3, 3, 1, initRand(42))
var cluster = createBLSCluster(test_ids_1, timeNow, 1, 1, delayer)
timeNow += 300.milliseconds
cluster.advance(timeNow)
echo cluster.getLeader().get().stm.state
discard cluster.submitMessage(Message(fieldInt: 1))
discard cluster.submitMessage(Message(fieldInt: 2))
for i in 0..<305:
timeNow += 5.milliseconds
cluster.advance(timeNow)
echo "Helloo" & $cluster.getLeader().get.us.lastCommitedMsg
test "create 3 node cluster":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var delayer = initDelayer(3, 3, 1, initRand(42))
var cluster = createBLSCluster(test_ids_3, timeNow, 2, 3, delayer)
# skip time until first election
timeNow += 200.milliseconds
cluster.advance(timeNow)
var added = false
var commited = false
for i in 0..<50:
cluster.advance(timeNow)
if cluster.getLeader().isSome() and not added:
discard cluster.submitMessage(Message(fieldInt: 42))
added = true
echo "Add to the entry log"
timeNow += 5.milliseconds
if cluster.getLeader().isSome():
echo cluster.getLeader().get.us.lastCommitedMsg
if cluster.getLeader().get.us.lastCommitedMsg.fieldInt == 42:
commited = true
#break
check commited == true
if isMainModule:
blsconsensusMain()

View File

@ -0,0 +1,492 @@
# nim-raft
# Copyright (c) 2023 Status Research & Development GmbH
# Licensed under either of
# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE))
# * MIT license ([LICENSE-MIT](LICENSE-MIT))
# at your option.
# This file may not be copied, modified, or distributed except according to
# those terms.
import unittest2
import ../src/raft/types
import ../src/raft/consensus_state_machine
import ../src/raft/log
import ../src/raft/tracker
import ../src/raft/state
import std/sets
import std/[times, sequtils, random]
import uuids
import tables
import std/algorithm
type
TestCluster* = object
nodes: Table[RaftnodeId, RaftStateMachine]
blockedTickSet: HashSet[RaftnodeId]
blockedMsgRoutingSet: HashSet[RaftnodeId]
var test_ids_3 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
RaftnodeId(parseUUID("2a98fc33-6559-44c0-b130-fc3e9df80a69")),
RaftnodeId(parseUUID("9156756d-697f-4ffa-9b82-0c86720344bd"))
]
var test_ids_1 = @[
RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")),
]
func createConfigFromIds(ids: seq[RaftnodeId]): RaftConfig =
var config = RaftConfig()
for id in ids:
config.currentSet.add(id)
return config
proc createCluster(ids: seq[RaftnodeId], now: times.DateTime) : TestCluster =
var config = createConfigFromIds(ids)
var cluster = TestCluster()
cluster.blockedTickSet.init()
cluster.blockedMsgRoutingSet.init()
cluster.nodes = initTable[RaftnodeId, RaftStateMachine]()
for i in 0..<config.currentSet.len:
let id = config.currentSet[i]
var log = initRaftLog(1)
var node = initRaftStateMachine(id, 0, log, 0, config, now, initRand(i + 42))
cluster.nodes[id] = node
return cluster
proc blockTick(tc: var TestCluster, id: RaftnodeId) =
tc.blockedTickSet.incl(id)
func blockMsgRouting(tc: var TestCluster, id: RaftnodeId) =
tc.blockedMsgRoutingSet.incl(id)
func allowTick(tc: var TestCluster, id: RaftnodeId) =
tc.blockedTickSet.excl(id)
func allowMsgRouting(tc: var TestCluster, id: RaftnodeId) =
tc.blockedMsgRoutingSet.excl(id)
proc cmpLogs(x, y: DebugLogEntry): int =
cmp(x.time, y.time)
func `$`*(de: DebugLogEntry): string =
return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg
proc advance(tc: var TestCluster, now: times.DateTime, logLevel: DebugLogLevel = DebugLogLevel.Error) =
var debugLogs : seq[DebugLogEntry]
for id, node in tc.nodes:
if tc.blockedTickSet.contains(id):
continue
tc.nodes[id].tick(now)
var output = tc.nodes[id].poll()
debugLogs.add(output.debugLogs)
for msg in output.messages:
if not tc.blockedMsgRoutingSet.contains(msg.sender) and not tc.blockedMsgRoutingSet.contains(msg.receiver):
if DebugLogLevel.Debug <= logLevel:
echo now.format("HH:mm:ss:fff") & "rpc:" & $msg
tc.nodes[msg.receiver].advance(msg, now)
else:
if DebugLogLevel.Debug <= logLevel:
echo "[" & now.format("HH:mm:ss:fff") & "] rpc message is blocked: " & $msg & $tc.blockedMsgRoutingSet
for commit in output.committed:
if DebugLogLevel.Debug <= logLevel:
echo "[" & (($node.myId)[0..7]) & "...] Commit:" & $commit
debugLogs.sort(cmpLogs)
for msg in debugLogs:
if msg.level <= logLevel:
echo $msg
func getLeader(tc: TestCluster): Option[RaftStateMachine] =
var leader = none(RaftStateMachine)
for id, node in tc.nodes:
if node.state.isLeader:
if not leader.isSome() or leader.get().term < node.term:
leader = some(node)
return leader
proc consensusstatemachineMain*() =
suite "Basic state machine tests":
test "create state machine":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_1, timeNow)
test "tick empty state machine":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
timeNow += 5.milliseconds
sm.tick(timeNow)
suite "Entry log tests":
test "append entry as leadeer":
var log = initRaftLog(1)
log.appendAsLeader(0, 1, Command())
log.appendAsLeader(0, 2, Command())
check log.lastTerm() == 0
log.appendAsLeader(1, 2, Command())
check log.lastTerm() == 1
test "append entry as follower":
var log = initRaftLog(1)
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
log.appendAsFollower(0, 1, Command())
check log.lastTerm() == 0
check log.lastIndex() == 1
check log.entriesCount == 1
discard log.matchTerm(1, 1)
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(1, 3, Command())
check log.lastTerm() == 1
check log.lastIndex() == 3
check log.entriesCount == 3
log.appendAsFollower(1, 2, Command())
check log.lastTerm() == 1
check log.lastIndex() == 2
check log.entriesCount == 2
log.appendAsFollower(2, 1, Command())
check log.lastTerm() == 2
check log.lastIndex() == 1
check log.entriesCount == 1
suite "3 node cluster":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var t = now()
suite "Single node election tracker":
test "unknown":
var votes = initVotes(test_ids_1)
check votes.tallyVote == RaftElectionResult.Unknown
test "win election":
var votes = initVotes(test_ids_1)
discard votes.registerVote(test_ids_1[0], true)
check votes.tallyVote == RaftElectionResult.Won
test "lost election":
var votes = initVotes(test_ids_1)
discard votes.registerVote(test_ids_1[0], false)
echo votes.tallyVote
check votes.tallyVote == RaftElectionResult.Lost
suite "3 nodes election tracker":
test "win election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], true)
check votes.tallyVote == RaftElectionResult.Won
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[2], true)
check votes.tallyVote == RaftElectionResult.Won
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], false)
check votes.tallyVote == RaftElectionResult.Lost
test "lose election":
var votes = initVotes(test_ids_3)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[0], true)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[1], false)
check votes.tallyVote == RaftElectionResult.Unknown
discard votes.registerVote(test_ids_3[2], false)
check votes.tallyVote == RaftElectionResult.Lost
suite "Single node cluster":
test "election":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 99.milliseconds
sm.tick(timeNow)
var output = sm.poll()
check output.logEntries.len == 0
check output.committed.len == 0
check output.messages.len == 0
check sm.state.isFollower
timeNow += 500.milliseconds
sm.tick(timeNow)
output = sm.poll()
check output.logEntries.len == 0
# When the node became a leader it will produce empty message in the log
# and because we have single node cluster the node will commit that entry immediately
check output.committed.len == 1
check output.messages.len == 0
check sm.state.isLeader
check sm.term == 1
test "append entry":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var config = createConfigFromIds(test_ids_1)
var log = initRaftLog(1)
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 1000.milliseconds
sm.tick(timeNow)
var output = sm.poll()
check output.logEntries.len == 0
# When the node became a leader it will produce empty message in the log
# and because we have single node cluster the node will commit that entry immediately
check output.committed.len == 1
check output.messages.len == 0
check sm.state.isLeader
sm.addEntry(Empty())
check sm.poll().messages.len == 0
timeNow += 250.milliseconds
sm.tick(timeNow)
check sm.poll().messages.len == 0
suite "Two nodes cluster":
test "election":
let id1 = test_ids_3[0]
let id2 = test_ids_3[1]
var config = createConfigFromIds(@[id1, id2])
var log = initRaftLog(1)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 601.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == id1
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isLeader
timeNow += 1.milliseconds
# Older messages should be ignored
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: (output.term - 1), voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:id1, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isLeader
block:
output = sm.poll()
timeNow += 100.milliseconds
sm.tick(timeNow)
output = sm.poll()
# if the leader get a message with higher term it should become follower
block:
timeNow += 201.milliseconds
sm.tick(timeNow)
output = sm.poll()
let entry = LogEntry(term: (output.term + 1), index: 101, kind: RaftLogEntryType.rletEmpty, empty: true)
let appendRequest = RaftRpcAppendRequest(previousTerm: (output.term + 1), previousLogIndex: 100, commitIndex: 99, entries: @[entry])
let msg = RaftRpcMessage(currentTerm: (output.term + 1), sender: id2, receiver:id1, kind: RaftRpcMessageType.AppendRequest, appendRequest: appendRequest)
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
suite "3 nodes cluster":
test "election failed":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isFollower
timeNow += 1.milliseconds
test "election":
let mainNodeId = test_ids_3[0]
let id2 = test_ids_3[1]
let id3 = test_ids_3[2]
var config = createConfigFromIds(test_ids_3)
var log = initRaftLog(1)
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var sm = initRaftStateMachine(test_ids_1[0], 0, log, 0, config, timeNow, initRand(42))
check sm.state.isFollower
timeNow += 501.milliseconds
sm.tick(timeNow)
check sm.state.isCandidate
var output = sm.poll()
check output.votedFor.isSome
check output.votedFor.get() == mainNodeId
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: false)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id2, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == false
check sm.state.isCandidate
timeNow += 1.milliseconds
block:
let voteRaplay = RaftRpcVoteReply(currentTerm: output.term, voteGranted: true)
let msg = RaftRpcMessage(currentTerm: output.term, sender: id3, receiver:mainNodeId, kind: RaftRpcMessageType.VoteReply, voteReply: voteRaplay)
check sm.state.isCandidate
sm.advance(msg, timeNow)
output = sm.poll()
check output.stateChange == true
check sm.state.isLeader
timeNow += 1.milliseconds
suite "3 nodes cluester":
test "election":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var hasLeader = false
for i in 0..<105:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if leader == RaftnodeId():
if maybeLeader.isSome:
leader = maybeLeader.get().myId
hasLeader = true
else:
if maybeLeader.isSome:
check leader == maybeLeader.get().myId
else:
check false
# we should elect atleast 1 leader
check hasLeader
test "1 node is not responding":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
cluster.blockMsgRouting(test_ids_3[0])
var leader: RaftnodeId
var hasLeader = false
for i in 0..<105:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if leader == RaftnodeId():
if maybeLeader.isSome:
leader = maybeLeader.get().myId
hasLeader = true
else:
if maybeLeader.isSome:
check leader == maybeLeader.get().myId
else:
check false
# we should elect atleast 1 leader
check hasLeader
test "2 nodes is not responding":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
cluster.blockMsgRouting(test_ids_3[0])
cluster.blockMsgRouting(test_ids_3[1])
var leader: RaftnodeId
for i in 0..<105:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
# We should never elect a leader
check leader == RaftnodeId()
test "1 nodes is not responding new leader reelection":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var firstLeaderId = RaftnodeId()
var secondLeaderId = RaftnodeId()
for i in 0..<305:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if maybeLeader.isSome() and firstLeaderId == RaftNodeId():
# we will block the comunication and will try to elect new leader
firstLeaderId = maybeLeader.get().myId
cluster.blockMsgRouting(firstLeaderId)
echo "Block comunication with: " & $firstLeaderId
if firstLeaderId != RaftnodeId() and maybeLeader.isSome() and maybeLeader.get().myId != firstLeaderId:
secondLeaderId = maybeLeader.get().myId
check secondLeaderId != RaftnodeId() and firstLeaderId != secondLeaderId
test "After reaelection leader should become follower":
var timeNow = dateTime(2017, mMar, 01, 00, 00, 00, 00, utc())
var cluster = createCluster(test_ids_3, timeNow)
var leader: RaftnodeId
var firstLeaderId = RaftnodeId()
var secondLeaderId = RaftnodeId()
for i in 0..<305:
timeNow += 5.milliseconds
cluster.advance(timeNow)
var maybeLeader = cluster.getLeader()
if maybeLeader.isSome() and firstLeaderId == RaftNodeId():
# we will block the comunication and will try to elect new leader
firstLeaderId = maybeLeader.get().myId
cluster.blockMsgRouting(firstLeaderId)
echo "Block comunication with: " & $firstLeaderId
if firstLeaderId != RaftnodeId() and maybeLeader.isSome() and maybeLeader.get().myId != firstLeaderId:
secondLeaderId = maybeLeader.get().myId
cluster.allowMsgRouting(firstLeaderId)
if isMainModule:
consensusstatemachineMain()