diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..497c12f --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,170 @@ +name: CI +on: + push: + branches: + - master + pull_request: + workflow_dispatch: + +concurrency: # Cancel stale PR builds (but not push builds) + group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.sha }} + cancel-in-progress: true + +jobs: + build: + strategy: + fail-fast: false + matrix: + target: + - os: linux + cpu: amd64 + - os: linux + cpu: i386 + - os: macos + cpu: amd64 + - os: windows + cpu: amd64 + #- os: windows + #cpu: i386 + branch: [version-1-6, version-2-0, devel] + include: + - target: + os: linux + builder: ubuntu-20.04 + shell: bash + - target: + os: macos + builder: macos-12 + shell: bash + - target: + os: windows + builder: windows-2019 + shell: msys2 {0} + + defaults: + run: + shell: ${{ matrix.shell }} + + name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})' + runs-on: ${{ matrix.builder }} + continue-on-error: ${{ matrix.branch == 'devel' }} + steps: + - name: Checkout + uses: actions/checkout@v3 + + - name: Install build dependencies (Linux i386) + if: runner.os == 'Linux' && matrix.target.cpu == 'i386' + run: | + sudo dpkg --add-architecture i386 + sudo apt-fast update -qq + sudo DEBIAN_FRONTEND='noninteractive' apt-fast install \ + --no-install-recommends -yq gcc-multilib g++-multilib \ + libssl-dev:i386 + mkdir -p external/bin + cat << EOF > external/bin/gcc + #!/bin/bash + exec $(which gcc) -m32 "\$@" + EOF + cat << EOF > external/bin/g++ + #!/bin/bash + exec $(which g++) -m32 "\$@" + EOF + chmod 755 external/bin/gcc external/bin/g++ + echo '${{ github.workspace }}/external/bin' >> $GITHUB_PATH + + - name: 'Install dependencies (macOS)' + if: runner.os == 'macOS' && matrix.branch == 'devel' + run: | + brew install openssl@1.1 + ln -s $(brew --prefix)/opt/openssl/lib/libcrypto.1.1.dylib /usr/local/lib + ln -s 
$(brew --prefix)/opt/openssl/lib/libssl.1.1.dylib /usr/local/lib/ + + - name: MSYS2 (Windows i386) + if: runner.os == 'Windows' && matrix.target.cpu == 'i386' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + msystem: MINGW32 + install: >- + base-devel + git + mingw-w64-i686-toolchain + + - name: MSYS2 (Windows amd64) + if: runner.os == 'Windows' && matrix.target.cpu == 'amd64' + uses: msys2/setup-msys2@v2 + with: + path-type: inherit + install: >- + base-devel + git + mingw-w64-x86_64-toolchain + + - name: Restore Nim DLLs dependencies (Windows) from cache + if: runner.os == 'Windows' + id: windows-dlls-cache + uses: actions/cache@v3 + with: + path: external/dlls-${{ matrix.target.cpu }} + key: 'dlls-${{ matrix.target.cpu }}' + + - name: Install DLLs dependencies (Windows) + if: > + steps.windows-dlls-cache.outputs.cache-hit != 'true' && + runner.os == 'Windows' + run: | + mkdir -p external + curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip + 7z x -y external/windeps.zip -oexternal/dlls-${{ matrix.target.cpu }} + + - name: Path to cached dependencies (Windows) + if: > + runner.os == 'Windows' + run: | + echo "${{ github.workspace }}/external/dlls-${{ matrix.target.cpu }}" >> $GITHUB_PATH + + - name: Derive environment variables + run: | + if [[ '${{ matrix.target.cpu }}' == 'amd64' ]]; then + PLATFORM=x64 + else + PLATFORM=x86 + fi + echo "PLATFORM=$PLATFORM" >> $GITHUB_ENV + + ncpu= + MAKE_CMD="make" + case '${{ runner.os }}' in + 'Linux') + ncpu=$(nproc) + ;; + 'macOS') + ncpu=$(sysctl -n hw.ncpu) + ;; + 'Windows') + ncpu=$NUMBER_OF_PROCESSORS + MAKE_CMD="mingw32-make" + ;; + esac + [[ -z "$ncpu" || $ncpu -le 0 ]] && ncpu=1 + echo "ncpu=$ncpu" >> $GITHUB_ENV + echo "MAKE_CMD=${MAKE_CMD}" >> $GITHUB_ENV + + - name: Build Nim and Nimble + run: | + env MAKE="${MAKE_CMD} -j${ncpu}" ARCH_OVERRIDE=${PLATFORM} \ + NIM_COMMIT=${{ matrix.branch }} \ + NIMBLE_COMMIT=a4fc798838ee753f5485dd19afab22e9367eb0e7 \ + 
QUICK_AND_DIRTY_COMPILER=1 QUICK_AND_DIRTY_NIMBLE=1 CC=gcc \ + scripts/ci/build_nim.sh nim csources dist/nimble-latest NimBinaries + echo '${{ github.workspace }}/nim/bin' >> $GITHUB_PATH + + - name: Run tests + run: | + if [[ "${{ matrix.target.os }}" == "windows" ]]; then + # https://github.com/status-im/nimbus-eth2/issues/3121 + export NIMFLAGS="-d:nimRawSetjmp" + fi + nim --version + nimble --version + nimble test \ No newline at end of file diff --git a/.gitignore b/.gitignore index 1c3fa9c..1255ce0 100644 --- a/.gitignore +++ b/.gitignore @@ -5,4 +5,6 @@ *.exe *.out nimcache/ -build/ \ No newline at end of file +build/ +nimbledeps/ +.VSCodeCounter \ No newline at end of file diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..6ef4dd2 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,7 @@ +{ + "nim.projectMapping": [{ + "projectFile": "raft.nim", + "fileRegex": ".*\\.nim" + } + ] +} \ No newline at end of file diff --git a/README.md b/README.md index 4dda51e..2fefc4b 100644 --- a/README.md +++ b/README.md @@ -5,3 +5,19 @@ This project aims to develop an implementation of the Raft consensus protocol th We plan to leverage the implementation to create a highly-efficient setup for operating a redundant set of Nimbus beacon nodes and/or validator clients that rely on BLS threshold signatures to achieve improved resilience and security. 
Further details can be found in our roadmap here: https://github.com/status-im/nimbus-eth2/issues/3416 + +This project is heavily inspired by the Raft implementation in ScyllaDB + +https://github.com/scylladb/scylladb/tree/master/raft + +# Design goals + +The main goal is to separate the implementation of the Raft state machine from other implementation details (storage, RPC, etc.). +In order to achieve this, we want to keep the state machine absolutely deterministic: every interaction with the outside world, such as +networking, logging, acquiring the current time, random number generation, disk operations, etc., must happen through the state machine interface. +This ensures better testability and integrability. + + +# Run test + +`nimble test` \ No newline at end of file diff --git a/config.nim b/config.nim new file mode 100644 index 0000000..2d680c1 --- /dev/null +++ b/config.nim @@ -0,0 +1,109 @@ +# Set up paths +--noNimblePath +when withDir(thisDir(), system.fileExists("nimble.paths")): + include "nimble.paths" + +--path:"src" + +# Turn off `libbacktrace` +--define:disable_libbacktrace + +# Configuration synced with nwaku's - https://github.com/waku-org/nwaku/blob/master/config.nims +# ---------------------------------------------------- nwaku config ---------------------------------------------------- +if defined(release): + switch("nimcache", thisDir() & "/nimcache/release/$projectName") +else: + switch("nimcache", thisDir() & "/nimcache/debug/$projectName") + +if defined(windows): + # disable timestamps in Windows PE headers - https://wiki.debian.org/ReproducibleBuilds/TimestampsInPEBinaries + switch("passL", "-Wl,--no-insert-timestamp") + # increase stack size + switch("passL", "-Wl,--stack,8388608") + # https://github.com/nim-lang/Nim/issues/4057 + --tlsEmulation:off + if defined(i386): + # set the IMAGE_FILE_LARGE_ADDRESS_AWARE flag so we can use PAE, if enabled, and access more than 2 GiB of RAM + switch("passL", "-Wl,--large-address-aware") + + # The dynamic Chronicles output 
currently prevents us from using colors on Windows + # because these require direct manipulations of the stdout File object. + switch("define", "chronicles_colors=off") + +# https://github.com/status-im/nimbus-eth2/blob/stable/docs/cpu_features.md#ssse3-supplemental-sse3 +# suggests that SHA256 hashing with SSSE3 is 20% faster than without SSSE3, so +# given its near-ubiquity in the x86 installed base, it renders a distribution +# build more viable on an overall broader range of hardware. +# +if defined(disableMarchNative): + if defined(i386) or defined(amd64): + if defined(macosx): + # macOS Catalina is EOL as of 2022-09 + # https://support.apple.com/kb/sp833 + # "macOS Big Sur - Technical Specifications" lists current oldest + # supported models: MacBook (2015 or later), MacBook Air (2013 or later), + # MacBook Pro (Late 2013 or later), Mac mini (2014 or later), iMac (2014 + # or later), iMac Pro (2017 or later), Mac Pro (2013 or later). + # + # These all have Haswell or newer CPUs. + # + # This ensures AVX2, AES-NI, PCLMUL, BMI1, and BMI2 instruction set support. 
+ switch("passC", "-march=haswell -mtune=generic") + switch("passL", "-march=haswell -mtune=generic") + else: + if defined(marchOptimized): + # https://github.com/status-im/nimbus-eth2/blob/stable/docs/cpu_features.md#bmi2--adx + switch("passC", "-march=broadwell -mtune=generic") + switch("passL", "-march=broadwell -mtune=generic") + else: + switch("passC", "-mssse3") + switch("passL", "-mssse3") +elif defined(macosx) and defined(arm64): + # Apple's Clang can't handle "-march=native" on M1: https://github.com/status-im/nimbus-eth2/issues/2758 + switch("passC", "-mcpu=apple-m1") + switch("passL", "-mcpu=apple-m1") +else: + switch("passC", "-march=native") + switch("passL", "-march=native") + if defined(windows): + # https://gcc.gnu.org/bugzilla/show_bug.cgi?id=65782 + # ("-fno-asynchronous-unwind-tables" breaks Nim's exception raising, sometimes) + switch("passC", "-mno-avx512f") + switch("passL", "-mno-avx512f") + + +--threads:on +--opt:speed +--excessiveStackTrace:on +# enable metric collection +--define:metrics +# for heap-usage-by-instance-type metrics and object base-type strings +--define:nimTypeNames + +switch("define", "withoutPCRE") + +# the default open files limit is too low on macOS (512), breaking the +# "--debugger:native" build. It can be increased with `ulimit -n 1024`. 
+if not defined(macosx): + # add debugging symbols and original files and line numbers + --debugger:native +--define:nimOldCaseObjects # https://github.com/status-im/nim-confutils/issues/9 + +# `switch("warning[CaseTransition]", "off")` fails with "Error: invalid command line option: '--warning[CaseTransition]'" +switch("warning", "CaseTransition:off") + +# The compiler doth protest too much, methinks, about all these cases where it can't +# do its (N)RVO pass: https://github.com/nim-lang/RFCs/issues/230 +switch("warning", "ObservableStores:off") + +# Too many false positives for "Warning: method has lock level , but another method has 0 [LockLevel]" +switch("warning", "LockLevel:off") +# ---------------------------------------------------------------------------------------------------------------------- + +# Discovery configuration +switch("define", "discv5_protocol_id=d5waku") + +# Logging configuration +--define:chronicles_line_numbers +switch("define", "chronicles_log_level=DEBUG") +switch("define", "chronicles_runtime_filtering=on") diff --git a/doc/Ongaro.D-Raft-Extended.pdf b/doc/Ongaro.D-Raft-Extended.pdf new file mode 100644 index 0000000..34966b1 Binary files /dev/null and b/doc/Ongaro.D-Raft-Extended.pdf differ diff --git a/doc/Ongaro.D-Stanford-PhD-Thessis.pdf b/doc/Ongaro.D-Stanford-PhD-Thessis.pdf new file mode 100644 index 0000000..5707a2e Binary files /dev/null and b/doc/Ongaro.D-Stanford-PhD-Thessis.pdf differ diff --git a/nim.projectMapping b/nim.projectMapping new file mode 100644 index 0000000..cbdc928 --- /dev/null +++ b/nim.projectMapping @@ -0,0 +1,8 @@ +{ + "nim.provider": "lsp", + "nim.projectMapping": [{ + // everything else - use main.nim as root. 
+ "projectFile": "raft.nim", + "fileRegex": ".*\\.nim" + }] +} \ No newline at end of file diff --git a/nimble.lock b/nimble.lock new file mode 100644 index 0000000..59db974 --- /dev/null +++ b/nimble.lock @@ -0,0 +1,155 @@ +{ + "version": 2, + "packages": { + "unittest2": { + "version": "0.2.1", + "vcsRevision": "262b697f38d6b6f1e7462d3b3ab81d79b894e336", + "url": "https://github.com/status-im/nim-unittest2", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "1bac3a8355441edeed1ef3134e7436d6fb5d4498" + } + }, + "stew": { + "version": "0.1.0", + "vcsRevision": "9958aac68a7613a3312fa96dd2f3b29caf17772e", + "url": "https://github.com/status-im/nim-stew", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "4eb2b0c4b0fe9817ee19202e8723d46c284f2875" + } + }, + "isaac": { + "version": "0.1.3", + "vcsRevision": "45a5cbbd54ff59ba3ed94242620c818b9aad1b5b", + "url": "https://github.com/pragmagic/isaac/", + "downloadMethod": "git", + "dependencies": [], + "checksums": { + "sha1": "05c3583a954715d84b0bf1be97f9a503249e9cdf" + } + }, + "faststreams": { + "version": "0.3.0", + "vcsRevision": "422971502bd641703bf78a27cb20429e77fcfb8b", + "url": "https://github.com/status-im/nim-faststreams", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "64045de53dade90c36ba5a75f51603725c5b0f30" + } + }, + "serialization": { + "version": "0.2.2", + "vcsRevision": "4d541ec43454809904fc4c3c0a7436410ad597d2", + "url": "https://github.com/status-im/nim-serialization.git", + "downloadMethod": "git", + "dependencies": [ + "faststreams", + "unittest2", + "stew" + ], + "checksums": { + "sha1": "1dcdb29f17d0aff295e7e57edf530b1e16fb6c59" + } + }, + "json_serialization": { + "version": "0.2.2", + "vcsRevision": "3f1ce24ee116daedbc9c8be525e63ec03e185a28", + "url": "https://github.com/status-im/nim-json-serialization.git", + "downloadMethod": "git", + "dependencies": [ + "serialization", 
+ "stew" + ], + "checksums": { + "sha1": "da0d38b775f222703784b273225fe89267430482" + } + }, + "httputils": { + "version": "0.3.0", + "vcsRevision": "3b491a40c60aad9e8d3407443f46f62511e63b18", + "url": "https://github.com/status-im/nim-http-utils", + "downloadMethod": "git", + "dependencies": [ + "stew", + "unittest2" + ], + "checksums": { + "sha1": "1331f33585eda05d1e50385fa7871c3bf2a449d7" + } + }, + "uuids": { + "version": "0.1.12", + "vcsRevision": "42052ba362a9cd4685463edb3781beeb9b8e547e", + "url": "https://github.com/pragmagic/uuids/", + "downloadMethod": "git", + "dependencies": [ + "isaac" + ], + "checksums": { + "sha1": "154a31d6f5428c2863c48a057b7143ff9a6e4613" + } + }, + "testutils": { + "version": "0.5.0", + "vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34", + "url": "https://github.com/status-im/nim-testutils", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897" + } + }, + "bearssl": { + "version": "0.2.1", + "vcsRevision": "d55d3a86d7ec3ad11b244e17b3bad490bfbd076d", + "url": "https://github.com/status-im/nim-bearssl.git", + "downloadMethod": "git", + "dependencies": [ + "unittest2" + ], + "checksums": { + "sha1": "5327c983483c4dd465347c6b8a974239c7c6c612" + } + }, + "chronicles": { + "version": "0.10.3", + "vcsRevision": "ccbb7566d1a06bfc1ec42dd8da74a47f1d3b3f4b", + "url": "https://github.com/status-im/nim-chronicles.git", + "downloadMethod": "git", + "dependencies": [ + "testutils", + "json_serialization" + ], + "checksums": { + "sha1": "09ae5c46be94aa60d2b0ca80c215a142f94e3603" + } + }, + "chronos": { + "version": "3.2.0", + "vcsRevision": "ba143e029f35fd9b4cd3d89d007cc834d0d5ba3c", + "url": "https://github.com/status-im/nim-chronos", + "downloadMethod": "git", + "dependencies": [ + "stew", + "bearssl", + "httputils", + "unittest2" + ], + "checksums": { + "sha1": "5783067584ac6812eb64b8454ea6f9c97ff1262a" + } + } + }, + "tasks": {} +} diff --git 
a/raft.nimble b/raft.nimble index 2baa6d2..a8a7829 100644 --- a/raft.nimble +++ b/raft.nimble @@ -14,14 +14,17 @@ version = "0.0.1" author = "Status Research & Development GmbH" description = "raft consensus in nim" license = "Apache License 2.0" +srcDir = "src" +installExt = @["nim"] skipDirs = @["tests"] +bin = @["raft"] -requires "nim >= 1.6.0" + +requires "nim >= 1.6.14" requires "stew >= 0.1.0" -requires "nimcrypto >= 0.5.4" requires "unittest2 >= 0.0.4" -requires "chronicles >= 0.10.2" -requires "eth >= 1.0.0" -requires "chronos >= 3.2.0" +requires "uuids >= 0.1.11" +requires "chronicles >= 0.10.3" +requires "chronos >= 3.0.11" + -# Helper functions \ No newline at end of file diff --git a/raft.nims b/raft.nims new file mode 100644 index 0000000..86bc93c --- /dev/null +++ b/raft.nims @@ -0,0 +1,19 @@ +proc buildLibrary(name: string, srcDir = "./", params = "", `type` = "static") = + if not dirExists "build": + mkDir "build" + # allow something like "nim nimbus --verbosity:0 --hints:off nimbus.nims" + var extra_params = params + for i in 2../dev/null + if [[ -n "${NIM_COMMIT}" ]]; then + # support old Git versions, like the one from Ubuntu-18.04 + git restore . 2>/dev/null || git reset --hard + if ! git checkout -q ${NIM_COMMIT} 2>/dev/null; then + # Pay the price for a non-default NIM_COMMIT here, by fetching everything. + # (This includes upstream branches and tags that might be missing from our fork.) + git remote add upstream https://github.com/nim-lang/Nim + git fetch --all --tags --quiet + git checkout -q ${NIM_COMMIT} + fi + # In case the local branch diverged and a fast-forward merge is not possible. + git fetch || true + git reset -q --hard origin/${NIM_COMMIT} 2>/dev/null || true + # In case NIM_COMMIT is a local branch that's behind the remote one it's tracking. 
# Builds the Nim compiler (and optionally Nimble / the Nim tools) inside
# "$NIM_DIR", then records which commit was built.
#
# Two build strategies are used:
#   * if koch.nim mentions "skipIntegrityCheck" and NIM_COMMIT is not
#     "version-1-6", Nim's own buildchain (ci/funs.sh + koch) is used;
#   * otherwise a custom bootstrap from the csources repository is performed.
#
# Variables read from the environment / enclosing script: NIM_BUILD_MSG,
# NIM_DIR, NIM_COMMIT, NIM_COMMIT_HASH, UCPU, MAKE, CC, ON_WINDOWS,
# CSOURCES_DIR, NIMBLE_DIR, QUICK_AND_DIRTY_COMPILER, QUICK_AND_DIRTY_NIMBLE,
# EXE_SUFFIX and CI_CACHE. The CSOURCES_* / NIMBLE_* repo and commit variables
# get defaults below when unset.
#
# On completion: bin/last_built_commit holds NIM_COMMIT_HASH, the compiler is
# stored as bin/nim_commit_<hash> with bin/nim${EXE_SUFFIX} symlinked to it,
# and, when CI_CACHE is set, bin/* is copied into a freshly recreated cache dir.
build_nim() {
  echo -e "$NIM_BUILD_MSG"
  # [[ "$V" == "0" ]] && exec &>/dev/null

  # working directory
  pushd "$NIM_DIR"
  if grep -q "skipIntegrityCheck" koch.nim && [ "${NIM_COMMIT}" != "version-1-6" ]; then
    # NOTE(review): looks like leftover debug output - confirm it is wanted.
    echo "in if"
    # Run Nim buildchain, with matching dependency versions
    # - CSOURCES_REPO from Nim/config/build_config.txt (nim_csourcesUrl)
    # - CSOURCES_COMMIT from Nim/config/build_config.txt (nim_csourcesHash)
    # - NIMBLE_REPO from Nim/koch.nim (bundleNimbleExe)
    # - NIMBLE_COMMIT from Nim/koch.nim (NimbleStableCommit)
    . ci/funs.sh
    NIMCORES=1 nimBuildCsourcesIfNeeded $UCPU
    bin/nim c --noNimblePath --skipUserCfg --skipParentCfg --warnings:off --hints:off koch
    ./koch --skipIntegrityCheck boot -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
    if [[ "${QUICK_AND_DIRTY_COMPILER}" == "0" ]]; then
      # We want tools
      ./koch tools -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
    elif [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
      # We just want nimble
      ./koch nimble -d:release --skipUserCfg --skipParentCfg --warnings:off --hints:off
    fi
  else
    # Git commits
    # NOTE(review): looks like leftover debug output - confirm it is wanted.
    echo "in else"

    # ": ${VAR:=default}" assigns the default only when VAR is unset or empty.
    : ${CSOURCES_V1_COMMIT:=561b417c65791cd8356b5f73620914ceff845d10}
    : ${CSOURCES_V2_COMMIT:=86742fb02c6606ab01a532a0085784effb2e753e}
    : ${CSOURCES_V1_REPO:=https://github.com/nim-lang/csources_v1.git}
    : ${CSOURCES_V2_REPO:=https://github.com/nim-lang/csources_v2.git}

    # After this Nim commit, use csources v2
    : ${CSOURCES_V2_START_COMMIT:=f7c203fb6c89b5cef83c4f326aeb23ef8c4a2c40}
    : ${NIMBLE_REPO:=https://github.com/nim-lang/nimble.git}
    : ${NIMBLE_COMMIT:=a4fc798838ee753f5485dd19afab22e9367eb0e7} # 0.13.1

    # Custom buildchain for older versions
    # TODO Remove this once the default NIM_COMMIT supports `--skipIntegrityCheck`
    # We will still be able to compile older versions by removing the flag,
    # which will just waste a bit of CPU

    # Git repos for csources and Nimble
    if [[ ! -d "$CSOURCES_DIR" ]]; then
      # Pick csources v2 when the requested Nim commit descends from
      # CSOURCES_V2_START_COMMIT, csources v1 otherwise.
      if git merge-base --is-ancestor $CSOURCES_V2_START_COMMIT $NIM_COMMIT_HASH; then
        CSOURCES_REPO=$CSOURCES_V2_REPO
        CSOURCES_COMMIT=$CSOURCES_V2_COMMIT
      else
        CSOURCES_REPO=$CSOURCES_V1_REPO
        CSOURCES_COMMIT=$CSOURCES_V1_COMMIT
      fi

      mkdir -p "$CSOURCES_DIR"
      pushd "$CSOURCES_DIR"
      git clone $CSOURCES_REPO .
      git checkout $CSOURCES_COMMIT
      popd
    fi
    # Expose the checkout under the fixed name "csources" used below.
    if [[ "$CSOURCES_DIR" != "csources" ]]; then
      rm -rf csources
      ln -s "$CSOURCES_DIR" csources
    fi

    # bootstrap the Nim compiler and build the tools
    rm -f bin/{nim,nim_csources}
    pushd csources
    if [[ "$ON_WINDOWS" == "0" ]]; then
      $MAKE $UCPU clean
      $MAKE $UCPU LD=$CC
    else
      $MAKE myos=windows $UCPU clean
      $MAKE myos=windows $UCPU CC=gcc LD=gcc
    fi
    popd
    # The bootstrap binary is either in csources/bin or already in bin/;
    # either way keep a copy as bin/nim_csources.
    if [[ -e csources/bin ]]; then
      rm -f bin/nim bin/nim_csources
      cp -a csources/bin/nim bin/nim
      cp -a csources/bin/nim bin/nim_csources
      rm -rf csources/bin
    else
      cp -a bin/nim bin/nim_csources
    fi
    if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" ]]; then
      # Full build: run a copy of Nim's build_all.sh with warnings/hints
      # silenced and the nimBuildCsourcesIfNeeded line removed.
      sed \
        -e 's/koch$/--warnings:off --hints:off koch/' \
        -e 's/koch boot/koch boot --warnings:off --hints:off/' \
        -e '/nimBuildCsourcesIfNeeded/d' \
        build_all.sh > build_all_custom.sh
      sh build_all_custom.sh
      rm build_all_custom.sh
    else
      # Don't re-build it multiple times until we get identical
      # binaries, like "build_all.sh" does. Don't build any tools
      # either. This is all about build speed, not developer comfort.
      bin/nim_csources \
        c \
        --compileOnly \
        --nimcache:nimcache \
        -d:release \
        --skipUserCfg \
        --skipParentCfg \
        --warnings:off \
        --hints:off \
        compiler/nim.nim
      bin/nim_csources \
        jsonscript \
        --nimcache:nimcache \
        --skipUserCfg \
        --skipParentCfg \
        compiler/nim.nim
      cp -a compiler/nim bin/nim1
      # If we stop here, we risk ending up with a buggy compiler:
      # https://github.com/status-im/nimbus-eth2/pull/2220
      # https://github.com/status-im/nimbus-eth2/issues/2310
      bin/nim1 \
        c \
        --compileOnly \
        --nimcache:nimcache \
        -d:release \
        --skipUserCfg \
        --skipParentCfg \
        --warnings:off \
        --hints:off \
        compiler/nim.nim
      bin/nim1 \
        jsonscript \
        --nimcache:nimcache \
        --skipUserCfg \
        --skipParentCfg \
        compiler/nim.nim
      rm -f bin/nim
      cp -a compiler/nim bin/nim
      rm bin/nim1

      if [[ ! -d "$NIMBLE_DIR" ]]; then
        mkdir -p "$NIMBLE_DIR"
        pushd "$NIMBLE_DIR"
        git clone $NIMBLE_REPO .
        git checkout $NIMBLE_COMMIT
        # NOTE(review): "pwd" only prints the directory - looks like debug output.
        pwd
        # The relative path assumes NIMBLE_DIR sits two levels below NIM_DIR.
        ../../bin/nim r src/nimblepkg/private/clone.nim
        # we have to delete .git or koch.nim will checkout a branch tip, overriding our target commit
        rm -rf .git
        popd
      fi
      if [[ "$NIMBLE_DIR" != "dist/nimble" ]]; then
        mkdir -p dist
        rm -rf dist/nimble
        ln -s ../"$NIMBLE_DIR" dist/nimble
      fi
      # Do we want Nimble in this quick build?
      if [[ "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
        bin/nim c -d:release --noNimblePath --skipUserCfg --skipParentCfg dist/nimble/src/nimble.nim
        mv dist/nimble/src/nimble bin/
      fi
    fi
  fi

  if [[ "$QUICK_AND_DIRTY_COMPILER" == "0" || "${QUICK_AND_DIRTY_NIMBLE}" != "0" ]]; then
    # Nimble needs a CA cert
    rm -f bin/cacert.pem
    # A failed download is deliberately non-fatal: only a warning is printed.
    curl -LsS -o bin/cacert.pem https://curl.se/ca/cacert.pem || echo "Warning: 'curl' failed to download a CA cert needed by Nimble. Ignoring it."
  fi

  # record the built commit
  echo ${NIM_COMMIT_HASH} > bin/last_built_commit

  # create the symlink
  # NOTE(review): the source is "bin/nim" without ${EXE_SUFFIX} while the link
  # name uses it - confirm this is right when EXE_SUFFIX is non-empty.
  mv bin/nim bin/nim_commit_${NIM_COMMIT_HASH}
  ln -s nim_commit_${NIM_COMMIT_HASH} bin/nim${EXE_SUFFIX}

  # update the CI cache
  popd # we were in $NIM_DIR
  if [[ -n "$CI_CACHE" ]]; then
    rm -rf "$CI_CACHE"
    mkdir "$CI_CACHE"
    cp "$NIM_DIR"/bin/* "$CI_CACHE"/
  fi
}
type
  # Discriminator of `RaftRpcMessage`: selects which payload field the
  # message carries.
  RaftRpcMessageType* = enum
    VoteRequest = 0,
    VoteReply = 1,
    AppendRequest = 2,
    AppendReply = 3,
    InstallSnapshot = 4,
    SnapshotReply = 5

  # Discriminator of `RaftRpcAppendReply` (rejected vs. accepted payload).
  RaftRpcCode* = enum
    Rejected = 0,
    Accepted = 1

  # Severity attached to a `DebugLogEntry`.
  # NOTE(review): `Debug` (4) is ordered below `Info` (5), the reverse of the
  # usual convention - confirm this is intentional.
  DebugLogLevel* = enum
    None = 0
    Critical = 1,
    Error = 2,
    Warning = 3,
    Debug = 4,
    Info = 5,
    All = 6,

  # One diagnostic record; `addDebugLogEntry` appends these to
  # `RaftStateMachineOutput.debugLogs` instead of logging directly.
  DebugLogEntry* = object
    level*: DebugLogLevel
    time*: times.DateTime
    nodeId*: RaftnodeId
    state*: RaftNodeState
    msg*: string


  # Append-entries request. `replicateTo` fills `previousLogIndex` /
  # `previousTerm` from the entry just before the one being sent.
  RaftRpcAppendRequest* = object
    previousTerm*: RaftNodeTerm
    previousLogIndex*: RaftLogIndex
    commitIndex*: RaftLogIndex
    entries*: seq[LogEntry]

  # Payload of a rejected append reply.
  # NOTE(review): the fields carry no `*`, so they are only accessible from
  # inside this module - confirm that is intended.
  RaftRpcAppendReplyRejected* = object
    nonMatchingIndex: RaftLogIndex
    lastIdx: RaftLogIndex

  # Payload of an accepted append reply (field not exported, see note above).
  RaftRpcAppendReplyAccepted* = object
    lastNewIndex: RaftLogIndex

  # Reply to an append request; `result` selects the payload branch.
  RaftRpcAppendReply* = object
    commitIndex*: RaftLogIndex
    term*: RaftNodeTerm
    case result: RaftRpcCode:
      of Accepted: accepted: RaftRpcAppendReplyAccepted
      of Rejected: rejected: RaftRpcAppendReplyRejected

  # Vote request payload.
  RaftRpcVoteRequest* = object
    currentTerm*: RaftNodeTerm
    lastLogIndex*: RaftLogIndex
    lastLogTerm*: RaftNodeTerm
    force*: bool

  # Vote reply payload.
  RaftRpcVoteReply* = object
    currentTerm*: RaftNodeTerm
    voteGranted*: bool

  # Snapshot descriptor (fields are module-private).
  RaftSnapshot* = object
    index: RaftLogIndex
    term: RaftNodeTerm
    config: RaftConfig
    snapshotId: RaftSnapshotId

  # Install-snapshot request payload (fields are module-private).
  RaftInstallSnapshot* = object
    term: RaftNodeTerm
    snapshot: RaftSnapshot

  # Reply to an install-snapshot request (fields are module-private).
  RaftSnapshotReply* = object
    term: RaftNodeTerm
    success: bool

  # Envelope for every RPC. The `sendToImpl` overloads build it with
  # `currentTerm = sm.term`, `sender = sm.myId` and the matching `kind`.
  RaftRpcMessage* = object
    currentTerm*: RaftNodeTerm
    sender*: RaftNodeId
    receiver*: RaftNodeId
    case kind*: RaftRpcMessageType
    of VoteRequest: voteRequest*: RaftRpcVoteRequest
    of VoteReply: voteReply*: RaftRpcVoteReply
    of AppendRequest: appendRequest*: RaftRpcAppendRequest
    of AppendReply: appendReply*: RaftRpcAppendReply
    of InstallSnapshot: installSnapshot*: RaftInstallSnapshot
    of SnapshotReply: snapshotReply*: RaftSnapshotReply

  # Everything the state machine accumulated since the previous `poll`;
  # `poll` returns it and installs a fresh, empty instance.
  RaftStateMachineOutput* = object
    logEntries*: seq[LogEntry]
    # Entries that should be applied to the "User" State machine
    committed*: seq[LogEntry]
    # Outgoing RPCs queued by `sendToImpl`.
    messages*: seq[RaftRpcMessage]
    # Diagnostics queued by `addDebugLogEntry`.
    debugLogs*: seq[DebugLogEntry]
    term*: RaftNodeTerm
    # Set by `poll` only when `votedFor` differs from a default `RaftNodeId`.
    votedFor*: Option[RaftNodeId]
    stateChange*: bool

  # Copy of term / votedFor / commitIndex taken by `observe`; `poll` compares
  # its `commitIndex` with the current one to find newly committed entries.
  RaftLastPollState* = object
    term*: RaftNodeTerm
    votedFor*: RaftNodeId
    commitIndex: RaftLogIndex


  # The deterministic consensus state machine. Time is supplied by the caller
  # (`initRaftStateMachine`, `tick`) and randomness comes from
  # `randomGenerator`.
  RaftStateMachine* = object
    myId*: RaftNodeId
    term*: RaftNodeTerm
    commitIndex: RaftLogIndex
    toCommit: RaftLogIndex
    log: RaftLog
    # Pending output, handed out and reset by `poll`.
    output: RaftStateMachineOutput
    lastUpdate: times.Time
    votedFor: RaftNodeId
    currentLeader: RaftNodeId
    pingLeader: bool
    config: RaftConfig

    lastElectionTime: times.DateTime
    # `electionTimeout` plus random jitter, see `resetElectionTimeout`.
    randomizedElectionTime: times.Duration
    heartbeatTime: times.Duration
    # Most recent time passed in by the caller.
    timeNow: times.DateTime
    startTime: times.DateTime
    electionTimeout: times.Duration
    randomGenerator: Rand

    observedState: RaftLastPollState
    # Role-specific state (follower / candidate / leader).
    state*: RaftStateMachineState
func leader*(sm: var RaftStateMachine): var LeaderState =
  ## Mutable access to the leader-role data stored in `sm.state`.
  sm.state.leader

func follower*(sm: var RaftStateMachine): var FollowerState =
  ## Mutable access to the follower-role data stored in `sm.state`.
  sm.state.follower

func candidate*(sm: var RaftStateMachine): var CandidateState =
  ## Mutable access to the candidate-role data stored in `sm.state`.
  sm.state.candidate

func addDebugLogEntry(sm: var RaftStateMachine, level: DebugLogLevel, msg: string) =
  ## Queues a diagnostic record in `sm.output.debugLogs`, stamped with the
  ## machine's current time, role state and node id. Nothing is printed here;
  ## the records are handed to the caller with the rest of the output.
  let entry = DebugLogEntry(
    level: level,
    time: sm.timeNow,
    nodeId: sm.myId,
    state: sm.state.state,
    msg: msg)
  sm.output.debugLogs.add(entry)

func debug*(sm: var RaftStateMachine, log: string) =
  ## Queues `log` at `Debug` level.
  addDebugLogEntry(sm, DebugLogLevel.Debug, log)

func warning*(sm: var RaftStateMachine, log: string) =
  ## Queues `log` at `Warning` level.
  addDebugLogEntry(sm, DebugLogLevel.Warning, log)

func error*(sm: var RaftStateMachine, log: string) =
  ## Queues `log` at `Error` level.
  addDebugLogEntry(sm, DebugLogLevel.Error, log)

func info*(sm: var RaftStateMachine, log: string) =
  ## Queues `log` at `Info` level.
  addDebugLogEntry(sm, DebugLogLevel.Info, log)

func critical*(sm: var RaftStateMachine, log: string) =
  ## Queues `log` at `Critical` level.
  addDebugLogEntry(sm, DebugLogLevel.Critical, log)

func resetElectionTimeout*(sm: var RaftStateMachine) =
  ## Re-draws the randomized election timeout: the fixed `electionTimeout`
  ## plus 100 ms plus `rand(200)` ms taken from the machine's own generator,
  ## so the result is reproducible for a given generator state.
  let jitterMs = 100 + sm.randomGenerator.rand(200)
  sm.randomizedElectionTime =
    sm.electionTimeout + times.initDuration(milliseconds = jitterMs)

func initRaftStateMachine*(
    id: RaftnodeId,
    currentTerm: RaftNodeTerm,
    log: RaftLog,
    commitIndex: RaftLogIndex,
    config: RaftConfig,
    now: times.DateTime,
    randomGenerator: Rand): RaftStateMachine =
  ## Creates a state machine that starts as a follower of a default
  ## `RaftNodeId`, with `now` used as the current time, the start time and
  ## the last election time. The election timeout base is 100 ms and the
  ## heartbeat interval 50 ms. The randomized election timeout is drawn
  ## immediately and the poll baseline (`observedState`) is captured before
  ## returning.
  result = RaftStateMachine(
    myId: id,
    term: currentTerm,
    log: log,
    commitIndex: commitIndex,
    state: initFollower(RaftNodeId()),
    config: config,
    lastElectionTime: now,
    timeNow: now,
    startTime: now,
    electionTimeout: times.initDuration(milliseconds = 100),
    heartbeatTime: times.initDuration(milliseconds = 50),
    randomGenerator: randomGenerator)
  result.resetElectionTimeout()
  result.observedState.observe(result)
func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendRequest) =
  ## Queues an append request addressed to `id` in `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.AppendRequest,
    appendRequest: request)
  sm.output.messages.add(msg)

func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcAppendReply) =
  ## Queues an append reply addressed to `id` in `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.AppendReply,
    appendReply: request)
  sm.output.messages.add(msg)

func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteRequest) =
  ## Queues a vote request addressed to `id` in `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.VoteRequest,
    voteRequest: request)
  sm.output.messages.add(msg)

func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftRpcVoteReply) =
  ## Queues a vote reply addressed to `id` in `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.VoteReply,
    voteReply: request)
  sm.output.messages.add(msg)

func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftSnapshotReply) =
  ## Queues a snapshot reply addressed to `id` in `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.SnapshotReply,
    snapshotReply: request)
  sm.output.messages.add(msg)

func sendToImpl*(sm: var RaftStateMachine, id: RaftNodeId, request: RaftInstallSnapshot) =
  ## Queues an install-snapshot request addressed to `id` in
  ## `sm.output.messages`.
  let msg = RaftRpcMessage(
    currentTerm: sm.term,
    sender: sm.myId,
    receiver: id,
    kind: RaftRpcMessageType.InstallSnapshot,
    installSnapshot: request)
  sm.output.messages.add(msg)


func sendTo[MsgType](sm: var RaftStateMachine, id: RaftNodeId, request: MsgType) =
  ## Sends `request` to node `id` through the matching `sendToImpl` overload.
  ## When this node is the leader, the target's progress record gets its
  ## `lastMessageAt` set to the current time; an unknown target only produces
  ## a warning plus a dump of the leader state, and the message is still
  ## queued.
  sm.debug "Send to " & $id & $request
  if sm.state.isLeader:
    var progress = sm.findFollowerProggressById(id)
    if progress.isNone:
      sm.warning "Follower not found: " & $id
      sm.debug $sm.leader
    else:
      progress.get().lastMessageAt = sm.timeNow
  sm.sendToImpl(id, request)
createVoteRequest*(sm: var RaftStateMachine): RaftRpcMessage = + return RaftRpcMessage(currentTerm: sm.term, sender: sm.myId, kind: VoteRequest, voteRequest: RaftRpcVoteRequest()) + +func replicateTo*(sm: var RaftStateMachine, follower: RaftFollowerProgressTracker) = + if follower.nextIndex > sm.log.lastIndex: + return + + var previousTerm = sm.log.termForIndex(follower.nextIndex - 1) + sm.debug "replicate to " & $follower[] + if previousTerm.isSome: + let request = RaftRpcAppendRequest( + previousTerm: previousTerm.get(), + previousLogIndex: follower.nextIndex - 1, + commitIndex: sm.commitIndex, + entries: @[sm.log.getEntryByIndex(follower.nextIndex)]) + follower.nextIndex += 1 + sm.sendTo(follower.id, request) + else: + # TODO: we add support for snapshots + let request = RaftRpcAppendRequest( + previousTerm: 0, + previousLogIndex: 1, + commitIndex: sm.commitIndex, + entries: @[sm.log.getEntryByIndex(follower.nextIndex)]) + follower.nextIndex += 1 + sm.sendTo(follower.id, request) + +func replicate*(sm: var RaftStateMachine) = + if sm.state.isLeader: + for followerIndex in 0.. 1: + previousTerm = sm.log.termForIndex(follower.nextIndex - 1).get() + let request = RaftRpcAppendRequest( + previousTerm: previousTerm, + previousLogIndex: follower.nextIndex - 1, + commitIndex: sm.commitIndex, + entries: @[]) + sm.sendTo(follower.id, request) + +func tickLeader*(sm: var RaftStateMachine, now: times.DateTime) = + sm.timeNow = now + + # if sm.lastElectionTime - sm.timeNow > sm.electionTimeout: + # sm.becomeFollower(RaftnodeId()) + # return + + sm.lastElectionTime = now + if not sm.state.isLeader: + sm.error "tickLeader can be called only on the leader" + return + for followerIndex in 0.. 
sm.heartbeatTime: + sm.heartbeat(follower) + # TODO: implement step down logic + +func tick*(sm: var RaftStateMachine, now: times.DateTime) = + sm.info "Term: " & $sm.term & " commit idx " & $sm.commitIndex & " Time since last update: " & $(now - sm.timeNow).inMilliseconds & "ms time until election:" & $(sm.randomizedElectionTime - (sm.timeNow - sm.lastElectionTime)).inMilliseconds & "ms" + sm.timeNow = now + if sm.state.isLeader: + sm.tickLeader(now); + elif sm.state.isFollower and sm.timeNow - sm.lastElectionTime > sm.randomizedElectionTime: + sm.debug "Become candidate" + sm.becomeCandidate() + +func commit(sm: var RaftStateMachine) = + if not sm.state.isLeader: + return + var newIndex = sm.commitIndex + var nextIndex = sm.commitIndex + 1 + while nextIndex < sm.log.nextIndex: + var replicationCnt = 1 + for p in sm.leader.tracker.progress: + if p.matchIndex > newIndex: + replicationCnt += 1 + sm.debug "replication count: " & $replicationCnt & " for log index: " & $nextIndex + if replicationCnt >= (sm.leader.tracker.progress.len div 2 + 1): + sm.debug "Commit index: " & $nextIndex + sm.commitIndex = nextIndex; + nextIndex += 1 + newIndex += 1 + else: + break + +func poll*(sm: var RaftStateMachine): RaftStateMachineOutput = + # Should initiate replication if we have new entries + if sm.state.isLeader: + sm.replicate() + sm.commit() + sm.output.term = sm.term + if sm.observedState.commitIndex < sm.commitIndex: + for i in (sm.observedState.commitIndex + 1)..<(sm.commitIndex + 1): + sm.output.committed.add(sm.log.getEntryByIndex(i)) + + if sm.votedFor != RaftnodeId(): + sm.output.votedFor = some(sm.votedFor) + + sm.observedState.observe(sm) + let output = sm.output + sm.output = RaftStateMachineOutput() + return output + +func appendEntryReply*(sm: var RaftStateMachine, fromId: RaftNodeId, reply: RaftRpcAppendReply) = + if not sm.state.isLeader: + sm.debug "You can't append append reply to the follower" + return + var follower = sm.findFollowerProggressById(fromId) + 
if not follower.isSome: + sm.debug "Can't find the follower" + return + follower.get().commitIndex = max(follower.get().commitIndex, reply.commitIndex) + case reply.result: + of RaftRpcCode.Accepted: + let lastIndex = reply.accepted.lastNewIndex + sm.debug "Accpeted message from" & $fromId & " last log index: " & $lastIndex + follower.get().accepted(lastIndex) + # TODO: add leader stepping down logic here + if not sm.state.isLeader: + return + of RaftRpcCode.Rejected: + if reply.rejected.nonMatchingIndex == 0 and reply.rejected.lastIdx == 0: + sm.replicateTo(follower.get()) + follower.get().nextIndex = min(reply.rejected.nonMatchingIndex, reply.rejected.lastIdx + 1) + # if commit apply configuration that removes current follower + # we should take it again + var follower2 = sm.findFollowerProggressById(fromId) + if follower2.isSome: + sm.replicateTo(follower2.get()) + +func advanceCommitIdx(sm: var RaftStateMachine, leaderIdx: RaftLogIndex) = + let newIdx = min(leaderIdx, sm.log.lastIndex) + if newIdx > sm.commitIndex: + sm.debug "Commit index is changed. 
Old:" & $sm.commitIndex & " New:" & $newIdx + sm.commitIndex = newIdx + # TODO: signal the output for the update + +func appendEntry*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcAppendRequest) = + if not sm.state.isFollower: + sm.debug "You can't append append request to the non follower" + return + let (match, term) = sm.log.matchTerm(request.previousLogIndex, request.previousTerm) + if not match: + let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: request.previousLogIndex, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + sm.sendTo(fromId, responce) + sm.debug "Reject to apply the entry" + for entry in request.entries: + sm.log.appendAsFollower(entry) + sm.advanceCommitIdx(request.commitIndex) + let accepted = RaftRpcAppendReplyAccepted(lastNewIndex: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Accepted, accepted: accepted) + sm.sendTo(fromId, responce) + +func requestVote*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteRequest) = + let canVote = sm.votedFor == fromId or (sm.votedFor == RaftNodeId() and sm.currentLeader == RaftNodeId()) + if canVote and sm.log.isUpToDate(request.lastLogIndex, request.lastLogTerm): + let responce = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: true) + sm.sendTo(fromId, responce) + else: + let responce: RaftRpcVoteReply = RaftRpcVoteReply(currentTerm: sm.term, voteGranted: false) + sm.sendTo(fromId, responce) + +func requestVoteReply*(sm: var RaftStateMachine, fromId: RaftNodeId, request: RaftRpcVoteReply) = + if not sm.state.isCandidate: + sm.debug "Non candidate can't handle votes" + return + discard sm.candidate.votes.registerVote(fromId, request.voteGranted) + + case sm.candidate.votes.tallyVote: + of RaftElectionResult.Unknown: + return + of RaftElectionResult.Won: + sm.debug "Elecation won" & 
$(sm.candidate.votes) & $sm.myId + sm.becomeLeader() + of RaftElectionResult.Lost: + sm.debug "Lost election" + sm.becomeFollower(RaftNodeId()) + +func advance*(sm: var RaftStateMachine, msg: RaftRpcMessage, now: times.DateTime) = + #sm.debug $msg + if msg.currentTerm > sm.term: + sm.debug "Current node is behind" + var leaderId = RaftnodeId() + if msg.kind == RaftRpcMessageType.AppendRequest: + leaderId = msg.sender + sm.becomeFollower(leaderId) + # TODO: implement pre vote + sm.term = msg.currentTerm + sm.votedFor = RaftnodeId() + elif msg.currentTerm < sm.term: + if msg.kind == RaftRpcMessageType.AppendRequest: + # Instruct leader to step down + let rejected = RaftRpcAppendReplyRejected(nonMatchingIndex: 0, lastIdx: sm.log.lastIndex) + let responce = RaftRpcAppendReply(term: sm.term, commitIndex: sm.commitIndex, result: RaftRpcCode.Rejected, rejected: rejected) + sm.sendTo(msg.sender, responce) + + sm.warning "Ignore message with lower term" + else: + # TODO: add also snapshot + if msg.kind == RaftRpcMessageType.AppendRequest: + if sm.state.isCandidate: + sm.becomeFollower(msg.sender) + elif sm.state.isFollower: + sm.follower.leader = msg.sender + # TODO: fix time + if sm.state.isCandidate: + if msg.kind == RaftRpcMessageType.VoteRequest: + sm.requestVote(msg.sender, msg.voteRequest) + elif msg.kind == RaftRpcMessageType.VoteReply: + sm.debug "Apply vote" + sm.requestVoteReply(msg.sender, msg.voteReply) + else: + sm.warning "Candidate ignore message" + elif sm.state.isFollower: + if msg.sender == sm.follower.leader: + sm.lastElectionTime = now + if msg.kind == RaftRpcMessageType.AppendRequest: + sm.appendEntry(msg.sender, msg.appendRequest) + elif msg.kind == RaftRpcMessageType.VoteRequest: + sm.requestVote(msg.sender, msg.voteRequest) + else: + sm.warning "Follower ignore message" & $msg + # TODO: imelement the rest of the state transitions + elif sm.state.isLeader: + if msg.kind == RaftRpcMessageType.AppendRequest: + sm.warning "Ignore message leader append 
his entries directly" + elif msg.kind == RaftRpcMessageType.AppendReply: + sm.appendEntryReply(msg.sender, msg.appendReply) + elif msg.kind == RaftRpcMessageType.VoteRequest: + sm.requestVote(msg.sender, msg.voteRequest) + else: + sm.warning "Leader ignore message" diff --git a/src/raft/log.nim b/src/raft/log.nim new file mode 100644 index 0000000..55d6830 --- /dev/null +++ b/src/raft/log.nim @@ -0,0 +1,103 @@ +import types +import std/sequtils + +type + RaftLogEntryType* = enum + rletCommand = 0, + rletConfig = 1, + rletEmpty = 2 + Command* = object + data*: seq[byte] + Config* = object + Empty* = object + + LogEntry* = object # Abstarct Raft Node Log entry containing opaque binary data (Blob etc.) + term*: RaftNodeTerm + index*: RaftLogIndex + # TODO: Add configuration too + case kind*: RaftLogEntryType: + of rletCommand: command*: Command + of rletConfig: config*: Config + of rletEmpty: empty*: bool + + RaftLog* = object + logEntries: seq[LogEntry] + firstIndex: RaftLogIndex + +func initRaftLog*(firstIndex: RaftLogIndex): RaftLog = + var log = RaftLog() + assert firstIndex > 0 + log.firstIndex = firstIndex + return log + +func lastTerm*(rf: RaftLog): RaftNodeTerm = + # Not sure if it's ok, maybe we should return optional value + let size = rf.logEntries.len + if size == 0: + return 0 + return rf.logEntries[size - 1].term + +func entriesCount*(rf: RaftLog): int = + return rf.logEntries.len + +func lastIndex*(rf: RaftLog): RaftNodeTerm = + return rf.logEntries.len + rf.firstIndex - 1 + +func nextIndex*(rf: RaftLog): int = + return rf.lastIndex + 1 + +func truncateUncomitted*(rf: var RaftLog, index: RaftLogIndex) = + # TODO: We should add support for configurations and snapshots + if rf.logEntries.len == 0: + return + rf.logEntries.delete((index - rf.firstIndex).. 
rf.lastTerm or (term == rf.lastTerm and index >= rf.lastIndex) + +func getEntryByIndex*(rf: RaftLog, index: RaftLogIndex): LogEntry = + return rf.logEntries[index - rf.firstIndex] + +func appendAsLeader*(rf: var RaftLog, entry: LogEntry) = + rf.logEntries.add(entry) + +func appendAsFollower*(rf: var RaftLog, entry: LogEntry) = + assert entry.index > 0 + let currentIdx = rf.lastIndex + if entry.index <= currentIdx: + # TODO: The indexing hold only if we keep all entries in memory + # we should change it when we add support for snapshots + if entry.index >= rf.firstIndex or entry.term != rf.getEntryByIndex(entry.index).term: + rf.truncateUncomitted(entry.index) + rf.logEntries.add(entry) + +func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) = + rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletCommand, command: data)) + +func appendAsLeader*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, empty: bool) = + rf.appendAsLeader(LogEntry(term: term, index: index, kind: rletEmpty, empty: true)) + +func appendAsFollower*(rf: var RaftLog, term: RaftNodeTerm, index: RaftLogIndex, data: Command) = + rf.appendAsFollower(LogEntry(term: term, index: index, kind: rletCommand, command: data)) + + +func matchTerm*(rf: RaftLog, index: RaftLogIndex, term: RaftNodeTerm): (bool, RaftNodeTerm) = + if len(rf.logEntries) == 0: + return (true, 0) + # TODO: We should add support for snapshots + if index > len(rf.logEntries): + # The follower doesn't have all etries + return (false, 0) + + let i = index - rf.firstIndex + if rf.logEntries[i].term == term: + return (true, 0) + else: + return (false, rf.logEntries[i].term) + +func termForIndex*(rf: RaftLog, index: RaftLogIndex): Option[RaftNodeTerm] = + # TODO: snapshot support + assert rf.logEntries.len > index - rf.firstIndex, $rf.logEntries.len & " " & $index & "" & $rf + if rf.logEntries.len > 0 and index >= rf.firstIndex: + return some(rf.logEntries[index - 
rf.firstIndex].term) + return none(RaftNodeTerm) diff --git a/src/raft/state.nim b/src/raft/state.nim new file mode 100644 index 0000000..af8c16c --- /dev/null +++ b/src/raft/state.nim @@ -0,0 +1,57 @@ + +import types +import tracker + +import std/[times] +type + RaftNodeState* = enum + rnsFollower = 0, # Follower state + rnsCandidate = 1 # Candidate state + rnsLeader = 2 # Leader state + + RaftStateMachineState* = object + case state*: RaftNodeState + of rnsFollower: follower: FollowerState + of rnsCandidate: candidate: CandidateState + of rnsLeader: leader: LeaderState + + LeaderState* = object + tracker*: RaftTracker + + CandidateState* = object + votes*: RaftVotes + + FollowerState* = object + leader*: RaftNodeId + +func `$`*(s: RaftStateMachineState): string = + return $s.state + +func initLeader*(cfg: RaftConfig, index: RaftLogIndex, now: times.DateTime): RaftStateMachineState = + var state = RaftStateMachineState(state: RaftnodeState.rnsLeader, leader: LeaderState()) + state.leader.tracker = initTracker(cfg, index, now) + return state + +func initFollower*(leaderId: RaftNodeId): RaftStateMachineState = + return RaftStateMachineState(state: RaftNodeState.rnsFollower, follower: FollowerState(leader: leaderId)) + +func initCandidate*(cfg: RaftConfig): RaftStateMachineState = + return RaftStateMachineState(state: RaftnodeState.rnsCandidate, candidate: CandidateState(votes: initVotes(cfg))) + +func isLeader*(s: RaftStateMachineState): bool = + return s.state == RaftNodeState.rnsLeader + +func isFollower*(s: RaftStateMachineState): bool = + return s.state == RaftNodeState.rnsFollower + +func isCandidate*(s: RaftStateMachineState): bool = + return s.state == RaftNodeState.rnsCandidate + +func leader*(s: var RaftStateMachineState): var LeaderState = + return s.leader + +func follower*(s: var RaftStateMachineState): var FollowerState = + return s.follower + +func candidate*(s: var RaftStateMachineState): var CandidateState = + return s.candidate diff --git 
a/src/raft/tracker.nim b/src/raft/tracker.nim new file mode 100644 index 0000000..e4b8715 --- /dev/null +++ b/src/raft/tracker.nim @@ -0,0 +1,97 @@ +import types +import std/[times] + +type + RaftElectionResult* = enum + Unknown = 0, + Won = 1, + Lost = 2 + + RaftElectionTracker* = object + all: seq[RaftNodeId] + responded: seq[RaftNodeId] + granted: int + + RaftVotes* = object + voters*: seq[RaftNodeId] + current: RaftElectionTracker + + RaftFollowerProgress = seq[RaftFollowerProgressTracker] + + RaftTracker* = object + progress*: RaftFollowerProgress + current: seq[RaftNodeId] + + RaftFollowerProgressTracker* = ref object + id*: RaftNodeId + nextIndex*: RaftLogIndex + # Index of the highest log entry known to be replicated to this server. + matchIndex*: RaftLogIndex + commitIndex*: RaftLogIndex + replayedIndex: RaftLogIndex + lastMessageAt*: times.DateTime + + +func initElectionTracker*(nodes: seq[RaftNodeId]): RaftElectionTracker = + var r = RaftElectionTracker() + r.all = nodes + r.granted = 0 + return r + +func registerVote*(ret: var RaftElectionTracker, nodeId: RaftNodeId, granted: bool): bool = + if not ret.all.contains nodeId: + return false + + if not ret.responded.contains nodeId: + ret.responded.add(nodeId) + if granted: + ret.granted += 1 + + return true + +func tallyVote*(ret: var RaftElectionTracker): RaftElectionResult = + let quorym = int(len(ret.all) / 2) + 1 + if ret.granted >= quorym: + return RaftElectionResult.Won + let unkown = len(ret.all) - len(ret.responded) + if ret.granted + unkown >= quorym: + return RaftElectionResult.Unknown + else: + return RaftElectionResult.Lost + +func initVotes*(nodes: seq[RaftNodeId]): RaftVotes = + var r = RaftVotes(voters: nodes, current: initElectionTracker(nodes)) + return r + +func initVotes*(config: RaftConfig): RaftVotes = + var r = RaftVotes(voters: config.currentSet, current: initElectionTracker(config.currentSet)) + return r + +func registerVote*(rv: var RaftVotes, nodeId: RaftNodeId, granted: bool): 
bool = + # TODO: Add support for configuration + return rv.current.registerVote(nodeId, granted) + +func tallyVote*(rv: var RaftVotes): RaftElectionResult = + # TODO: Add support for configuration + return rv.current.tallyVote() + +func find*(ls: RaftTracker, id: RaftnodeId): Option[RaftFollowerProgressTracker] = + for follower in ls.progress: + if follower.id == id: + return some(follower) + return none(RaftFollowerProgressTracker) + +func initFollowerProgressTracker*(follower: RaftNodeId, nextIndex: RaftLogIndex, now: times.DateTime): RaftFollowerProgressTracker = + return RaftFollowerProgressTracker(id: follower, nextIndex: nextIndex, matchIndex: 0, commitIndex: 0, replayedIndex: 0, lastMessageAt: now) + +func initTracker*(config: RaftConfig, nextIndex: RaftLogIndex, now: times.DateTime): RaftTracker = + var tracker = RaftTracker() + + for node in config.currentSet: + tracker.progress.add(initFollowerProgressTracker(node, nextIndex, now)) + tracker.current.add(node) + return tracker + +func accepted*(fpt: var RaftFollowerProgressTracker, index: RaftLogIndex)= + fpt.matchIndex = max(fpt.matchIndex, index) + fpt.nextIndex = max(fpt.nextIndex, index) \ No newline at end of file diff --git a/src/raft/types.nim b/src/raft/types.nim new file mode 100644 index 0000000..f0d737d --- /dev/null +++ b/src/raft/types.nim @@ -0,0 +1,34 @@ +# nim-raft +# Copyright (c) 2023 Status Research & Development GmbH +# Licensed under either of +# * Apache License, version 2.0, ([LICENSE-APACHE](LICENSE-APACHE)) +# * MIT license ([LICENSE-MIT](LICENSE-MIT)) +# at your option. +# This file may not be copied, modified, or distributed except according to +# those terms. 
+ +# Raft Node Public Types + +import std/rlocks +import options +import stew/results +import uuids +import chronos + +export + results, + options, + rlocks, + uuids, + chronos + +const + DefaultUUID* = initUUID(0, 0) # 00000000-0000-0000-0000-000000000000 + +type + RaftNodeId* = UUID # uuid4 uniquely identifying every Raft Node + RaftNodeTerm* = int # Raft Node Term Type + RaftLogIndex* = int # Raft Node Log Index Type + RaftSnapshotId* = int + RaftConfig* = object + currentSet*: seq[RaftNodeId] \ No newline at end of file diff --git a/tests/all_tests.nim b/tests/all_tests.nim index 98cb173..fb2a83b 100644 --- a/tests/all_tests.nim +++ b/tests/all_tests.nim @@ -6,3 +6,7 @@ # at your option. # This file may not be copied, modified, or distributed except according to # those terms. + + +import test_consensus_state_machine +export test_consensus_state_machine \ No newline at end of file diff --git a/tests/test_bls_cluester.nim b/tests/test_bls_cluester.nim new file mode 100644 index 0000000..7f34729 --- /dev/null +++ b/tests/test_bls_cluester.nim @@ -0,0 +1,345 @@ +import ../src/raft/types +import ../src/raft/consensus_state_machine +import ../src/raft/log +import ../src/raft/tracker +import ../src/raft/state + +import std/[times, sequtils, random] +import std/sugar +import std/sets +import std/json +import std/jsonutils +import std/options +import std/strutils +import stew/endians2 +import stew/byteutils +import std/algorithm + +import blscurve +import tables + +import unittest2 + +type + UserStateMachine = object + + Message* = object + fieldInt: int + + Hash = int + + UserState* = object + lastCommitedMsg: Message + + SignedLogEntry = object + hash: Hash + logIndex: RaftLogIndex + signature: SignedShare + + BLSTestNode* = ref object + stm: RaftStateMachine + keyShare: SecretShare + us: UserState + blockCommunication: bool + debugLogs: seq[DebugLogEntry] + messageSignatures: Table[Hash, seq[SignedShare]] + signEntries: seq[SignedLogEntry] + clusterPublicKey: 
PublicKey + + BLSTestCluster* = object + nodes*: Table[RaftnodeId, BLSTestNode] + delayer*: MessageDelayer + + SecretShare = object + secret: SecretKey + id: ID + + DelayedMessage* = object + msg: SignedRpcMessage + executeAt: times.DateTime + + MessageDelayer* = object + messages: seq[DelayedMessage] + randomGenerator: Rand + meanDelay: float + stdDelay: float + minDelayMs: int + + SignedShare = object + sign: Signature + pubkey: PublicKey + id: ID + + SignedRpcMessage* = object + raftMsg: RaftRpcMessage + signEntries: seq[SignedLogEntry] + +var secretKey = "1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2" + +var test_ids_3 = @[ + RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")), + RaftnodeId(parseUUID("2a98fc33-6559-44c0-b130-fc3e9df80a69")), + RaftnodeId(parseUUID("9156756d-697f-4ffa-9b82-0c86720344bd")) +] + +var test_ids_1 = @[ + RaftnodeId(parseUUID("a8409b39-f17b-4682-aaef-a19cc9f356fb")), +] + + +proc initDelayer(mean: float, std: float, minInMs: int, generator: Rand): MessageDelayer = + var delayer = MessageDelayer() + delayer.meanDelay = mean + delayer.stdDelay = std + delayer.minDelayMs = minInMs + delayer.randomGenerator = generator + return delayer + +proc getMessages(delayer: var MessageDelayer, now: times.DateTime): seq[SignedRpcMessage] = + result = delayer.messages.filter(m => m.executeAt <= now).map(m => m.msg) + delayer.messages = delayer.messages.filter(m => m.executeAt > now) + return result + +proc add(delayer: var MessageDelayer, message: SignedRpcMessage, now: times.DateTime) = + let rndDelay = delayer.randomGenerator.gauss(delayer.meanDelay, delayer.stdDelay) + let at = now + times.initDuration(milliseconds = delayer.minDelayMs + rndDelay.int) + delayer.messages.add(DelayedMessage(msg: message, executeAt: at)) + + +proc signs(shares: openArray[SignedShare]): seq[Signature] = + shares.mapIt(it.sign) + +proc ids(shares: openArray[SignedShare]): seq[ID] = + shares.mapIt(it.id) + +func createConfigFromIds*(ids: 
seq[RaftnodeId]): RaftConfig = + var config = RaftConfig() + for id in ids: + config.currentSet.add(id) + return config + +proc toString(bytes: openarray[byte]): string = + result = newString(bytes.len) + copyMem(result[0].addr, bytes[0].unsafeAddr, bytes.len) + +proc toCommand(msg: Message): Command = + var msgJson = $(msg.toJson) + return Command(data: msgJson.toBytes) + +proc toMessage(cmd: Command): Message = + return to(parseJson(cmd.data.toString), Message) + +proc toBytes(msg: Message): seq[byte] = + var msgJson = $(msg.toJson) + return msgJson.toBytes + +proc toBytes(msg: RaftRpcMessage): seq[byte] = + var msgJson = $(msg.toJson) + return msgJson.toBytes + +proc cmpLogs*(x, y: DebugLogEntry): int = + cmp(x.time, y.time) + +func `$`*(de: DebugLogEntry): string = + return "[" & $de.level & "][" & de.time.format("HH:mm:ss:fff") & "][" & (($de.nodeId)[0..7]) & "...][" & $de.state & "]: " & de.msg + + +proc sign(node: BLSTestNode, msg: Message): SignedShare = + var pk: PublicKey + discard pk.publicFromSecret(node.keyShare.secret) + echo "Produce signature from node: " & $node.stm.myId & " with public key: " & $pk.toHex & "over msg " & $msg.toJson + return SignedShare( + sign: node.keyShare.secret.sign(msg.toBytes), + pubkey: pk, + id: node.keyShare.id, + ) + +proc pollMessages(node: BLSTestNode): seq[SignedRpcMessage] = + var output = node.stm.poll() + var debugLogs = output.debugLogs + var msgs: seq[SignedRpcMessage] + var pk: PublicKey + discard pk.publicFromSecret(node.keyShare.secret) + for msg in output.messages: + if msg.kind == RaftRpcMessageType.AppendReply: + msgs.add(SignedRpcMessage( + raftMsg: msg, + signEntries: node.signEntries + )) + let commitIndex = msg.appendReply.commitIndex + # remove the signature of all entries that are already commited + node.signEntries = node.signEntries.filter(x => x.logIndex > commitIndex) + else: + msgs.add(SignedRpcMessage( + raftMsg: msg, + signEntries: @[] + )) + if node.stm.state.isLeader: + for commitedMsg in 
output.committed: + if commitedMsg.kind != rletCommand: + continue + var orgMsg = commitedMsg.command.toMessage + var shares = node.messageSignatures[orgMsg.fieldInt] + echo "Try to recover message" & $orgMsg.toBytes + echo "Shares: " & $shares.signs + var recoveredSignature = recover(shares.signs, shares.ids).expect("valid shares") + if not node.clusterPublicKey.verify(orgMsg.toBytes, recoveredSignature): + node.us.lastCommitedMsg = orgMsg + echo "State succesfuly changed" + else: + echo "Failed to reconstruct signature" + + debugLogs.sort(cmpLogs) + for msg in debugLogs: + if msg.level <= DebugLogLevel.Debug: + echo $msg + return msgs + +proc acceptMessage(node: var BLSTestNode, msg: SignedRpcMessage, now: times.DateTime) = + if msg.raftMsg.kind == RaftRpcMessageType.AppendRequest and node.stm.state.isFollower: + var pk: PublicKey + discard pk.publicFromSecret(node.keyShare.secret) + for entry in msg.raftMsg.appendRequest.entries: + if entry.kind == rletEmpty: + continue + var orgMsg = entry.command.toMessage + var share = SignedLogEntry( + hash: orgMsg.fieldInt, + logIndex: msg.raftMsg.appendRequest.previousLogIndex + 1, + signature: node.sign(orgMsg) + ) + node.signEntries.add(share) + node.stm.advance(msg.raftMsg, now) + +proc tick(node: BLSTestNode, now: times.DateTime) = + node.stm.tick(now) + +proc keyGen(seed: uint64): tuple[pubkey: PublicKey, seckey: SecretKey] = + var ikm: array[32, byte] + ikm[0 ..< 8] = seed.toBytesLE + let ok = ikm.keyGen(result.pubkey, result.seckey) + doAssert ok + +proc blsIdFromUint32(x: uint32) : ID = + var a: array[8, uint32] = [uint32 0, 0, 0, 0, 0, 0, 0, x] + ID.fromUint32(a) + +proc generateSecretShares(sk: SecretKey, k: int, n: int): seq[SecretShare] = + doAssert k <= n + var originPts: seq[SecretKey] + originPts.add(sk) + for i in 1 ..< k: + originPts.add(keyGen(uint64(42 + i)).seckey) + + for i in uint32(0) ..< uint32(n): + # id must not be zero + let id = blsIdFromUint32(i + 1) + let secret = genSecretShare(originPts, id) 
+ result.add(SecretShare(secret: secret, id: id)) + +proc createBLSCluster(ids: seq[RaftnodeId], now: times.DateTime, k: int, n: int, delayer: MessageDelayer) : BLSTestCluster = + var sk: SecretKey + discard sk.fromHex("1b500388741efd98239a9b3a689721a89a92e8b209aabb10fb7dc3f844976dc2") + + var pk: PublicKey + discard pk.publicFromSecret(sk) + + var blsShares = generateSecretShares(sk, k, n) + + var config = createConfigFromIds(ids) + var cluster = BLSTestCluster() + cluster.delayer = delayer + cluster.nodes = initTable[RaftnodeId, BLSTestNode]() + + + for i in 0..