Compare commits

master...v0.5.0

No commits in common. "master" and "v0.5.0" have entirely different histories.

40 changed files with 1243 additions and 575 deletions


@ -0,0 +1,42 @@
name: Install Nimble
description: Install Nimble from a release tarball
inputs:
nimble_version:
description: "install nimble"
# TODO: make sure to change to tagged release when available
default: "latest"
os:
description: "operating system"
default: "linux"
cpu:
description: "cpu architecture"
default: "amd64"
runs:
using: "composite"
steps:
- uses: actions/checkout@v3
- name: Download Nimble
shell: bash
run: |
set -x
mkdir -p .nimble
cd .nimble
if [[ '${{ inputs.cpu }}' == 'amd64' ]]; then
CPU=x64
elif [[ '${{ inputs.cpu }}' == 'i386' ]]; then
CPU=x32
else
CPU=${{ inputs.cpu }}
fi
if [[ '${{ inputs.os }}' == 'macos' ]]; then
OS=macosx
else
OS='${{ inputs.os }}'
fi
URL=https://github.com/nim-lang/nimble/releases/download/${{ inputs.nimble_version }}/nimble-"$OS"_"$CPU".tar.gz
curl -o nimble.tar.gz -L -s -S "$URL"
tar -xvf nimble.tar.gz
- name: Add Nimble to PATH
shell: bash
run: echo '${{ github.workspace }}/.nimble/' >> $GITHUB_PATH


@ -1,22 +1,134 @@
name: CI
on: [push, pull_request]
on:
push:
branches:
- master
pull_request:
workflow_dispatch:
jobs:
test:
runs-on: ${{ matrix.os }}
build:
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
nim: [2.2.4]
os: [ubuntu-latest, macos-latest, windows-latest]
target:
- os: linux
cpu: amd64
tests: all
- os: macos
cpu: amd64
tests: all
- os: windows
cpu: amd64
tests: part1
- os: windows
cpu: amd64
tests: part2
branch: [version-1-6]
include:
- target:
os: linux
builder: ubuntu-20.04
shell: bash
- target:
os: macos
builder: macos-12
shell: bash
- target:
os: windows
builder: windows-latest
shell: msys2 {0}
defaults:
run:
shell: ${{ matrix.shell }}
name: '${{ matrix.target.os }}-${{ matrix.target.cpu }} (Nim ${{ matrix.branch }})'
runs-on: ${{ matrix.builder }}
continue-on-error: ${{ matrix.branch == 'version-1-6' || matrix.branch == 'devel' }}
steps:
- name: Checkout
uses: actions/checkout@v4
- uses: jiro4989/setup-nim-action@v2
with:
nim-version: ${{matrix.nim}}
repo-token: ${{ secrets.GITHUB_TOKEN }}
- name: Build
run: nimble install -y
- name: Test
run: nimble test -y
- name: Checkout
uses: actions/checkout@v2
with:
submodules: true
- name: MSYS2 (Windows amd64)
if: runner.os == 'Windows' && matrix.target.cpu == 'amd64'
uses: msys2/setup-msys2@v2
with:
path-type: inherit
install: >-
base-devel
git
mingw-w64-x86_64-toolchain
- name: Restore Nim DLLs dependencies (Windows) from cache
if: runner.os == 'Windows'
id: windows-dlls-cache
uses: actions/cache@v2
with:
path: external/dlls
key: 'dlls'
- name: Install DLL dependencies (Windows)
if: >
steps.windows-dlls-cache.outputs.cache-hit != 'true' &&
runner.os == 'Windows'
run: |
mkdir external
curl -L "https://nim-lang.org/download/windeps.zip" -o external/windeps.zip
7z x external/windeps.zip -oexternal/dlls
- name: Path to cached dependencies (Windows)
if: >
runner.os == 'Windows'
run: |
echo '${{ github.workspace }}'"/external/dlls" >> $GITHUB_PATH
## Restore nimble deps
- name: Restore nimble dependencies from cache
id: nimble_deps
uses: actions/cache@v3
with:
path: |
~/.nimble
${{ github.workspace }}/.nimble
key: ${{ matrix.builder }}-${{ matrix.target.cpu }}-dotnimble-${{ hashFiles('nimble.lock') }}
- name: Setup Nimble
uses: "./.github/actions/install_nimble"
with:
os: ${{ matrix.target.os }}
cpu: ${{ matrix.target.cpu }}
- name: Setup Env
run: |
nimble -v
- name: Setup Deps
run: |
nimble install -d
nimble setup
- name: Run tests
if: runner.os != 'Windows'
run: |
nimble test -y
- name: Run windows tests part1
if: runner.os == 'Windows' && matrix.target.tests == 'part1'
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
# https://github.com/status-im/nimbus-eth2/issues/3121
export NIMFLAGS="-d:nimRawSetjmp"
fi
nimble testPart1 -y
- name: Run windows tests part2
if: runner.os == 'Windows' && matrix.target.tests == 'part2'
run: |
if [[ "${{ matrix.target.os }}" == "windows" ]]; then
export NIMFLAGS="-d:nimRawSetjmp"
fi
nimble testPart2 -y

.github/workflows/codecov.yml (new file)

@ -0,0 +1,69 @@
name: Generate and upload code coverage
on:
# On push to common branches, this computes the "base stats" for PRs
push:
branches:
- master
pull_request:
workflow_dispatch:
jobs:
All_Tests:
name: All tests
runs-on: ubuntu-20.04
strategy:
matrix:
nim-options: [
""
]
test-program: [
"test"
]
steps:
- uses: actions/checkout@v2
with:
fetch-depth: 0
- name: Environment setup
run: |
sudo apt-get update
sudo apt-get install -y lcov build-essential git curl
mkdir coverage
- name: Restore nimble dependencies from cache
id: nimble_deps
uses: actions/cache@v3
with:
path: |
~/.nimble
key: ubuntu-20.04-amd64-${{ hashFiles('nimble.lock') }}
- name: Setup Nimble
uses: "./.github/actions/install_nimble"
with:
os: linux
cpu: x64
- name: Setup Env
run: |
nimble -v
- name: Setup Deps
run: |
nimble install -d
nimble setup
- name: Run tests
run: |
nimble -y --verbose coverage
- name: Upload coverage to Codecov
uses: codecov/codecov-action@v2
with:
directory: ./coverage/
fail_ci_if_error: true
files: ./coverage/coverage.f.info
flags: unittests
name: codecov-umbrella
verbose: true

.gitignore

@ -13,4 +13,3 @@ NimBinaries
.update.timestamp
*.dSYM
.vscode/*
nimbledeps


@ -1,12 +1,12 @@
# A DHT implementation for Logos Storage
# A DHT implementation for Codex
[![License: Apache](https://img.shields.io/badge/License-Apache%202.0-blue.svg)](https://opensource.org/licenses/Apache-2.0)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](https://opensource.org/licenses/MIT)
[![Stability: experimental](https://img.shields.io/badge/stability-experimental-orange.svg)](#stability)
[![CI (GitHub Actions)](https://github.com/logos-storage/logos-storage-nim-dht/workflows/CI/badge.svg?branch=master)](https://github.com/logos-storage/logos-storage-nim-dht/actions/workflows/ci.yml?query=workflow%3ACI+branch%3Amaster)
[![codecov](https://codecov.io/gh/logos-storage/logos-storage-nim-dht/branch/master/graph/badge.svg?token=tlmMJgU4l7)](https://codecov.io/gh/logos-storage/logos-storage-nim-dht)
[![CI (GitHub Actions)](https://github.com/codex-storage/nim-codex-dht/workflows/CI/badge.svg?branch=master)](https://github.com/codex-storage/nim-codex-dht/actions/workflows/ci.yml?query=workflow%3ACI+branch%3Amaster)
[![codecov](https://codecov.io/gh/codex-storage/nim-codex-dht/branch/master/graph/badge.svg?token=tlmMJgU4l7)](https://codecov.io/gh/codex-storage/nim-codex-dht)
This DHT implementation is aiming to provide a DHT for Logos Storage with the following properties
This DHT implementation is aiming to provide a DHT for Codex with the following properties
* flexible secure transport usage with
* fast UDP based operation
* eventual fallback to TCP-based operation (maybe though libp2p)

atlas.lock (new file)

@ -0,0 +1,230 @@
{
"items": {
"zlib": {
"dir": "$deps/nim-zlib",
"url": "https://github.com/status-im/nim-zlib",
"commit": "a2f44bb7f65571a894227ff6fde9298a104e03a5",
"version": "0.1.0"
},
"stew": {
"dir": "$deps/nim-stew",
"url": "https://github.com/status-im/nim-stew",
"commit": "2c2544aec13536304438be045bfdd22452741466",
"version": "0.1.0"
},
"httputils": {
"dir": "$deps/nim-http-utils",
"url": "https://github.com/status-im/nim-http-utils",
"commit": "77a59297ad635d91a6352ef48ee09c6bde0c5d86",
"version": "0.3.0"
},
"chronos": {
"dir": "$deps/nim-chronos",
"url": "https://github.com/status-im/nim-chronos",
"commit": "e15dc3b41fea95348b58f32244962c1c6df310a7",
"version": "3.2.0"
},
"upraises": {
"dir": "$deps/upraises",
"url": "https://github.com/markspanbroek/upraises",
"commit": "bc2628989b63854d980e92dadbd58f83e34b6f25",
"version": "0.1.0"
},
"sqlite3_abi": {
"dir": "$deps/nim-sqlite3-abi",
"url": "https://github.com/arnetheduck/nim-sqlite3-abi",
"commit": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3",
"version": "3.40.1.1"
},
"questionable": {
"dir": "$deps/questionable",
"url": "https://github.com/codex-storage/questionable",
"commit": "1f0afff48bf80ab1149a0957f9743f345bc14b71",
"version": "0.10.12"
},
"websock": {
"dir": "$deps/nim-websock",
"url": "https://github.com/status-im/nim-websock",
"commit": "f8ed9b40a5ff27ad02a3c237c4905b0924e3f982",
"version": "0.1.0"
},
"secp256k1": {
"dir": "$deps/nim-secp256k1",
"url": "https://github.com/status-im/nim-secp256k1",
"commit": "7246d91c667f4cc3759fdd50339caa45a2ecd8be",
"version": "0.6.0.3.2"
},
"bearssl": {
"dir": "$deps/nim-bearssl",
"url": "https://github.com/status-im/nim-bearssl",
"commit": "d55d3a86d7ec3ad11b244e17b3bad490bfbd076d",
"version": "0.2.1"
},
"dnsclient": {
"dir": "$deps/dnsclient.nim",
"url": "https://github.com/ba0f3/dnsclient.nim",
"commit": "23214235d4784d24aceed99bbfe153379ea557c8",
"version": "0.3.4"
},
"nimcrypto": {
"dir": "$deps/nimcrypto",
"url": "https://github.com/status-im/nimcrypto",
"commit": "24e006df85927f64916e60511620583b11403178",
"version": "0.5.4"
},
"results": {
"dir": "$deps/nim-results",
"url": "https://github.com/arnetheduck/nim-results",
"commit": "113d433f48894ee8e7da3e340c8fe19ad7b9db4d",
"version": "0.4.0"
},
"json_serialization": {
"dir": "$deps/nim-json-serialization",
"url": "https://github.com/status-im/nim-json-serialization",
"commit": "3f1ce24ee116daedbc9c8be525e63ec03e185a28",
"version": "0.2.2"
},
"testutils": {
"dir": "$deps/nim-testutils",
"url": "https://github.com/status-im/nim-testutils",
"commit": "b56a5953e37fc5117bd6ea6dfa18418c5e112815",
"version": "0.5.0"
},
"unittest2": {
"dir": "$deps/nim-unittest2",
"url": "https://github.com/status-im/nim-unittest2",
"commit": "262b697f38d6b6f1e7462d3b3ab81d79b894e336",
"version": "0.2.1"
},
"npeg": {
"dir": "$deps/npeg",
"url": "https://github.com/zevv/npeg",
"commit": "b9051a64376d277912ec7616bbb21688754160c7",
"version": "1.2.1"
},
"serialization": {
"dir": "$deps/nim-serialization",
"url": "https://github.com/status-im/nim-serialization",
"commit": "4d541ec43454809904fc4c3c0a7436410ad597d2",
"version": "0.2.2"
},
"faststreams": {
"dir": "$deps/nim-faststreams",
"url": "https://github.com/status-im/nim-faststreams",
"commit": "720fc5e5c8e428d9d0af618e1e27c44b42350309",
"version": "0.3.0"
},
"datastore": {
"dir": "$deps/nim-datastore",
"url": "https://github.com/codex-storage/nim-datastore",
"commit": "cadf38db576a2cf6145188f285f042e56aad7c91",
"version": "0.0.1"
},
"asynctest": {
"dir": "$deps/asynctest",
"url": "https://github.com/markspanbroek/asynctest",
"commit": "c9423b198f1a7fad128396ffaaa92077a1dda869",
"version": "0.4.3"
},
"stint": {
"dir": "$deps/nim-stint",
"url": "https://github.com/status-im/nim-stint",
"commit": "711cda4456c32d3ba3c6c4524135b3453dffeb9c",
"version": "2.0.0"
},
"metrics": {
"dir": "$deps/nim-metrics",
"url": "https://github.com/status-im/nim-metrics",
"commit": "51f1227d0fd04ce84b1ef784b11280cb7875348c",
"version": "0.0.1"
},
"libp2p": {
"dir": "$deps/nim-libp2p",
"url": "https://github.com/status-im/nim-libp2p",
"commit": "e3c967ad1939fb33b8e13759037d193734acd202",
"version": "1.1.0"
},
"chronicles": {
"dir": "$deps/nim-chronicles",
"url": "https://github.com/status-im/nim-chronicles",
"commit": "ccbb7566d1a06bfc1ec42dd8da74a47f1d3b3f4b",
"version": "0.10.3"
},
"protobuf_serialization": {
"dir": "$deps/nim-protobuf-serialization",
"url": "https://github.com/status-im/nim-protobuf-serialization",
"commit": "5a31137a82c2b6a989c9ed979bb636c7a49f570e",
"version": "0.3.0"
}
},
"nimcfg": [
"############# begin Atlas config section ##########",
"--noNimblePath",
"--path:\"vendor/nim-unittest2\"",
"--path:\"vendor/nim-secp256k1\"",
"--path:\"vendor/nim-protobuf-serialization\"",
"--path:\"vendor/nimcrypto\"",
"--path:\"vendor/nim-bearssl\"",
"--path:\"vendor/nim-chronicles\"",
"--path:\"vendor/nim-chronos\"",
"--path:\"vendor/nim-libp2p\"",
"--path:\"vendor/nim-metrics\"",
"--path:\"vendor/nim-stew\"",
"--path:\"vendor/nim-stint\"",
"--path:\"vendor/asynctest\"",
"--path:\"vendor/nim-datastore\"",
"--path:\"vendor/questionable\"",
"--path:\"vendor/nim-faststreams\"",
"--path:\"vendor/nim-serialization\"",
"--path:\"vendor/npeg/src\"",
"--path:\"vendor/nim-testutils\"",
"--path:\"vendor/nim-json-serialization\"",
"--path:\"vendor/nim-results\"",
"--path:\"vendor/nim-http-utils\"",
"--path:\"vendor/dnsclient.nim/src\"",
"--path:\"vendor/nim-websock\"",
"--path:\"vendor/nim-sqlite3-abi\"",
"--path:\"vendor/upraises\"",
"--path:\"vendor/nim-zlib\"",
"############# end Atlas config section ##########",
""
],
"nimbleFile": {
"filename": "codexdht.nimble",
"content": [
"# Package",
"",
"version = \"0.4.0\"",
"author = \"Status Research & Development GmbH\"",
"description = \"DHT based on Eth discv5 implementation\"",
"license = \"MIT\"",
"skipDirs = @[\"tests\"]",
"installFiles = @[\"build.nims\"]",
"",
"# Dependencies",
"requires \"nim >= 1.6.18\"",
"requires \"unittest2 <= 0.0.9\"",
"requires \"secp256k1#2acbbdcc0e63002a013fff49f015708522875832\" # >= 0.5.2 & < 0.6.0",
"requires \"protobuf_serialization\" # >= 0.2.0 & < 0.3.0",
"requires \"nimcrypto >= 0.5.4\"",
"requires \"bearssl#head\"",
"requires \"chronicles >= 0.10.2 & < 0.11.0\"",
"requires \"chronos#e15dc3b41fea95348b58f32244962c1c6df310a7\" # Change to >= 4.0.0 & < 5.0.0 when available",
"requires \"libp2p#unstable\"",
"requires \"metrics\"",
"requires \"stew#head\"",
"requires \"stint\"",
"requires \"asynctest >= 0.4.3 & < 0.5.0\"",
"requires \"https://github.com/codex-storage/nim-datastore#head\"",
"requires \"questionable\"",
"",
"include \"build.nims\"",
" "
]
},
"hostOS": "linux",
"hostCPU": "amd64",
"nimVersion": "1.6.18 a749a8b742bd0a4272c26a65517275db4720e58a",
"gccVersion": "11.4.0",
"clangVersion": ""
}


@ -1,32 +1,23 @@
import std / [os, strutils, sequtils]
task testAll, "Run DHT tests":
exec "nim c -r test.nim"
rmFile "./test"
task compileParallelTests, "Compile parallel tests":
exec "nim c --hints:off --verbosity:0 dht/test_providers.nim"
exec "nim c --hints:off --verbosity:0 dht/test_providermngr.nim"
exec "nim c --hints:off --verbosity:0 discv5/test_discoveryv5.nim"
exec "nim c --hints:off --verbosity:0 discv5/test_discoveryv5_encoding.nim"
exec "nim c -r tests/testAll.nim"
rmFile "./tests/testAll"
task test, "Run DHT tests":
# compile with trace logging to make sure it doesn't crash
exec "nim c -d:testsAll -d:chronicles_enabled=on -d:chronicles_log_level=TRACE test.nim"
rmFile "./test"
compileParallelTestsTask()
exec "nim c -r -d:testsAll --verbosity:0 testAllParallel.nim"
rmFile "./testAllParallel"
exec "nim c -d:testsAll -d:chronicles_enabled=on -d:chronicles_log_level=TRACE tests/testAll.nim"
rmFile "./tests/testAll"
exec "nim c -r -d:testsAll --verbosity:0 tests/testAllParallel.nim"
rmFile "./tests/testAllParallel"
task testPart1, "Run DHT tests A":
compileParallelTestsTask()
exec "nim c -r -d:testsPart1 testAllParallel.nim"
rmFile "./testAllParallel"
exec "nim c -r -d:testsPart1 tests/testAllParallel.nim"
rmFile "./tests/testAllParallel"
task testPart2, "Run DHT tests B":
compileParallelTestsTask()
exec "nim c -r -d:testsPart2 testAllParallel.nim"
rmFile "./testAllParallel"
exec "nim c -r -d:testsPart2 tests/testAllParallel.nim"
rmFile "./tests/testAllParallel"
task coverage, "Generates code coverage report":
var (output, exitCode) = gorgeEx("which lcov")
@ -59,7 +50,7 @@ task coverage, "generates code coverage report":
if f.endswith(".nim"): nimSrcs.add " " & f.absolutePath.quoteShell()
echo "======== Running Tests ======== "
exec("nim c -r coverage.nim")
exec("nim c -r tests/coverage.nim")
exec("rm nimcache/*.c")
rmDir("coverage"); mkDir("coverage")
echo " ======== Running LCOV ======== "

codecov.yml (new file)

@ -0,0 +1,22 @@
coverage:
status:
project:
default:
# advanced settings
# Prevents a PR from being blocked by a reduction in coverage.
# Note: if we want to re-enable this, a `threshold` value can be used to
# allow coverage to drop by x% while still posting a success status.
# `informational`: https://docs.codecov.com/docs/commit-status#informational
# `threshold`: https://docs.codecov.com/docs/commit-status#threshold
informational: true
patch:
default:
# advanced settings
# Prevents a PR from being blocked by a reduction in coverage.
# Note: if we want to re-enable this, a `threshold` value can be used to
# allow coverage to drop by x% while still posting a success status.
# `informational`: https://docs.codecov.com/docs/commit-status#informational
# `threshold`: https://docs.codecov.com/docs/commit-status#threshold
informational: true


@ -1,42 +1,28 @@
# Package
version = "0.6.0"
version = "0.4.0"
author = "Status Research & Development GmbH"
description = "DHT based on Eth discv5 implementation"
license = "MIT"
skipDirs = @["tests"]
installFiles = @["build.nims"]
# Dependencies
requires "nim >= 2.2.4 & < 3.0.0"
requires "secp256k1 >= 0.6.0 & < 0.7.0"
requires "nimcrypto >= 0.6.2 & < 0.8.0"
requires "bearssl >= 0.2.5 & < 0.3.0"
requires "chronicles >= 0.11.2 & < 0.13.0"
requires "chronos >= 4.0.4 & < 4.1.0"
requires "libp2p >= 1.14.1 & < 2.0.0"
requires "metrics >= 0.1.0 & < 0.2.0"
requires "stew >= 0.4.2"
requires "stint >= 0.8.1 & < 0.9.0"
requires "https://github.com/logos-storage/nim-datastore >= 0.2.1 & < 0.3.0"
requires "questionable >= 0.10.15 & < 0.11.0"
requires "leveldbstatic >= 0.2.1 & < 0.3.0"
requires "nim >= 1.6.18"
requires "unittest2 <= 0.0.9"
requires "secp256k1#2acbbdcc0e63002a013fff49f015708522875832" # >= 0.5.2 & < 0.6.0
requires "protobuf_serialization" # >= 0.2.0 & < 0.3.0
requires "nimcrypto >= 0.5.4"
requires "bearssl#head"
requires "chronicles >= 0.10.2 & < 0.11.0"
requires "chronos#e15dc3b41fea95348b58f32244962c1c6df310a7" # Change to >= 4.0.0 & < 5.0.0 when available
requires "libp2p#unstable"
requires "metrics"
requires "stew#head"
requires "stint"
requires "asynctest >= 0.4.3 & < 0.5.0"
requires "https://github.com/codex-storage/nim-datastore#head"
requires "questionable"
task testAll, "Run all test suites":
exec "nimble install -d -y"
withDir "tests":
exec "nimble testAll"
task test, "Run the test suite":
exec "nimble install -d -y"
withDir "tests":
exec "nimble test"
task testPart1, "Run the test suite part 1":
exec "nimble install -d -y"
withDir "tests":
exec "nimble testPart1"
task testPart2, "Run the test suite part 2":
exec "nimble install -d -y"
withDir "tests":
exec "nimble testPart2"
include "build.nims"


@ -1,13 +1,13 @@
import
std/sugar,
libp2p/crypto/[crypto, secp],
stew/[byteutils, objects, ptrops],
results
stew/[byteutils, objects, results, ptrops]
# from secp256k1 import ecdh, SkEcdhSecretSize, toRaw, SkSecretKey, SkPublicKey
import secp256k1
const
KeyLength* = secp256k1.SkEcdhSecretSize
KeyLength* = SkEcdhSecretSize
## Ecdh shared secret key length without leading byte
## (publicKey * privateKey).x, where length of x is 32 bytes
@ -25,12 +25,12 @@ type
data*: array[FullKeyLength, byte]
proc fromHex*(T: type PrivateKey, data: string): Result[PrivateKey, cstring] =
let skKey = ? secp.SkPrivateKey.init(data).mapErr(e =>
let skKey = ? SkPrivateKey.init(data).mapErr(e =>
("Failed to init private key from hex string: " & $e).cstring)
ok PrivateKey.init(skKey)
proc fromHex*(T: type PublicKey, data: string): Result[PublicKey, cstring] =
let skKey = ? secp.SkPublicKey.init(data).mapErr(e =>
let skKey = ? SkPublicKey.init(data).mapErr(e =>
("Failed to init public key from hex string: " & $e).cstring)
ok PublicKey.init(skKey)
@ -45,17 +45,14 @@ proc ecdhSharedSecretHash(output: ptr byte, x32, y32: ptr byte, data: pointer):
## Take the `x32` part as ecdh shared secret.
## output length is derived from x32 length and taken from ecdh
## generic parameter `KeyLength`
copyMem(output, x32, KeyLength)
copyMem(output, x32, SkEcdhSecretSize)
return 1
func ecdhSharedSecret(seckey: SkPrivateKey, pubkey: secp.SkPublicKey): SharedSecret =
## Compute ecdh agreed shared secret.
let res = secp256k1.ecdh[KeyLength](
secp256k1.SkSecretKey(seckey),
secp256k1.SkPublicKey(pubkey),
ecdhSharedSecretHash,
nil,
)
let res = ecdh[SkEcdhSecretSize](secp256k1.SkSecretKey(seckey),
secp256k1.SkPublicKey(pubkey),
ecdhSharedSecretHash, nil)
# This function can only fail if the hash function returns zero.
# Because our hash function always succeeds, we can turn the error into a Defect.
doAssert res.isOk, $res.error
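For intuition on why both peers derive the same key from ecdhSharedSecret: ECDH rests on the commutativity (a·G)·b = (b·G)·a on the curve. A toy Diffie-Hellman over integers modulo a prime shows the same symmetry (illustrative numbers only, 64-bit int assumed; real code must use secp256k1 as above):

# Toy modular Diffie-Hellman -- NOT cryptography, just the symmetry argument.
proc powMod(base, exp, m: int): int =
  # square-and-multiply modular exponentiation
  result = 1
  var b = base mod m
  var e = exp
  while e > 0:
    if (e and 1) == 1: result = result * b mod m
    b = b * b mod m
    e = e shr 1

const p = 2147483647                      # prime modulus (2^31 - 1)
const g = 5                               # public generator
let aliceSecret = 123456
let bobSecret = 654321
let alicePub = powMod(g, aliceSecret, p)  # analogue of a*G
let bobPub = powMod(g, bobSecret, p)      # analogue of b*G
doAssert powMod(bobPub, aliceSecret, p) == powMod(alicePub, bobSecret, p)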


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -11,21 +11,20 @@
## https://github.com/ethereum/devp2p/blob/master/discv5/discv5-theory.md#sessions
##
{.push raises: [].}
{.push raises: [Defect].}
import
std/[hashes, net, options, sugar, tables],
stew/endians2,
bearssl/rand,
chronicles,
stew/[byteutils],
stew/[results, byteutils],
stint,
libp2p/crypto/crypto as libp2p_crypto,
libp2p/crypto/secp,
libp2p/signed_envelope,
metrics,
nimcrypto,
results,
"."/[messages, messages_encoding, node, spr, hkdf, sessions],
"."/crypto
@ -34,16 +33,13 @@ from stew/objects import checkedEnumAssign
export crypto
declareCounter dht_session_lru_cache_hits, "Session LRU cache hits"
declareCounter dht_session_lru_cache_misses, "Session LRU cache misses"
declareCounter dht_session_decrypt_failures, "Session decrypt failures"
declareCounter discovery_session_lru_cache_hits, "Session LRU cache hits"
declareCounter discovery_session_lru_cache_misses, "Session LRU cache misses"
declareCounter discovery_session_decrypt_failures, "Session decrypt failures"
logScope:
topics = "discv5"
type
cipher = aes128
const
version: uint16 = 1
idSignatureText = "discovery v5 identity proof"
@ -166,7 +162,7 @@ proc deriveKeys*(n1, n2: NodeId, priv: PrivateKey, pub: PublicKey,
ok secrets
proc encryptGCM*(key: AesKey, nonce, pt, authData: openArray[byte]): seq[byte] =
var ectx: GCM[cipher]
var ectx: GCM[aes128]
ectx.init(key, nonce, authData)
result = newSeq[byte](pt.len + gcmTagSize)
ectx.encrypt(pt, result)
@ -179,7 +175,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]):
debug "cipher is missing tag", len = ct.len
return
var dctx: GCM[cipher]
var dctx: GCM[aes128]
dctx.init(key, nonce, authData)
var res = newSeq[byte](ct.len - gcmTagSize)
var tag: array[gcmTagSize, byte]
@ -193,7 +189,7 @@ proc decryptGCM*(key: AesKey, nonce, ct, authData: openArray[byte]):
return some(res)
proc encryptHeader*(id: NodeId, iv, header: openArray[byte]): seq[byte] =
var ectx: CTR[cipher]
var ectx: CTR[aes128]
ectx.init(id.toByteArrayBE().toOpenArray(0, 15), iv)
result = newSeq[byte](header.len)
ectx.encrypt(header, result)
@ -235,7 +231,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
if c.sessions.load(toId, toAddr, recipientKey1, recipientKey2, initiatorKey):
haskey = true
messageEncrypted = encryptGCM(initiatorKey, nonce, message, @iv & header)
dht_session_lru_cache_hits.inc()
discovery_session_lru_cache_hits.inc()
else:
# We might not have the node's keys if the handshake hasn't been performed
# yet. That's fine, we send a random-packet and we will be responded with
@ -248,7 +244,7 @@ proc encodeMessagePacket*(rng: var HmacDrbgContext, c: var Codec,
var randomData: array[gcmTagSize + 4, byte]
hmacDrbgGenerate(rng, randomData)
messageEncrypted.add(randomData)
dht_session_lru_cache_misses.inc()
discovery_session_lru_cache_misses.inc()
let maskedHeader = encryptHeader(toId, iv, header)
@ -378,7 +374,7 @@ proc decodeHeader*(id: NodeId, iv, maskedHeader: openArray[byte]):
DecodeResult[(StaticHeader, seq[byte])] =
# No need to check staticHeader size as that is included in minimum packet
# size check in decodePacket
var ectx: CTR[cipher]
var ectx: CTR[aes128]
ectx.init(id.toByteArrayBE().toOpenArray(0, aesKeySize - 1), iv)
# Decrypt static-header part of the header
var staticHeader = newSeq[byte](staticHeaderSize)
@ -432,11 +428,11 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# Don't consider this an error, simply haven't done a handshake yet or
# the session got removed.
trace "Decrypting failed (no keys)"
dht_session_lru_cache_misses.inc()
discovery_session_lru_cache_misses.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId))
dht_session_lru_cache_hits.inc()
discovery_session_lru_cache_hits.inc()
var pt = decryptGCM(recipientKey2, nonce, ct, @iv & @header)
if pt.isNone():
@ -449,7 +445,7 @@ proc decodeMessagePacket(c: var Codec, fromAddr: Address, nonce: AESGCMNonce,
# needed later, depending on message order.
trace "Decrypting failed (invalid keys)", address = fromAddr
#c.sessions.del(srcId, fromAddr)
dht_session_decrypt_failures.inc()
discovery_session_decrypt_failures.inc()
return ok(Packet(flag: Flag.OrdinaryMessage, requestNonce: nonce,
srcId: srcId))
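The GCM helpers above frame every ciphertext as ciphertext || 16-byte tag. A minimal round-trip sketch of that framing, mirroring the nimcrypto calls visible in this diff (all-zero key/nonce, demo only):

import nimcrypto

const tagSize = 16

var
  key: array[16, byte]                 # aes128 key
  nonce: array[12, byte]               # GCM nonce
  pt = [byte 1, 2, 3, 4]               # plaintext
  aad = [byte 9, 9]                    # authenticated associated data

# encrypt: output is the ciphertext with the tag appended
var ectx: GCM[aes128]
ectx.init(key, nonce, aad)
var ct = newSeq[byte](pt.len + tagSize)
ectx.encrypt(pt, ct)
ectx.getTag(ct.toOpenArray(pt.len, ct.high))
ectx.clear()

# decrypt: strip the tag, recompute it, and compare
var dctx: GCM[aes128]
dctx.init(key, nonce, aad)
var res = newSeq[byte](ct.len - tagSize)
dctx.decrypt(ct.toOpenArray(0, ct.high - tagSize), res)
var tag: array[tagSize, byte]
dctx.getTag(tag)
dctx.clear()
doAssert @tag == ct[pt.len .. ct.high]   # tag matches -> data is authentic
doAssert res == @pt                      # plaintext recovered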


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2021 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -15,7 +15,7 @@
## To select the right address, a majority count is done. This is done over a
## sort of moving window as votes expire after `IpVoteTimeout`.
{.push raises: [].}
{.push raises: [Defect].}
import
std/[tables, options],
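The majority count described above reduces to picking the most-voted address among non-expired votes. A self-contained sketch (std/tables only; expiry is modeled by assuming stale votes were already pruned):

import std/tables

let votes = ["1.2.3.4:30303", "1.2.3.4:30303", "5.6.7.8:30303"]  # illustrative
var counts: CountTable[string]
for v in votes:
  counts.inc(v)
echo counts.largest()   # ("1.2.3.4:30303", 2): the majority address wins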


@ -1,6 +1,6 @@
import std/[tables, lists, options]
{.push raises: [].}
{.push raises: [Defect].}
export tables, lists, options


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -10,7 +10,7 @@
## These messages get protobuf encoded, while in the spec they get RLP encoded.
##
{.push raises: [].}
{.push raises: [Defect].}
import
std/[hashes, net],


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2020-2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -14,7 +14,6 @@ import
stew/endians2,
libp2p/routing_record,
libp2p/signed_envelope,
libp2p/protobuf/minprotobuf,
"."/[messages, spr, node],
../../../../dht/providers_encoding
@ -326,7 +325,7 @@ proc encodeMessage*[T: SomeMessage](p: T, reqId: RequestId): seq[byte] =
pb.write(2, encoded)
pb.finish()
result.add(pb.buffer)
trace "Encoded protobuf message", typ = $T
trace "Encoded protobuf message", typ = $T, encoded
proc decodeMessage*(body: openArray[byte]): DecodeResult[Message] =
## Decodes to the specific `Message` type.


@ -1,51 +1,40 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.push raises: [Defect].}
import
std/[hashes, net],
std/hashes,
bearssl/rand,
chronicles,
chronos,
nimcrypto,
stew/shims/net,
stint,
./crypto,
./spr
export stint
const
avgSmoothingFactor = 0.9
seenSmoothingFactor = 0.9
type
NodeId* = UInt256
Address* = object
ip*: IpAddress
ip*: ValidIpAddress
port*: Port
Stats* = object
rttMin*: float #millisec
rttAvg*: float #millisec
bwAvg*: float #bps
bwMax*: float #bps
Node* = ref object
id*: NodeId
pubkey*: PublicKey
address*: Option[Address]
record*: SignedPeerRecord
seen*: float ## Indicates if there was at least one successful
seen*: bool ## Indicates if there was at least one successful
## request-response with this node, or if the node was verified
## through the underlying transport mechanisms. After first contact
## it tracks how reliable the communication with the node is.
stats*: Stats # traffic measurements and statistics
## through the underlying transport mechanisms.
func toNodeId*(pid: PeerId): NodeId =
## Convert public key to a node identifier.
@ -68,7 +57,7 @@ func newNode*(
id: ? pk.toNodeId(),
pubkey: pk,
record: record,
address: Address(ip: ip, port: port).some)
address: Address(ip: ValidIpAddress.init(ip), port: port).some)
ok node
@ -88,9 +77,7 @@ func newNode*(r: SignedPeerRecord): Result[Node, cstring] =
nodeId = ? pk.get().toNodeId()
if r.ip.isSome() and r.udp.isSome():
let a = Address(
ip: IpAddress(family: IPv4, address_v4: r.ip.get()), port: Port(r.udp.get())
)
let a = Address(ip: ipv4(r.ip.get()), port: Port(r.udp.get()))
ok(Node(
id: nodeId,
@ -104,7 +91,7 @@ func newNode*(r: SignedPeerRecord): Result[Node, cstring] =
record: r,
address: none(Address)))
proc update*(n: Node, pk: PrivateKey, ip: Option[IpAddress],
proc update*(n: Node, pk: PrivateKey, ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port] = none[Port]()): Result[void, cstring] =
? n.record.update(pk, ip, tcpPort, udpPort)
@ -148,14 +135,14 @@ func shortLog*(id: NodeId): string =
result = sid
else:
result = newStringOfCap(10)
for i in 0..<3:
for i in 0..<2:
result.add(sid[i])
result.add("*")
for i in (len(sid) - 6)..sid.high:
result.add(sid[i])
chronicles.formatIt(NodeId): shortLog(it)
func hash*(ip: IpAddress): Hash =
func hash*(ip: ValidIpAddress): Hash =
case ip.family
of IpAddressFamily.IPv6: hash(ip.address_v6)
of IpAddressFamily.IPv4: hash(ip.address_v4)
@ -195,33 +182,3 @@ func shortLog*(address: Address): string =
$address
chronicles.formatIt(Address): shortLog(it)
func registerSeen*(n:Node, seen = true) =
## Register an event of seeing (receiving a message from) or not seeing (missing an expected message from) a node
## Note: interpretation might depend on NAT type
if n.seen == 0: # first time seeing the node
n.seen = 1
else:
n.seen = seenSmoothingFactor * n.seen + (1.0 - seenSmoothingFactor) * seen.float
func alreadySeen*(n:Node) : bool =
## Was the node seen at least once?
n.seen > 0
# collecting performance metrics
func registerRtt*(n: Node, rtt: Duration) =
## register an RTT measurement
let rttMs = rtt.nanoseconds.float / 1e6
n.stats.rttMin =
if n.stats.rttMin == 0: rttMs
else: min(n.stats.rttMin, rttMs)
n.stats.rttAvg =
if n.stats.rttAvg == 0: rttMs
else: avgSmoothingFactor * n.stats.rttAvg + (1.0 - avgSmoothingFactor) * rttMs
func registerBw*(n: Node, bw: float) =
## register a bandwidth measurement
n.stats.bwMax = max(n.stats.bwMax, bw)
n.stats.bwAvg =
if n.stats.bwAvg == 0: bw
else: avgSmoothingFactor * n.stats.bwAvg + (1.0 - avgSmoothingFactor) * bw
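For intuition on the fractional seen score introduced above: after first contact it is an exponentially weighted moving average, so one missed reply only dents the score instead of zeroing it. A self-contained sketch with the same factor:

const seenSmoothingFactor = 0.9

proc updateSeen(score: float, seen: bool): float =
  if score == 0: 1.0   # first sighting initializes the score
  else: seenSmoothingFactor * score + (1.0 - seenSmoothingFactor) * seen.float

var s = 0.0
for event in [true, true, false, true]:
  s = updateSeen(s, event)
echo s   # 0.91: one miss drops the score to 0.9, the next reply lifts it back up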


@ -1,8 +1,8 @@
{.push raises: [].}
{.push raises: [Defect].}
import
std/[net, sets, options],
results, chronicles, chronos,
std/[sets, options],
stew/results, stew/shims/net, chronicles, chronos,
"."/[node, spr, routing_table]
logScope:


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -71,18 +71,18 @@
## more requests will be needed for a lookup (adding bandwidth and latency).
## This might be a concern for mobile devices.
{.push raises: [].}
{.push raises: [Defect].}
import
std/[net, tables, sets, options, math, sequtils, algorithm, strutils],
std/[tables, sets, options, math, sequtils, algorithm, strutils],
stew/shims/net as stewNet,
json_serialization/std/net,
stew/[base64, endians2],
stew/[base64, endians2, results],
pkg/[chronicles, chronicles/chronos_tools],
pkg/chronos,
pkg/stint,
pkg/bearssl/rand,
pkg/metrics,
pkg/results
pkg/metrics
import "."/[
messages,
@ -100,13 +100,13 @@ import nimcrypto except toHex
export options, results, node, spr, providers
declareCounter dht_message_requests_outgoing,
declareCounter discovery_message_requests_outgoing,
"Discovery protocol outgoing message requests", labels = ["response"]
declareCounter dht_message_requests_incoming,
declareCounter discovery_message_requests_incoming,
"Discovery protocol incoming message requests", labels = ["response"]
declareCounter dht_unsolicited_messages,
declareCounter discovery_unsolicited_messages,
"Discovery protocol unsolicited or timed-out messages"
declareCounter dht_enr_auto_update,
declareCounter discovery_enr_auto_update,
"Amount of discovery IP:port address SPR auto updates"
logScope:
@ -117,7 +117,6 @@ const
LookupRequestLimit = 3 ## Amount of distances requested in a single Findnode
## message for a lookup or query
FindNodeResultLimit = 16 ## Maximum amount of SPRs in the total Nodes messages
FindNodeFastResultLimit = 6 ## Maximum amount of SPRs in response to findNodeFast
## that will be processed
MaxNodesPerMessage = 3 ## Maximum amount of SPRs per individual Nodes message
RefreshInterval = 5.minutes ## Interval of launching a random query to
@ -126,17 +125,12 @@ const
RevalidateMax = 10000 ## Revalidation of a peer is done between min and max milliseconds.
## value in milliseconds
IpMajorityInterval = 5.minutes ## Interval for checking the latest IP:Port
DebugPrintInterval = 5.minutes ## Interval to print neighborhood with stats
## majority and updating this when SPR auto update is set.
InitialLookups = 1 ## Amount of lookups done when populating the routing table
ResponseTimeout* = 1.seconds ## timeout for the response of a request-response
ResponseTimeout* = 4.seconds ## timeout for the response of a request-response
MaxProvidersEntries* = 1_000_000 # one million records
MaxProvidersPerEntry* = 20 # providers per entry
## call
FindnodeSeenThreshold = 1.0 ## threshold used as findnode response filter
LookupSeenThreshold = 0.0 ## threshold used for lookup nodeset selection
QuerySeenThreshold = 0.0 ## threshold used for query nodeset selection
NoreplyRemoveThreshold = 0.5 ## remove node on no reply if 'seen' is below this value
func shortLog*(record: SignedPeerRecord): string =
## Returns compact string representation of ``SignedPeerRecord``.
@ -172,7 +166,6 @@ type
refreshLoop: Future[void]
revalidateLoop: Future[void]
ipMajorityLoop: Future[void]
debugPrintLoop: Future[void]
lastLookup: chronos.Moment
bootstrapRecords*: seq[SignedPeerRecord]
ipVote: IpVote
@ -189,9 +182,6 @@ type
DiscResult*[T] = Result[T, cstring]
func `$`*(p: Protocol): string =
$p.localNode.id
const
defaultDiscoveryConfig* = DiscoveryConfig(
tableIpLimits: DefaultTableIpLimits,
@ -241,7 +231,7 @@ proc randomNodes*(d: Protocol, maxAmount: int): seq[Node] =
d.routingTable.randomNodes(maxAmount)
proc randomNodes*(d: Protocol, maxAmount: int,
pred: proc(x: Node): bool {.gcsafe, noSideEffect, raises: [].}): seq[Node] =
pred: proc(x: Node): bool {.gcsafe, noSideEffect.}): seq[Node] =
## Get a `maxAmount` of random nodes from the local routing table with the
## `pred` predicate function applied as filter on the nodes selected.
d.routingTable.randomNodes(maxAmount, pred)
@ -253,14 +243,14 @@ proc randomNodes*(d: Protocol, maxAmount: int,
d.randomNodes(maxAmount, proc(x: Node): bool = x.record.contains(enrField))
proc neighbours*(d: Protocol, id: NodeId, k: int = BUCKET_SIZE,
seenThreshold = 0.0): seq[Node] =
seenOnly = false): seq[Node] =
## Return up to k neighbours (closest node ids) of the given node id.
d.routingTable.neighbours(id, k, seenThreshold)
d.routingTable.neighbours(id, k, seenOnly)
proc neighboursAtDistances*(d: Protocol, distances: seq[uint16],
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
## Return up to k neighbours (closest node ids) at given distances.
d.routingTable.neighboursAtDistances(distances, k, seenThreshold)
d.routingTable.neighboursAtDistances(distances, k, seenOnly)
proc nodesDiscovered*(d: Protocol): int = d.routingTable.len
@ -282,7 +272,7 @@ proc updateRecord*(
newSpr = spr.get()
seqNo = d.localNode.record.seqNum
info "Updated discovery SPR", uri = newSpr.toURI(), newSpr = newSpr.data
info "Updated discovery SPR", uri = newSpr.toURI()
d.localNode.record = newSpr
d.localNode.record.data.seqNo = seqNo
@ -348,7 +338,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
# TODO: Still deduplicate also?
if fn.distances.all(proc (x: uint16): bool = return x <= 256):
d.sendNodes(fromId, fromAddr, reqId,
d.routingTable.neighboursAtDistances(fn.distances, FindNodeResultLimit, FindnodeSeenThreshold))
d.routingTable.neighboursAtDistances(fn.distances, seenOnly = true))
else:
# At least one invalid distance, but the polite node we are, still respond
# with empty nodes.
@ -357,7 +347,7 @@ proc handleFindNode(d: Protocol, fromId: NodeId, fromAddr: Address,
proc handleFindNodeFast(d: Protocol, fromId: NodeId, fromAddr: Address,
fnf: FindNodeFastMessage, reqId: RequestId) =
d.sendNodes(fromId, fromAddr, reqId,
d.routingTable.neighbours(fnf.target, FindNodeFastResultLimit, FindnodeSeenThreshold))
d.routingTable.neighbours(fnf.target, seenOnly = true))
# TODO: if known, maybe we should add exact target even if not yet "seen"
proc handleTalkReq(d: Protocol, fromId: NodeId, fromAddr: Address,
@ -413,27 +403,27 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
message: Message) =
case message.kind
of ping:
dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc()
d.handlePing(srcId, fromAddr, message.ping, message.reqId)
of findNode:
dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc()
d.handleFindNode(srcId, fromAddr, message.findNode, message.reqId)
of findNodeFast:
dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc()
d.handleFindNodeFast(srcId, fromAddr, message.findNodeFast, message.reqId)
of talkReq:
dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc()
d.handleTalkReq(srcId, fromAddr, message.talkReq, message.reqId)
of addProvider:
dht_message_requests_incoming.inc()
dht_message_requests_incoming.inc(labelValues = ["no_response"])
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
d.handleAddProvider(srcId, fromAddr, message.addProvider, message.reqId)
of getProviders:
dht_message_requests_incoming.inc()
discovery_message_requests_incoming.inc()
asyncSpawn d.handleGetProviders(srcId, fromAddr, message.getProviders, message.reqId)
of regTopic, topicQuery:
dht_message_requests_incoming.inc()
dht_message_requests_incoming.inc(labelValues = ["no_response"])
discovery_message_requests_incoming.inc()
discovery_message_requests_incoming.inc(labelValues = ["no_response"])
trace "Received unimplemented message kind", kind = message.kind,
origin = fromAddr
else:
@ -441,7 +431,7 @@ proc handleMessage(d: Protocol, srcId: NodeId, fromAddr: Address,
if d.awaitedMessages.take((srcId, message.reqId), waiter):
waiter.complete(some(message))
else:
dht_unsolicited_messages.inc()
discovery_unsolicited_messages.inc()
trace "Timed out or unrequested message", kind = message.kind,
origin = fromAddr
@ -453,50 +443,27 @@ proc registerTalkProtocol*(d: Protocol, protocolId: seq[byte],
else:
ok()
proc replaceNode(d: Protocol, n: Node, forceRemoveBelow = 1.0) =
proc replaceNode(d: Protocol, n: Node) =
if n.record notin d.bootstrapRecords:
d.routingTable.replaceNode(n, forceRemoveBelow)
d.routingTable.replaceNode(n)
else:
# For now we never remove bootstrap nodes. It might make sense to actually
# do so and to retry them only in case we drop to a really low amount of
# peers in the routing table.
debug "Message request to bootstrap node failed", src=d.localNode, dst=n
proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T,
reqId: RequestId) =
doAssert(toNode.address.isSome())
let
message = encodeMessage(m, reqId)
trace "Send message packet", dstId = toNode.id,
address = toNode.address, kind = messageKind(T)
dht_message_requests_outgoing.inc()
d.transport.sendMessage(toNode, message)
proc waitResponse*[T: SomeMessage](d: Protocol, node: Node, msg: T):
Future[Option[Message]] =
let reqId = RequestId.init(d.rng[])
result = d.waitMessage(node, reqId)
sendRequest(d, node, msg, reqId)
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId, timeout = ResponseTimeout):
proc waitMessage(d: Protocol, fromNode: Node, reqId: RequestId):
Future[Option[Message]] =
result = newFuture[Option[Message]]("waitMessage")
let res = result
let key = (fromNode.id, reqId)
sleepAsync(timeout).addCallback() do(data: pointer):
sleepAsync(ResponseTimeout).addCallback() do(data: pointer):
d.awaitedMessages.del(key)
if not res.finished:
res.complete(none(Message))
d.awaitedMessages[key] = result
proc waitNodeResponses*[T: SomeMessage](d: Protocol, node: Node, msg: T):
Future[DiscResult[seq[SignedPeerRecord]]] =
let reqId = RequestId.init(d.rng[])
result = d.waitNodes(node, reqId)
sendRequest(d, node, msg, reqId)
proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
Future[DiscResult[seq[SignedPeerRecord]]] {.async.} =
## Wait for one or more nodes replies.
@ -505,70 +472,72 @@ proc waitNodes(d: Protocol, fromNode: Node, reqId: RequestId):
## on that, more replies will be awaited.
## If one reply is lost here (timed out), others are ignored too.
## The same applies to out-of-order receipt.
let startTime = Moment.now()
var op = await d.waitMessage(fromNode, reqId)
if op.isSome:
if op.get.kind == MessageKind.nodes:
var res = op.get.nodes.sprs
let
total = op.get.nodes.total
firstTime = Moment.now()
rtt = firstTime - startTime
# trace "nodes RTT:", rtt, node = fromNode
fromNode.registerRtt(rtt)
let total = op.get.nodes.total
for i in 1 ..< total:
op = await d.waitMessage(fromNode, reqId)
if op.isSome and op.get.kind == MessageKind.nodes:
res.add(op.get.nodes.sprs)
# Estimate bandwidth based on the UDP packet train received, assuming the packets
# were released back-to-back and spaced in time by the bandwidth bottleneck. This is
# just a rough packet-pair estimate, far from perfect.
# TODO: get message size from lower layer for better bandwidth estimate
# TODO: get better reception timestamp from lower layers
let
deltaT = Moment.now() - firstTime
bwBps = 500.0 * 8.0 / (deltaT.nanoseconds.float / i.float / 1e9)
# trace "bw estimate:", deltaT = deltaT, i, bw_mbps = bwBps / 1e6, node = fromNode
fromNode.registerBw(bwBps)
else:
# No error on this as we received some nodes.
break
return ok(res)
else:
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to find node message")
else:
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Nodes message not received in time")
proc sendRequest*[T: SomeMessage](d: Protocol, toId: NodeId, toAddr: Address, m: T):
RequestId =
let
reqId = RequestId.init(d.rng[])
message = encodeMessage(m, reqId)
trace "Send message packet", dstId = toId, toAddr, kind = messageKind(T)
discovery_message_requests_outgoing.inc()
d.transport.sendMessage(toId, toAddr, message)
return reqId
proc sendRequest*[T: SomeMessage](d: Protocol, toNode: Node, m: T):
RequestId =
doAssert(toNode.address.isSome())
let
reqId = RequestId.init(d.rng[])
message = encodeMessage(m, reqId)
trace "Send message packet", dstId = toNode.id,
address = toNode.address, kind = messageKind(T)
discovery_message_requests_outgoing.inc()
d.transport.sendMessage(toNode, message)
return reqId
proc ping*(d: Protocol, toNode: Node):
Future[DiscResult[PongMessage]] {.async.} =
## Send a discovery ping message.
##
## Returns the received pong message or an error.
let
msg = PingMessage(sprSeq: d.localNode.record.seqNum)
startTime = Moment.now()
resp = await d.waitResponse(toNode, msg)
rtt = Moment.now() - startTime
# trace "ping RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
let reqId = d.sendRequest(toNode,
PingMessage(sprSeq: d.localNode.record.seqNum))
let resp = await d.waitMessage(toNode, reqId)
d.routingTable.setJustSeen(toNode, resp.isSome())
if resp.isSome():
if resp.get().kind == pong:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().pong)
else:
d.replaceNode(toNode)
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to ping message")
else:
# A ping (or the pong) was lost; what should we do? The previous implementation called
# d.replaceNode(toNode) immediately, which removed the node. That is too aggressive,
# especially during a temporary network outage: although bootstrap nodes are protected
# from being removed, everything else would slowly be removed.
d.replaceNode(toNode, NoreplyRemoveThreshold)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Pong message not received in time")
proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
@ -577,13 +546,12 @@ proc findNode*(d: Protocol, toNode: Node, distances: seq[uint16]):
##
## Returns the received nodes or an error.
## Received SPRs are already validated and converted to `Node`.
let
msg = FindNodeMessage(distances: distances)
nodes = await d.waitNodeResponses(toNode, msg)
let reqId = d.sendRequest(toNode, FindNodeMessage(distances: distances))
let nodes = await d.waitNodes(toNode, reqId)
d.routingTable.setJustSeen(toNode, nodes.isOk)
if nodes.isOk:
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit, distances)
d.routingTable.setJustSeen(toNode)
return ok(res)
else:
trace "findNode nodes not OK."
@ -596,13 +564,12 @@ proc findNodeFast*(d: Protocol, toNode: Node, target: NodeId):
##
## Returns the received nodes or an error.
## Received SPRs are already validated and converted to `Node`.
let
msg = FindNodeFastMessage(target: target)
nodes = await d.waitNodeResponses(toNode, msg)
let reqId = d.sendRequest(toNode, FindNodeFastMessage(target: target))
let nodes = await d.waitNodes(toNode, reqId)
d.routingTable.setJustSeen(toNode, nodes.isOk)
if nodes.isOk:
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeFastResultLimit)
let res = verifyNodesRecords(nodes.get(), toNode, FindNodeResultLimit)
d.routingTable.setJustSeen(toNode)
return ok(res)
else:
d.replaceNode(toNode)
@ -614,26 +581,21 @@ proc talkReq*(d: Protocol, toNode: Node, protocol, request: seq[byte]):
## Send a discovery talkreq message.
##
## Returns the received talkresp message or an error.
let
msg = TalkReqMessage(protocol: protocol, request: request)
startTime = Moment.now()
resp = await d.waitResponse(toNode, msg)
rtt = Moment.now() - startTime
# trace "talk RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
let reqId = d.sendRequest(toNode,
TalkReqMessage(protocol: protocol, request: request))
let resp = await d.waitMessage(toNode, reqId)
d.routingTable.setJustSeen(toNode, resp.isSome())
if resp.isSome():
if resp.get().kind == talkResp:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().talkResp.response)
else:
d.replaceNode(toNode)
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to talk request message")
else:
# remove on loss only if there is a replacement
d.replaceNode(toNode, NoreplyRemoveThreshold)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("Talk response message not received in time")
proc lookupDistances*(target, dest: NodeId): seq[uint16] =
@ -648,18 +610,25 @@ proc lookupDistances*(target, dest: NodeId): seq[uint16] =
result.add(td - uint16(i))
inc i
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId, fast: bool):
proc lookupWorker(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async.} =
let dists = lookupDistances(target, destNode.id)
let r =
if fast:
await d.findNodeFast(destNode, target)
else:
# Instead of doing max `LookupRequestLimit` findNode requests, make use
# of the discv5.1 functionality to request nodes for multiple distances.
let dists = lookupDistances(target, destNode.id)
await d.findNode(destNode, dists)
# Instead of doing max `LookupRequestLimit` findNode requests, make use
# of the discv5.1 functionality to request nodes for multiple distances.
let r = await d.findNode(destNode, dists)
if r.isOk:
result.add(r[])
# Attempt to add all nodes discovered
for n in result:
discard d.addNode(n)
proc lookupWorkerFast(d: Protocol, destNode: Node, target: NodeId):
Future[seq[Node]] {.async.} =
## use target NodeId based find_node
let r = await d.findNodeFast(destNode, target)
if r.isOk:
result.add(r[])
@ -673,7 +642,7 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]]
# `closestNodes` holds the k closest nodes to target found, sorted by distance
# Unvalidated nodes are used for requests as a form of validation.
var closestNodes = d.routingTable.neighbours(target, BUCKET_SIZE,
LookupSeenThreshold)
seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
@ -690,7 +659,10 @@ proc lookup*(d: Protocol, target: NodeId, fast: bool = false): Future[seq[Node]]
while i < closestNodes.len and pendingQueries.len < Alpha:
let n = closestNodes[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target, fast))
if fast:
pendingQueries.add(d.lookupWorkerFast(n, target))
else:
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
@ -735,8 +707,7 @@ proc addProvider*(
res.add(d.localNode)
for toNode in res:
if toNode != d.localNode:
let reqId = RequestId.init(d.rng[])
d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr), reqId)
discard d.sendRequest(toNode, AddProviderMessage(cId: cId, prov: pr))
else:
asyncSpawn d.addProviderLocal(cId, pr)
@ -749,21 +720,22 @@ proc sendGetProviders(d: Protocol, toNode: Node,
trace "sendGetProviders", toNode, msg
let
resp = await d.waitResponse(toNode, msg)
reqId = d.sendRequest(toNode, msg)
resp = await d.waitMessage(toNode, reqId)
d.routingTable.setJustSeen(toNode, resp.isSome())
if resp.isSome():
if resp.get().kind == MessageKind.providers:
d.routingTable.setJustSeen(toNode)
return ok(resp.get().provs)
else:
# TODO: do we need to do something when there is an invalid response?
d.replaceNode(toNode)
dht_message_requests_outgoing.inc(labelValues = ["invalid_response"])
discovery_message_requests_outgoing.inc(labelValues = ["invalid_response"])
return err("Invalid response to GetProviders message")
else:
# remove on loss only if there is a replacement
d.replaceNode(toNode, NoreplyRemoveThreshold)
dht_message_requests_outgoing.inc(labelValues = ["no_response"])
# TODO: do we need to do something when there is no response?
d.replaceNode(toNode)
discovery_message_requests_outgoing.inc(labelValues = ["no_response"])
return err("GetProviders response message not received in time")
proc getProvidersLocal*(
@ -836,7 +808,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
## This will take k nodes from the routing table closest to target and
## query them for nodes closest to target. If there are fewer than k nodes in
## the routing table, nodes returned by the first queries will be used.
var queryBuffer = d.routingTable.neighbours(target, k, QuerySeenThreshold)
var queryBuffer = d.routingTable.neighbours(target, k, seenOnly = false)
var asked, seen = initHashSet[NodeId]()
asked.incl(d.localNode.id) # No need to ask our own node
@ -851,7 +823,7 @@ proc query*(d: Protocol, target: NodeId, k = BUCKET_SIZE): Future[seq[Node]]
while i < min(queryBuffer.len, k) and pendingQueries.len < Alpha:
let n = queryBuffer[i]
if not asked.containsOrIncl(n.id):
pendingQueries.add(d.lookupWorker(n, target, false))
pendingQueries.add(d.lookupWorker(n, target))
inc i
trace "discv5 pending queries", total = pendingQueries.len
@ -962,8 +934,7 @@ proc revalidateNode*(d: Protocol, n: Node) {.async.} =
discard d.addNode(nodes[][0])
# Get IP and port from pong message and add it to the ip votes
trace "pong rx", n, myip = res.ip, myport = res.port
let a = Address(ip: res.ip, port: Port(res.port))
let a = Address(ip: ValidIpAddress.init(res.ip), port: Port(res.port))
d.ipVote.insert(n.id, a)
proc revalidateLoop(d: Protocol) {.async.} =
@ -1033,7 +1004,7 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
warn "Failed updating SPR with newly discovered external address",
majority, previous, error = res.error
else:
dht_enr_auto_update.inc()
discovery_enr_auto_update.inc()
info "Updated SPR with newly discovered external address",
majority, previous, uri = toURI(d.localNode.record)
else:
@ -1048,19 +1019,6 @@ proc ipMajorityLoop(d: Protocol) {.async.} =
trace "ipMajorityLoop canceled"
trace "ipMajorityLoop exited!"
proc debugPrintLoop(d: Protocol) {.async.} =
## Loop which prints the neighborhood with stats
while true:
await sleepAsync(DebugPrintInterval)
for b in d.routingTable.buckets:
debug "bucket", depth = b.getDepth,
len = b.nodes.len, standby = b.replacementLen
for n in b.nodes:
debug "node", n, rttMin = n.stats.rttMin.int, rttAvg = n.stats.rttAvg.int,
reliability = n.seen.round(3)
# bandwidth estimates are based on limited information, so not logging it yet to avoid confusion
# trace "node", n, bwMaxMbps = (n.stats.bwMax / 1e6).round(3), bwAvgMbps = (n.stats.bwAvg / 1e6).round(3)
func init*(
T: type DiscoveryConfig,
tableIpLimit: uint,
@ -1076,7 +1034,7 @@ func init*(
proc newProtocol*(
privKey: PrivateKey,
enrIp: Option[IpAddress],
enrIp: Option[ValidIpAddress],
enrTcpPort, enrUdpPort: Option[Port],
localEnrFields: openArray[(string, seq[byte])] = [],
bootstrapRecords: openArray[SignedPeerRecord] = [],
@ -1198,7 +1156,6 @@ proc start*(d: Protocol) {.async.} =
d.refreshLoop = refreshLoop(d)
d.revalidateLoop = revalidateLoop(d)
d.ipMajorityLoop = ipMajorityLoop(d)
d.debugPrintLoop = debugPrintLoop(d)
await d.providers.start()
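The packet-pair estimate in waitNodes above is plain arithmetic: with ~500-byte messages (the constant used in the code), i follow-up Nodes messages spread over deltaT imply a bottleneck of 500·8/(deltaT/i) bits per second. A worked check with illustrative numbers:

let
  deltaTns = 2_000_000.0   # 2 ms between the first and last follow-up message
  i = 4.0                  # four follow-up Nodes messages received
  bwBps = 500.0 * 8.0 / (deltaTns / i / 1e9)   # same formula as in waitNodes
echo bwBps / 1e6           # 8.0 -> roughly an 8 Mbps bottleneck estimate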


@ -1,11 +1,11 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.push raises: [Defect].}
import std/sequtils


@ -1,11 +1,11 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.push raises: [Defect].}
import std/sequtils
import std/strutils


@ -1,11 +1,11 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.push raises: [Defect].}
import std/options
import std/sequtils


@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -14,11 +14,12 @@ import pkg/datastore
import pkg/chronos
import pkg/libp2p
import pkg/chronicles
import pkg/stew/results as rs
import pkg/stew/byteutils
import pkg/questionable
import pkg/questionable/results
{.push raises: [].}
{.push raises: [Defect].}
import ./maintenance
import ./cache


@ -1,26 +1,21 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
# * Apache v2 license (license terms in the root directory or at https://www.apache.org/licenses/LICENSE-2.0).
# at your option. This file may not be copied, modified, or distributed except according to those terms.
{.push raises: [].}
{.push raises: [Defect].}
import
std/[algorithm, net, times, sequtils, bitops, sets, options, tables],
stint, chronicles, metrics, bearssl/rand, chronos,
std/[algorithm, times, sequtils, bitops, sets, options, tables],
stint, chronicles, metrics, bearssl/rand, chronos, stew/shims/net as stewNet,
"."/[node, random2, spr]
export options
declarePublicGauge dht_routing_table_nodes,
declarePublicGauge routing_table_nodes,
"Discovery routing table nodes", labels = ["state"]
declarePublicGauge dht_routing_table_buckets,
"Discovery routing table: number of buckets"
logScope:
topics = "discv5 routingtable"
type
DistanceProc* = proc(a, b: NodeId): NodeId {.raises: [Defect], gcsafe, noSideEffect.}
@ -34,7 +29,7 @@ type
IpLimits* = object
limit*: uint
ips: Table[IpAddress, uint]
ips: Table[ValidIpAddress, uint]
RoutingTable* = object
@ -101,7 +96,7 @@ type
ReplacementExisting
NoAddress
func inc*(ipLimits: var IpLimits, ip: IpAddress): bool =
func inc*(ipLimits: var IpLimits, ip: ValidIpAddress): bool =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val < ipLimits.limit:
ipLimits.ips[ip] = val + 1
@ -109,7 +104,7 @@ func inc*(ipLimits: var IpLimits, ip: IpAddress): bool =
else:
false
func dec*(ipLimits: var IpLimits, ip: IpAddress) =
func dec*(ipLimits: var IpLimits, ip: ValidIpAddress) =
let val = ipLimits.ips.getOrDefault(ip, 0)
if val == 1:
ipLimits.ips.del(ip)
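These two helpers implement the per-IP quota used by the table: an address is only counted while fewer than limit nodes share it. A standalone sketch of the same semantics, keyed by the master-side IpAddress:

import std/[net, tables]

type IpLimits = object
  limit: uint
  ips: Table[IpAddress, uint]

func inc(ipLimits: var IpLimits, ip: IpAddress): bool =
  # Count one more node for `ip` unless the quota is already used up.
  let val = ipLimits.ips.getOrDefault(ip, 0)
  if val < ipLimits.limit:
    ipLimits.ips[ip] = val + 1
    true
  else:
    false

var lim = IpLimits(limit: 2)
let ip = parseIpAddress("10.0.0.1")
doAssert lim.inc(ip) and lim.inc(ip)  # two nodes fit under the limit
doAssert not lim.inc(ip)              # a third one is rejected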
@ -182,8 +177,6 @@ proc midpoint(k: KBucket): NodeId =
proc len(k: KBucket): int = k.nodes.len
proc replacementLen*(k: KBucket): int = k.replacementCache.len
proc tail(k: KBucket): Node = k.nodes[high(k.nodes)]
proc ipLimitInc(r: var RoutingTable, b: KBucket, n: Node): bool =
@ -212,14 +205,14 @@ proc ipLimitDec(r: var RoutingTable, b: KBucket, n: Node) =
proc add(k: KBucket, n: Node) =
k.nodes.add(n)
dht_routing_table_nodes.inc()
routing_table_nodes.inc()
proc remove(k: KBucket, n: Node): bool =
let i = k.nodes.find(n)
if i != -1:
dht_routing_table_nodes.dec()
if alreadySeen(k.nodes[i]):
dht_routing_table_nodes.dec(labelValues = ["seen"])
routing_table_nodes.dec()
if k.nodes[i].seen:
routing_table_nodes.dec(labelValues = ["seen"])
k.nodes.delete(i)
trace "removed node:", node = n
true
@ -285,15 +278,11 @@ proc computeSharedPrefixBits(nodes: openArray[NodeId]): int =
# Reaching this would mean that all node ids are equal.
doAssert(false, "Unable to calculate number of shared prefix bits")
proc getDepth*(b: KBucket) : int =
computeSharedPrefixBits(@[b.istart, b.iend])
proc init*(T: type RoutingTable, localNode: Node, bitsPerHop = DefaultBitsPerHop,
ipLimits = DefaultTableIpLimits, rng: ref HmacDrbgContext,
distanceCalculator = XorDistanceCalculator): T =
## Initialize the routing table for provided `Node` and bitsPerHop value.
## `bitsPerHop` is default set to 5 as recommended by original Kademlia paper.
dht_routing_table_buckets.inc()
RoutingTable(
localNode: localNode,
buckets: @[KBucket.new(0.u256, high(UInt256), ipLimits.bucketIpLimit)],
@ -307,7 +296,6 @@ proc splitBucket(r: var RoutingTable, index: int) =
let (a, b) = bucket.split()
r.buckets[index] = a
r.buckets.insert(b, index + 1)
dht_routing_table_buckets.inc()
proc bucketForNode(r: RoutingTable, id: NodeId): KBucket =
result = binaryGetBucketForNode(r.buckets, id)
@ -329,12 +317,15 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus =
# gets moved to the tail.
if k.replacementCache[nodeIdx].address.get().ip != n.address.get().ip:
if not ipLimitInc(r, k, n):
trace "replace: ip limit reached"
return IpLimitReached
ipLimitDec(r, k, k.replacementCache[nodeIdx])
k.replacementCache.delete(nodeIdx)
k.replacementCache.add(n)
trace "replace: already existed"
return ReplacementExisting
elif not ipLimitInc(r, k, n):
trace "replace: ip limit reached (2)"
return IpLimitReached
else:
doAssert(k.replacementCache.len <= REPLACEMENT_CACHE_SIZE)
@ -345,7 +336,7 @@ proc addReplacement(r: var RoutingTable, k: KBucket, n: Node): NodeStatus =
k.replacementCache.delete(0)
k.replacementCache.add(n)
debug "Node added to replacement cache", n
trace "replace: added"
return ReplacementAdded
proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
@ -412,50 +403,42 @@ proc addNode*(r: var RoutingTable, n: Node): NodeStatus =
return IpLimitReached
bucket.add(n)
debug "Node added to routing table", n
return Added
else:
# Bucket must be full, but let's see if we should split the bucket.
# Bucket must be full, but let's see if we should split the bucket.
# Calculate the prefix shared by all nodes in the bucket's range, not the
# ones actually in the bucket.
let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
# Split if the bucket has the local node in its range or if the depth is not
# congruent to 0 mod `bitsPerHop`
if bucket.inRange(r.localNode) or
(depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
r.splitBucket(r.buckets.find(bucket))
return r.addNode(n) # retry adding
# When bucket doesn't get split the node is added to the replacement cache
return r.addReplacement(bucket, n)
# Calculate the prefix shared by all nodes in the bucket's range, not the
# ones actually in the bucket.
let depth = computeSharedPrefixBits(@[bucket.istart, bucket.iend])
# Split if the bucket has the local node in its range or if the depth is not
# congruent to 0 mod `bitsPerHop`
if bucket.inRange(r.localNode) or
(depth mod r.bitsPerHop != 0 and depth != ID_SIZE):
r.splitBucket(r.buckets.find(bucket))
return r.addNode(n) # retry adding
else:
# When bucket doesn't get split the node is added to the replacement cache
return r.addReplacement(bucket, n)
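The split rule above depends on the depth, i.e. the number of leading bits shared across the bucket's id range. A self-contained analogue (not the library's computeSharedPrefixBits, just a sketch of the same idea over two UInt256 ids):

import stint

# Number of leading bits shared by the two ends of a bucket's id range.
func sharedPrefixBits(a, b: UInt256): int =
  for i in countdown(255, 0):
    if ((a shr i) and 1.u256) != ((b shr i) and 1.u256):
      return 255 - i
  256

let
  istart = 0.u256
  iend = high(UInt256) shr 1  # the whole range has a zero top bit
doAssert sharedPrefixBits(istart, iend) == 1
# Split when the bucket covers the local node, or when this depth is not a
# multiple of bitsPerHop (and the bucket is not already at full depth).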
proc removeNode*(r: var RoutingTable, n: Node) =
## Remove the node `n` from the routing table.
## No replacement is added, even if one exists in the replacement cache.
let b = r.bucketForNode(n.id)
if b.remove(n):
ipLimitDec(r, b, n)
proc replaceNode*(r: var RoutingTable, n: Node, forceRemoveBelow = 1.0) =
proc replaceNode*(r: var RoutingTable, n: Node) =
## Replace node `n` with last entry in the replacement cache. If there are
## no entries in the replacement cache, node `n` will either be removed
## or kept based on `forceRemoveBelow`. Default: remove.
## Note: Kademlia paper recommends here to not remove nodes if there are no
## replacements. This might mean pinging nodes that are not reachable, but
## also avoids being too aggressive about UDP losses or temporary network
## failures.
## no entries in the replacement cache, node `n` will simply be removed.
# TODO: Kademlia paper recommends here to not remove nodes if there are no
# replacements. However, that would require a bit more complexity in the
# revalidation as you don't want to try pinging that node all the time.
let b = r.bucketForNode(n.id)
if (b.replacementCache.len > 0 or n.seen <= forceRemoveBelow):
if b.remove(n):
debug "Node removed from routing table", n
ipLimitDec(r, b, n)
if b.remove(n):
ipLimitDec(r, b, n)
if b.replacementCache.len > 0:
# Nodes in the replacement cache are already included in the ip limits.
let rn = b.replacementCache[high(b.replacementCache)]
b.add(rn)
b.replacementCache.delete(high(b.replacementCache))
debug "Node added to routing table from replacement cache", node=rn
if b.replacementCache.len > 0:
# Nodes in the replacement cache are already included in the ip limits.
b.add(b.replacementCache[high(b.replacementCache)])
b.replacementCache.delete(high(b.replacementCache))
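A reduced sketch of the eviction rule above, with plain seqs standing in for the bucket and its replacement cache; MiniNode and all names here are hypothetical stand-ins, and seen plays the same reliability-score role as elsewhere in this diff:

import std/sequtils

# Hypothetical stand-ins for the bucket and its replacement cache.
type MiniNode = tuple[id: int, seen: float]

proc replaceNode(nodes, cache: var seq[MiniNode], n: MiniNode,
                 forceRemoveBelow = 1.0) =
  # Remove only if a replacement exists or the node's score is low enough.
  if cache.len > 0 or n.seen <= forceRemoveBelow:
    let i = nodes.find(n)
    if i >= 0:
      nodes.delete(i)
      if cache.len > 0:
        nodes.add(cache[^1])        # promote the most recent replacement
        cache.setLen(cache.len - 1)

var
  nodes = @[(id: 1, seen: 0.2)]
  cache: seq[MiniNode]
replaceNode(nodes, cache, nodes[0])  # no replacement: removed (0.2 <= 1.0)
doAssert nodes.len == 0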
proc getNode*(r: RoutingTable, id: NodeId): Option[Node] =
## Get the `Node` with `id` as `NodeId` from the routing table.
@ -476,16 +459,16 @@ proc nodesByDistanceTo(r: RoutingTable, k: KBucket, id: NodeId): seq[Node] =
sortedByIt(k.nodes, r.distance(it.id, id))
proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
seenThreshold = 0.0): seq[Node] =
seenOnly = false): seq[Node] =
## Return up to k neighbours of the given node id.
## When seenThreshold is set, only nodes that have previously been
## contacted successfully and seen recently enough will be selected.
## When seenOnly is set to true, only nodes that have been contacted
## previously successfully will be selected.
result = newSeqOfCap[Node](k * 2)
block addNodes:
for bucket in r.bucketsByDistanceTo(id):
for n in r.nodesByDistanceTo(bucket, id):
# Avoid nodes with 'seen' value below threshold
if n.seen >= seenThreshold:
# Only provide actively seen nodes when `seenOnly` set.
if not seenOnly or n.seen:
result.add(n)
if result.len == k * 2:
break addNodes
@ -497,22 +480,22 @@ proc neighbours*(r: RoutingTable, id: NodeId, k: int = BUCKET_SIZE,
result.setLen(k)
proc neighboursAtDistance*(r: RoutingTable, distance: uint16,
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
## Return up to k neighbours at given logarithmic distance.
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenThreshold)
result = r.neighbours(r.idAtDistance(r.localNode.id, distance), k, seenOnly)
# This is a bit silly: first get the closest nodes, then keep only the ones
# that are exactly at the requested distance.
keepIf(result, proc(n: Node): bool = r.logDistance(n.id, r.localNode.id) == distance)
proc neighboursAtDistances*(r: RoutingTable, distances: seq[uint16],
k: int = BUCKET_SIZE, seenThreshold = 0.0): seq[Node] =
k: int = BUCKET_SIZE, seenOnly = false): seq[Node] =
## Return up to k neighbours at given logarithmic distances.
# TODO: This currently returns nodes at neighbouring distances, prioritizing
# the first requested distance, so it might not include all the requested
# node distances. Need to rework the logic here and not use the neighbours call.
if distances.len > 0:
result = r.neighbours(r.idAtDistance(r.localNode.id, distances[0]), k,
seenThreshold)
seenOnly)
# This is a bit silly: first get the closest nodes, then keep only the ones
# that are exactly at the requested distances.
keepIf(result, proc(n: Node): bool =
@ -524,30 +507,23 @@ proc len*(r: RoutingTable): int =
proc moveRight[T](arr: var openArray[T], a, b: int) =
## In `arr` move elements in range [a, b] right by 1.
var t: T
when declared(shallowCopy):
shallowCopy(t, arr[b + 1])
for i in countdown(b, a):
shallowCopy(arr[i + 1], arr[i])
shallowCopy(arr[a], t)
else:
t = move arr[b + 1]
for i in countdown(b, a):
arr[i + 1] = move arr[i]
arr[a] = move t
shallowCopy(t, arr[b + 1])
for i in countdown(b, a):
shallowCopy(arr[i + 1], arr[i])
shallowCopy(arr[a], t)
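For clarity, the effect of the rotation on a small sequence; this is the move-based branch above (shallowCopy is the pre-Nim-2.0 spelling of the same operation):

proc moveRight[T](arr: var openArray[T], a, b: int) =
  ## Shift elements in [a, b] right by one, rotating arr[b+1] into arr[a].
  var t = move arr[b + 1]
  for i in countdown(b, a):
    arr[i + 1] = move arr[i]
  arr[a] = move t

var xs = @[1, 2, 3, 4, 5]
xs.moveRight(1, 3)
doAssert xs == @[1, 5, 2, 3, 4]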
proc setJustSeen*(r: RoutingTable, n: Node, seen = true) =
## If seen, move `n` to the head (most recently seen) of its bucket.
proc setJustSeen*(r: RoutingTable, n: Node) =
## Move `n` to the head (most recently seen) of its bucket.
## If `n` is not in the routing table, do nothing.
let b = r.bucketForNode(n.id)
if seen:
let idx = b.nodes.find(n)
if idx >= 0:
if idx != 0:
b.nodes.moveRight(0, idx - 1)
let idx = b.nodes.find(n)
if idx >= 0:
if idx != 0:
b.nodes.moveRight(0, idx - 1)
if not alreadySeen(n): # first time seeing the node
dht_routing_table_nodes.inc(labelValues = ["seen"])
n.registerSeen(seen)
if not n.seen:
b.nodes[0].seen = true
routing_table_nodes.inc(labelValues = ["seen"])
proc nodeToRevalidate*(r: RoutingTable): Node =
## Return a node to revalidate. The least recently seen node from a random
@ -561,7 +537,7 @@ proc nodeToRevalidate*(r: RoutingTable): Node =
return b.nodes[^1]
proc randomNodes*(r: RoutingTable, maxAmount: int,
pred: proc(x: Node): bool {.gcsafe, noSideEffect, raises: [].} = nil): seq[Node] =
pred: proc(x: Node): bool {.gcsafe, noSideEffect.} = nil): seq[Node] =
## Get a `maxAmount` of random nodes from the routing table with the `pred`
## predicate function applied as filter on the nodes selected.
var maxAmount = maxAmount
@ -584,8 +560,7 @@ proc randomNodes*(r: RoutingTable, maxAmount: int,
# while it will take less total time compared to e.g. an (async)
# randomLookup, the time might be wasted as all nodes are possibly seen
# already.
# We check against the number of nodes to avoid an infinite loop in case of a filter.
while len(result) < maxAmount and len(seen) < sz:
while len(seen) < maxAmount:
let bucket = r.rng[].sample(r.buckets)
if bucket.nodes.len != 0:
let node = r.rng[].sample(bucket.nodes)
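The reworked loop condition guards against a predicate that rejects every node. A minimal sketch of the sampling pattern with plain ints in place of nodes (randomPick and its names are hypothetical):

import std/[random, sets]

proc randomPick(pool: seq[int], maxAmount: int,
                pred: proc(x: int): bool = nil): seq[int] =
  var seen: HashSet[int]
  let sz = pool.len
  # Bounding on `seen` prevents an infinite loop when pred filters everything.
  while result.len < maxAmount and seen.len < sz:
    let x = pool[rand(pool.high)]
    if x notin seen:
      seen.incl(x)
      if pred.isNil or pred(x):
        result.add(x)

randomize(42)
let picked = randomPick(@[1, 2, 3, 4, 5], 3, proc(x: int): bool = x mod 2 == 0)
doAssert picked.len == 2  # only the two even values pass the filter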

View File

@ -1,4 +1,4 @@
# logos-storage-dht - Logos Storage DHT
# codex-dht - Codex DHT
# Copyright (c) 2022 Status Research & Development GmbH
# Licensed and distributed under either of
# * MIT license (license terms in the root directory or at https://opensource.org/licenses/MIT).
@ -16,11 +16,11 @@
## - the one derived in the key-exchange started by the other node.
## To alleviate this issue, we store two decryption keys in each session.
{.push raises: [].}
{.push raises: [Defect].}
import
std/[net, options],
stint, stew/endians2,
std/options,
stint, stew/endians2, stew/shims/net,
node, lru
export lru
@ -39,7 +39,7 @@ type
func makeKey(id: NodeId, address: Address): SessionKey =
var pos = 0
result[pos ..< pos+sizeof(id)] = toBytesBE(id)
result[pos ..< pos+sizeof(id)] = toBytes(id)
pos.inc(sizeof(id))
case address.ip.family
of IpAddressFamily.IpV4:
@ -47,7 +47,7 @@ func makeKey(id: NodeId, address: Address): SessionKey =
of IpAddressFamily.IpV6:
result[pos ..< pos+sizeof(address.ip.address_v6)] = address.ip.address_v6
pos.inc(sizeof(address.ip.address_v6))
result[pos ..< pos+sizeof(address.port)] = toBytesBE(address.port.uint16)
result[pos ..< pos+sizeof(address.port)] = toBytes(address.port.uint16)
func swapr*(s: var Sessions, id: NodeId, address: Address) =
var value: array[3 * sizeof(AesKey), byte]
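The session key built above is simply the node id, the raw IP bytes, and the big-endian port concatenated. A sketch of the IPv4 layout, with sizes following from the types in this diff (32 + 4 + 2 bytes):

import std/net
import stint, stew/endians2

# NodeId is a UInt256 (32 bytes); IPv4 adds 4 bytes, the port 2 bytes.
let
  id = 1.u256
  ip = parseIpAddress("127.0.0.1")
  port = Port(9000)
var key: seq[byte]
key.add toBytesBE(id)           # 32 bytes, big endian
key.add ip.address_v4           # 4 bytes
key.add toBytesBE(port.uint16)  # 2 bytes
doAssert key.len == 38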

View File

@ -6,10 +6,10 @@
#
import
chronicles,
results,
std/[net, options, strutils, sugar],
pkg/stew/[byteutils, arrayops],
std/[options, strutils, sugar],
pkg/stew/[results, byteutils, arrayops],
stew/endians2,
stew/shims/net,
stew/base64,
libp2p/crypto/crypto,
libp2p/crypto/secp,
@ -58,7 +58,7 @@ proc incSeqNo*(
proc update*(
r: var SignedPeerRecord,
pk: crypto.PrivateKey,
ip: Option[IpAddress],
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port] = none[Port]()):
RecordResult[void] =
## Update a `SignedPeerRecord` with given ip address, tcp port, udp port and optional
@ -97,8 +97,9 @@ proc update*(
if udpPort.isNone and tcpPort.isNone:
return err "No existing address in SignedPeerRecord with no port provided"
let ipAddr = ip.get
let ipAddr = try: ValidIpAddress.init(ip.get)
except ValueError as e:
return err ("Existing address contains invalid address: " & $e.msg).cstring
if tcpPort.isSome:
transProto = IpTransportProtocol.tcpProtocol
transProtoPort = tcpPort.get
@ -122,13 +123,9 @@ proc update*(
.mapErr((e: string) => e.cstring)
existingIp =
if existingNetProtoFam == MultiCodec.codec("ip6"):
IpAddress(
family: IPv6, address_v6: array[16, byte].initCopyFrom(existingNetProtoAddr)
)
ipv6 array[16, byte].initCopyFrom(existingNetProtoAddr)
else:
IpAddress(
family: IPv4, address_v4: array[4, byte].initCopyFrom(existingNetProtoAddr)
)
ipv4 array[4, byte].initCopyFrom(existingNetProtoAddr)
ipAddr = ip.get(existingIp)
@ -226,7 +223,7 @@ proc init*(
T: type SignedPeerRecord,
seqNum: uint64,
pk: PrivateKey,
ip: Option[IpAddress],
ip: Option[ValidIpAddress],
tcpPort, udpPort: Option[Port]):
RecordResult[T] =
## Initialize a `SignedPeerRecord` with given sequence number, private key, optional
@ -241,7 +238,9 @@ proc init*(
tcpPort, udpPort
var
ipAddr = static parseIpAddress("127.0.0.1")
ipAddr = try: ValidIpAddress.init("127.0.0.1")
except ValueError as e:
return err ("Existing address contains invalid address: " & $e.msg).cstring
proto: IpTransportProtocol
protoPort: Port
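For reference, constructing a record with the master-side IpAddress signature, following the patterns in the tests below (key generation illustrative):

import std/[net, options]
import libp2p/crypto/crypto
import codexdht/discv5/spr

let
  rng = newRng()
  privKey = PrivateKey.random(PKScheme.Secp256k1, rng[]).expect("valid key")
  port = Port(9000)
  record = SignedPeerRecord.init(
    1, privKey, some(parseIpAddress("12.13.14.15")),
    some(port), some(port))[]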

View File

@ -6,38 +6,26 @@
# Everything below the handling of ordinary messages
import
std/[net, tables, options, sets],
std/[tables, options, sets],
bearssl/rand,
chronos,
chronicles,
metrics,
libp2p/crypto/crypto,
stew/shims/net,
"."/[node, encoding, sessions]
const
handshakeTimeout* = 500.milliseconds ## timeout for the reply on the
handshakeTimeout* = 2.seconds ## timeout for the reply on the
## whoareyou message
responseTimeout* = 1.seconds ## timeout for the response of a request-response
responseTimeout* = 4.seconds ## timeout for the response of a request-response
## call
logScope:
topics = "discv5 transport"
declarePublicCounter dht_transport_tx_packets,
"Discovery transport packets sent", labels = ["state"]
declarePublicCounter dht_transport_tx_bytes,
"Discovery transport bytes sent", labels = ["state"]
declarePublicCounter dht_transport_rx_packets,
"Discovery transport packets received", labels = ["state"]
declarePublicCounter dht_transport_rx_bytes,
"Discovery transport bytes received", labels = ["state"]
type
Transport* [Client] = ref object
client: Client
bindAddress: Address ## UDP binding address
transp: DatagramTransport
pendingRequests: Table[AESGCMNonce, (PendingRequest, Moment)]
pendingRequests: Table[AESGCMNonce, PendingRequest]
keyexchangeInProgress: HashSet[NodeId]
pendingRequestsByNode: Table[NodeId, seq[seq[byte]]]
codec*: Codec
@ -65,11 +53,7 @@ proc sendToA(t: Transport, a: Address, msg: seq[byte]) =
# nodes. Else the revalidation might end up clearing the routing table
# because of ping failures caused by our own network connection failing.
warn "Discovery send failed", msg = f.readError.msg
dht_transport_tx_packets.inc(labelValues = ["failed"])
dht_transport_tx_bytes.inc(msg.len.int64, labelValues = ["failed"])
)
dht_transport_tx_packets.inc()
dht_transport_tx_bytes.inc(msg.len.int64)
proc send(t: Transport, n: Node, data: seq[byte]) =
doAssert(n.address.isSome())
@ -86,7 +70,7 @@ proc sendMessage*(t: Transport, toId: NodeId, toAddr: Address, message: seq[byte
proc registerRequest(t: Transport, n: Node, message: seq[byte],
nonce: AESGCMNonce) =
let request = PendingRequest(node: n, message: message)
if not t.pendingRequests.hasKeyOrPut(nonce, (request, Moment.now())):
if not t.pendingRequests.hasKeyOrPut(nonce, request):
sleepAsync(responseTimeout).addCallback() do(data: pointer):
t.pendingRequests.del(nonce)
@ -109,7 +93,7 @@ proc sendMessage*(t: Transport, toNode: Node, message: seq[byte]) =
t.send(toNode, data)
t.keyexchangeInProgress.incl(toNode.id)
trace "keyexchangeInProgress added", myport = t.bindAddress.port , dstId = toNode
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
sleepAsync(responseTimeout).addCallback() do(data: pointer):
t.keyexchangeInProgress.excl(toNode.id)
trace "keyexchangeInProgress removed (timeout)", myport = t.bindAddress.port , dstId = toNode
else:
@ -132,7 +116,8 @@ proc sendWhoareyou(t: Transport, toId: NodeId, a: Address,
let data = encodeWhoareyouPacket(t.rng[], t.codec, toId, a, requestNonce,
recordSeq, pubkey)
sleepAsync(handshakeTimeout).addCallback() do(data: pointer):
# The handshake key is popped in decodeHandshakePacket; if it has not been popped by the timeout:
# TODO: should we still provide cancellation in case handshake completes
# correctly?
if t.codec.hasHandshake(key):
debug "Handshake timeout", myport = t.bindAddress.port , dstId = toId, address = a
t.codec.handshakes.del(key)
@ -157,8 +142,6 @@ proc sendPending(t:Transport, toNode: Node):
t.pendingRequestsByNode.del(toNode.id)
proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
dht_transport_rx_packets.inc()
dht_transport_rx_bytes.inc(packet.len.int64)
let decoded = t.codec.decodePacket(a, packet)
if decoded.isOk:
let packet = decoded[]
@ -183,16 +166,9 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
of Flag.Whoareyou:
trace "Received whoareyou packet", myport = t.bindAddress.port, address = a
var
prt: (PendingRequest, Moment)
if t.pendingRequests.take(packet.whoareyou.requestNonce, prt):
let
pr = prt[0]
startTime = prt[1]
toNode = pr.node
rtt = Moment.now() - startTime
# trace "whoareyou RTT:", rtt, node = toNode
toNode.registerRtt(rtt)
var pr: PendingRequest
if t.pendingRequests.take(packet.whoareyou.requestNonce, pr):
let toNode = pr.node
# This is a node we previously contacted and thus must have an address.
doAssert(toNode.address.isSome())
let address = toNode.address.get()
@ -230,16 +206,11 @@ proc receive*(t: Transport, a: Address, packet: openArray[byte]) =
if node.address.isSome() and a == node.address.get():
# TODO: maybe here we could verify that the address matches what we were
# sending the 'whoareyou' message to. In that case, we can set 'seen'
# TODO: verify how this works with restrictive NAT and firewall scenarios.
node.registerSeen()
node.seen = true
if t.client.addNode(node):
trace "Added new node to routing table after handshake", node, tablesize=t.client.nodesDiscovered()
discard t.sendPending(node)
else:
trace "address mismatch, not adding seen flag", node, address = a, nodeAddress = node.address.get()
else:
dht_transport_rx_packets.inc(labelValues = ["failed_decode"])
dht_transport_rx_bytes.inc(packet.len.int64, labelValues = ["failed_decode"])
trace "Packet decoding error", myport = t.bindAddress.port, error = decoded.error, address = a
proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
@ -258,7 +229,7 @@ proc processClient[T](transp: DatagramTransport, raddr: TransportAddress):
except ValueError as e:
error "Not a valid IpAddress", exception = e.name, msg = e.msg
return
let a = Address(ip: ip, port: raddr.port)
let a = Address(ip: ValidIpAddress.init(ip), port: raddr.port)
t.receive(a, buf)
@ -291,7 +262,7 @@ proc newTransport*[T](
Transport[T](
client: client,
bindAddress: Address(ip: bindIp, port: bindPort),
bindAddress: Address(ip: ValidIpAddress.init(bindIp), port: bindPort),
codec: Codec(
localNode: localNode,
privKey: privKey,
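The pendingRequests change above pairs each request with a timestamp so the whoareyou round trip can be measured. A minimal sketch of that RTT bookkeeping with chronos (the table, nonce type, and proc names are simplified stand-ins):

import std/tables
import chronos

type Nonce = array[12, byte]  # stands in for AESGCMNonce

var pending: Table[Nonce, Moment]

proc registerRequest(nonce: Nonce) =
  pending[nonce] = Moment.now()

proc onReply(nonce: Nonce) =
  var started: Moment
  if pending.take(nonce, started):
    let rtt = Moment.now() - started  # a chronos Duration
    echo "whoareyou RTT: ", rtt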

View File

@ -1,6 +1,10 @@
switch("define", "libp2p_pki_schemes=secp256k1")
include "build.nims"
# begin Nimble config (version 2)
--noNimblePath
when withDir(thisDir(), system.fileExists("nimble.paths")):
include "nimble.paths"
# end Nimble config

347
nimble.lock Normal file
View File

@ -0,0 +1,347 @@
{
"version": 2,
"packages": {
"nim": {
"version": "1.6.18",
"vcsRevision": "a749a8b742bd0a4272c26a65517275db4720e58a",
"url": "https://github.com/nim-lang/Nim.git",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "58115cfda490735d846a2116ab53e814bb5559d3"
}
},
"asynctest": {
"version": "0.4.3",
"vcsRevision": "9f31323a5f38bf9d7402676f8171aceea45fa091",
"url": "https://github.com/markspanbroek/asynctest",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "55d18b230dd63a1bebefed235aceee6e986b9193"
}
},
"results": {
"version": "0.4.0",
"vcsRevision": "113d433f48894ee8e7da3e340c8fe19ad7b9db4d",
"url": "https://github.com/arnetheduck/nim-results",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "bb03f4dd49b66ba6973257ae1fc121756704b0b1"
}
},
"unittest2": {
"version": "0.0.9",
"vcsRevision": "299bc9a5741f1cd34e5014187e66d904b2f00b37",
"url": "https://github.com/status-im/nim-unittest2",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "2f316e59b1c61a6a51d0c1bfa8b4fea24a7c60ea"
}
},
"stew": {
"version": "0.1.0",
"vcsRevision": "2c2544aec13536304438be045bfdd22452741466",
"url": "https://github.com/status-im/nim-stew",
"downloadMethod": "git",
"dependencies": [
"results",
"unittest2"
],
"checksums": {
"sha1": "0d3c4f15a4cff934ec30e5b2d5fe590922839a4e"
}
},
"faststreams": {
"version": "0.3.0",
"vcsRevision": "720fc5e5c8e428d9d0af618e1e27c44b42350309",
"url": "https://github.com/status-im/nim-faststreams",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2"
],
"checksums": {
"sha1": "ab178ba25970b95d953434b5d86b4d60396ccb64"
}
},
"serialization": {
"version": "0.2.2",
"vcsRevision": "4d541ec43454809904fc4c3c0a7436410ad597d2",
"url": "https://github.com/status-im/nim-serialization",
"downloadMethod": "git",
"dependencies": [
"faststreams",
"unittest2",
"stew"
],
"checksums": {
"sha1": "1dcdb29f17d0aff295e7e57edf530b1e16fb6c59"
}
},
"bearssl": {
"version": "0.2.1",
"vcsRevision": "d55d3a86d7ec3ad11b244e17b3bad490bfbd076d",
"url": "https://github.com/status-im/nim-bearssl",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "5327c983483c4dd465347c6b8a974239c7c6c612"
}
},
"httputils": {
"version": "0.3.0",
"vcsRevision": "77a59297ad635d91a6352ef48ee09c6bde0c5d86",
"url": "https://github.com/status-im/nim-http-utils",
"downloadMethod": "git",
"dependencies": [
"stew",
"unittest2"
],
"checksums": {
"sha1": "f53480dd7ed0ac9b3ca7972f8b5e0f1862ce99ac"
}
},
"chronos": {
"version": "3.2.0",
"vcsRevision": "c41599a6d6d8b11c729032bf8913e06f4171e0fb",
"url": "https://github.com/status-im/nim-chronos",
"downloadMethod": "git",
"dependencies": [
"results",
"stew",
"bearssl",
"httputils",
"unittest2"
],
"checksums": {
"sha1": "41227a7f20f19cbc6f343c9cae86861f81b7a7d0"
}
},
"questionable": {
"version": "0.10.12",
"vcsRevision": "2dd6b6b220f9f14a1231f6cafdf24f012bcc8414",
"url": "https://github.com/markspanbroek/questionable",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "3860fae4394990982d878182f3d3931b8fcb5a10"
}
},
"sqlite3_abi": {
"version": "3.40.1.1",
"vcsRevision": "362e1bd9f689ad9f5380d9d27f0705b3d4dfc7d3",
"url": "https://github.com/arnetheduck/nim-sqlite3-abi",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "8e91db8156a82383d9c48f53b33e48f4e93077b1"
}
},
"upraises": {
"version": "0.1.0",
"vcsRevision": "d9f268db1021959fe0f2c7a5e49fba741f9932a0",
"url": "https://github.com/markspanbroek/upraises",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "176234f808b44a0be763df706ed634d6e8df17bb"
}
},
"datastore": {
"version": "0.0.1",
"vcsRevision": "cadf38db576a2cf6145188f285f042e56aad7c91",
"url": "https://github.com/codex-storage/nim-datastore",
"downloadMethod": "git",
"dependencies": [
"asynctest",
"chronos",
"questionable",
"sqlite3_abi",
"stew",
"unittest2",
"upraises"
],
"checksums": {
"sha1": "5b5bc0576f7d0565bb624a4ae7ad2717805b6849"
}
},
"testutils": {
"version": "0.5.0",
"vcsRevision": "dfc4c1b39f9ded9baf6365014de2b4bfb4dafc34",
"url": "https://github.com/status-im/nim-testutils",
"downloadMethod": "git",
"dependencies": [
"unittest2"
],
"checksums": {
"sha1": "756d0757c4dd06a068f9d38c7f238576ba5ee897"
}
},
"json_serialization": {
"version": "0.2.2",
"vcsRevision": "3f1ce24ee116daedbc9c8be525e63ec03e185a28",
"url": "https://github.com/status-im/nim-json-serialization",
"downloadMethod": "git",
"dependencies": [
"serialization",
"stew"
],
"checksums": {
"sha1": "da0d38b775f222703784b273225fe89267430482"
}
},
"chronicles": {
"version": "0.10.3",
"vcsRevision": "32ac8679680ea699f7dbc046e8e0131cac97d41a",
"url": "https://github.com/status-im/nim-chronicles",
"downloadMethod": "git",
"dependencies": [
"testutils",
"json_serialization"
],
"checksums": {
"sha1": "79f09526d4d9b9196dd2f6a75310d71a890c4f88"
}
},
"nimcrypto": {
"version": "0.6.0",
"vcsRevision": "a079df92424968d46a6ac258299ce9380aa153f2",
"url": "https://github.com/cheatfate/nimcrypto",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "be9a4654dd5839b97f39d5060d98f18ba868623c"
}
},
"zlib": {
"version": "0.1.0",
"vcsRevision": "a2f44bb7f65571a894227ff6fde9298a104e03a5",
"url": "https://github.com/status-im/nim-zlib",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "edbf76ebdecb63d302d1883fe4b23b2eb0608cb7"
}
},
"websock": {
"version": "0.1.0",
"vcsRevision": "f8ed9b40a5ff27ad02a3c237c4905b0924e3f982",
"url": "https://github.com/status-im/nim-websock",
"downloadMethod": "git",
"dependencies": [
"chronos",
"httputils",
"chronicles",
"stew",
"nimcrypto",
"bearssl",
"zlib"
],
"checksums": {
"sha1": "94f836ae589056b2deb04bdfdcd614fff80adaf5"
}
},
"secp256k1": {
"version": "0.6.0.3.1",
"vcsRevision": "2acbbdcc0e63002a013fff49f015708522875832",
"url": "https://github.com/status-im/nim-secp256k1",
"downloadMethod": "git",
"dependencies": [
"stew",
"nimcrypto"
],
"checksums": {
"sha1": "146818431dec16ededb951f42fc36832949bcc8f"
}
},
"dnsclient": {
"version": "0.3.4",
"vcsRevision": "23214235d4784d24aceed99bbfe153379ea557c8",
"url": "https://github.com/ba0f3/dnsclient.nim",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "65262c7e533ff49d6aca5539da4bc6c6ce132f40"
}
},
"npeg": {
"version": "1.2.0",
"vcsRevision": "22449099d92d8bbd535fcd950287274c8d11daed",
"url": "https://github.com/zevv/npeg",
"downloadMethod": "git",
"dependencies": [],
"checksums": {
"sha1": "bfe84b7b89a4f92e9ca6d9be8256bdda032e556b"
}
},
"stint": {
"version": "2.0.0",
"vcsRevision": "711cda4456c32d3ba3c6c4524135b3453dffeb9c",
"url": "https://github.com/status-im/nim-stint",
"downloadMethod": "git",
"dependencies": [
"stew"
],
"checksums": {
"sha1": "432d8fa883c807932fc78ecd33fc944637e2d328"
}
},
"metrics": {
"version": "0.0.1",
"vcsRevision": "51f1227d0fd04ce84b1ef784b11280cb7875348c",
"url": "https://github.com/status-im/nim-metrics",
"downloadMethod": "git",
"dependencies": [
"chronos"
],
"checksums": {
"sha1": "948aaca1763c838a7752251bab9a8bcfda66acfe"
}
},
"protobuf_serialization": {
"version": "0.3.0",
"vcsRevision": "5a31137a82c2b6a989c9ed979bb636c7a49f570e",
"url": "https://github.com/status-im/nim-protobuf-serialization",
"downloadMethod": "git",
"dependencies": [
"stew",
"faststreams",
"serialization",
"npeg",
"unittest2"
],
"checksums": {
"sha1": "ed8270a5f874af35c5e9c04b50020c8a27ba61f5"
}
},
"libp2p": {
"version": "1.1.0",
"vcsRevision": "e3c967ad1939fb33b8e13759037d193734acd202",
"url": "https://github.com/status-im/nim-libp2p",
"downloadMethod": "git",
"dependencies": [
"nimcrypto",
"dnsclient",
"bearssl",
"chronicles",
"chronos",
"metrics",
"secp256k1",
"stew",
"websock",
"unittest2"
],
"checksums": {
"sha1": "fbc9aef1a5d8e9cc79fed0328266eacc0339de47"
}
}
},
"tasks": {}
}

2
tests/coverage.nim Normal file
View File

@ -0,0 +1,2 @@
include ./testAll

15
tests/coverage.nims Normal file
View File

@ -0,0 +1,15 @@
switch("define", "testsAll")
switch("debugger", "native")
switch("lineDir", "on")
switch("define", "debug")
# switch("opt", "none")
switch("verbosity", "0")
switch("hints", "off")
switch("warnings", "off")
switch("define", "chronicles_log_level=INFO")
switch("nimcache", "nimcache")
switch("passC", "-fprofile-arcs")
switch("passC", "-ftest-coverage")
switch("passL", "-fprofile-arcs")
switch("passL", "-ftest-coverage")

View File

@ -1,14 +1,17 @@
import
std/net,
bearssl/rand,
chronos,
libp2p/crypto/[crypto, secp],
libp2p/multiaddress,
codexdht/discv5/[node, routing_table, spr],
codexdht/discv5/protocol as discv5_protocol
codexdht/discv5/crypto as dhtcrypto,
codexdht/discv5/protocol as discv5_protocol,
stew/shims/net
export net
proc localAddress*(port: int): Address =
Address(ip: IPv4_loopback(), port: Port(port))
Address(ip: ValidIpAddress.init("127.0.0.1"), port: Port(port))
proc example*(T: type PrivateKey, rng: ref HmacDrbgContext): PrivateKey =
PrivateKey.random(PKScheme.Secp256k1, rng[]).expect("Valid rng for private key")
@ -51,7 +54,7 @@ proc nodeIdInNodes*(id: NodeId, nodes: openArray[Node]): bool =
if id == n.id: return true
proc generateNode*(privKey: PrivateKey, port: int,
ip: IpAddress = parseIpAddress("127.0.0.1")): Node =
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node =
let
port = Port(port)
@ -69,7 +72,7 @@ proc generateNRandomNodes*(rng: ref HmacDrbgContext, n: int): seq[Node] =
res
proc nodeAndPrivKeyAtDistance*(n: Node, rng: var HmacDrbgContext, d: uint32,
ip: IpAddress = parseIpAddress("127.0.0.1")): (Node, PrivateKey) =
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): (Node, PrivateKey) =
while true:
let
privKey = PrivateKey.random(rng).expect("Valid rng for private key")
@ -78,37 +81,37 @@ proc nodeAndPrivKeyAtDistance*(n: Node, rng: var HmacDrbgContext, d: uint32,
return (node, privKey)
proc nodeAtDistance*(n: Node, rng: var HmacDrbgContext, d: uint32,
ip: IpAddress = parseIpAddress("127.0.0.1")): Node =
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): Node =
let (node, _) = n.nodeAndPrivKeyAtDistance(rng, d, ip)
node
proc nodesAtDistance*(
n: Node, rng: var HmacDrbgContext, d: uint32, amount: int,
ip: IpAddress = parseIpAddress("127.0.0.1")): seq[Node] =
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] =
for i in 0..<amount:
result.add(nodeAtDistance(n, rng, d, ip))
proc nodesAtDistanceUniqueIp*(
n: Node, rng: var HmacDrbgContext, d: uint32, amount: int,
ip: IpAddress = parseIpAddress("127.0.0.1")): seq[Node] =
ip: ValidIpAddress = ValidIpAddress.init("127.0.0.1")): seq[Node] =
var ta = initTAddress(ip, Port(0))
for i in 0..<amount:
ta.inc()
result.add(nodeAtDistance(n, rng, d, ta.address()))
result.add(nodeAtDistance(n, rng, d, ValidIpAddress.init(ta.address())))
proc addSeenNode*(d: discv5_protocol.Protocol, n: Node): bool =
# Add it as a seen node, warning: for testing convenience only!
n.registerSeen()
n.seen = true
d.addNode(n)
func udpExample*(_: type MultiAddress): MultiAddress =
## creates a new udp MultiAddress on a random port
MultiAddress.init("/ip4/0.0.0.0/udp/0")
## creates a new udp multiaddress on a random port
Multiaddress.init("/ip4/0.0.0.0/udp/0")
func udpExamples*(_: type MultiAddress, count: int): seq[MultiAddress] =
var res: seq[MultiAddress] = @[]
for i in 1..count:
res.add MultiAddress.init("/ip4/0.0.0.0/udp/" & $i).get
res.add Multiaddress.init("/ip4/0.0.0.0/udp/" & $i).get
return res
proc toSignedPeerRecord*(privKey: PrivateKey) : SignedPeerRecord =

View File

@ -2,10 +2,11 @@
import std/sequtils
import pkg/chronos
import pkg/asynctest/chronos/unittest
import pkg/asynctest
import pkg/datastore
from pkg/libp2p import PeerId
import codexdht/dht
import codexdht/private/eth/p2p/discoveryv5/spr
import codexdht/private/eth/p2p/discoveryv5/providers
import codexdht/discv5/node

View File

@ -10,15 +10,18 @@
{.used.}
import
std/[options],
asynctest/chronos/unittest2,
std/[options, sequtils],
asynctest,
bearssl/rand,
chronicles,
chronos,
nimcrypto,
libp2p/crypto/[crypto, secp],
libp2p/[multiaddress, multicodec, multihash, routing_record, signed_envelope],
codexdht/dht,
codexdht/discv5/crypto as dhtcrypto,
codexdht/discv5/protocol as discv5_protocol,
stew/byteutils,
test_helper
proc bootstrapNodes(

View File

@ -2,7 +2,7 @@
import
std/tables,
chronos, chronicles, stint, asynctest/chronos/unittest,
chronos, chronicles, stint, asynctest, stew/shims/net,
stew/byteutils, bearssl/rand,
libp2p/crypto/crypto,
codexdht/discv5/[transport, spr, node, routing_table, encoding, sessions, nodes_verification],
@ -287,7 +287,7 @@ suite "Discovery v5 Tests":
await mainNode.closeWait()
await testNode.closeWait()
proc testLookupTargets(fast: bool = false): Future[bool] {.async.} =
proc testLookupTargets(fast: bool = false) {.async: (raises: [Exception]).} =
const
nodeCount = 17
@ -306,9 +306,9 @@ suite "Discovery v5 Tests":
for t in nodes:
if n != t:
let pong = await n.ping(t.localNode)
check pong.isOk()
if pong.isErr():
echo pong.error
return false
# check (await n.ping(t.localNode)).isOk()
for i in 1 ..< nodeCount:
@ -318,19 +318,16 @@ suite "Discovery v5 Tests":
let target = nodes[i]
let discovered = await nodes[nodeCount-1].lookup(target.localNode.id, fast = fast)
debug "Lookup result", target = target.localNode, discovered
if discovered[0] != target.localNode:
return false
check discovered[0] == target.localNode
for node in nodes:
await node.closeWait()
return true
test "Lookup targets":
check await testLookupTargets()
await testLookupTargets()
test "Lookup targets using traditional findNode":
check await testLookupTargets(fast = true)
await testLookupTargets(fast = true)
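The master-side pattern here has the async helper return Future[bool] instead of asserting inside the proc, so a failure surfaces through check await testLookupTargets(). A compact sketch of the idiom with chronos:

import chronos

proc works(): Future[bool] {.async.} =
  # Report failure through the return value instead of raising inside.
  return 2 + 2 == 4

doAssert waitFor works()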
test "Resolve target":
let
@ -415,37 +412,31 @@ suite "Discovery v5 Tests":
await mainNode.closeWait()
await lookupNode.closeWait()
test "Random nodes, also with filter":
let
lookupNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20301))
targetNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20302))
otherNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20303))
anotherNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20304))
# We no longer support field filtering
# test "Random nodes with spr field filter":
# let
# lookupNode = initDiscoveryNode(rng, PrivateKey.example(rng), localAddress(20301))
# targetNode = generateNode(PrivateKey.example(rng))
# otherNode = generateNode(PrivateKey.example(rng))
# anotherNode = generateNode(PrivateKey.example(rng))
check:
lookupNode.addNode(targetNode.localNode.record)
lookupNode.addNode(otherNode.localNode.record)
lookupNode.addNode(anotherNode.localNode.record)
# check:
# lookupNode.addNode(targetNode)
# lookupNode.addNode(otherNode)
# lookupNode.addNode(anotherNode)
let discovered = lookupNode.randomNodes(10)
check discovered.len == 3
let discoveredFiltered = lookupNode.randomNodes(10,
proc(n: Node) : bool = n.address.get.port == Port(20302))
check discoveredFiltered.len == 1 and discoveredFiltered.contains(targetNode.localNode)
let discoveredEmpty = lookupNode.randomNodes(10,
proc(n: Node) : bool = n.address.get.port == Port(20305))
check discoveredEmpty.len == 0
await lookupNode.closeWait()
await targetNode.closeWait()
await otherNode.closeWait()
await anotherNode.closeWait()
# let discovered = lookupNode.randomNodes(10)
# check discovered.len == 3
# let discoveredFiltered = lookupNode.randomNodes(10,
# ("test", @[byte 1,2,3,4]))
# check discoveredFiltered.len == 1 and discoveredFiltered.contains(targetNode)
# await lookupNode.closeWait()
test "New protocol with spr":
let
privKey = PrivateKey.example(rng)
ip = some(parseIpAddress("127.0.0.1"))
ip = some(ValidIpAddress.init("127.0.0.1"))
port = Port(20301)
node = newProtocol(privKey, ip, some(port), some(port), bindPort = port,
rng = rng)
@ -540,7 +531,7 @@ suite "Discovery v5 Tests":
let
port = Port(9000)
fromNoderecord = SignedPeerRecord.init(1, PrivateKey.example(rng),
some(parseIpAddress("11.12.13.14")),
some(ValidIpAddress.init("11.12.13.14")),
some(port), some(port))[]
fromNode = newNode(fromNoderecord)[]
privKey = PrivateKey.example(rng)
@ -552,7 +543,7 @@ suite "Discovery v5 Tests":
block: # Duplicates
let
record = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("12.13.14.15")),
1, privKey, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
# Exact duplicates
@ -562,7 +553,7 @@ suite "Discovery v5 Tests":
# Node id duplicates
let recordSameId = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("212.13.14.15")),
1, privKey, some(ValidIpAddress.init("212.13.14.15")),
some(port), some(port))[]
records.add(recordSameId)
nodes = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -571,7 +562,7 @@ suite "Discovery v5 Tests":
block: # No address
let
recordNoAddress = SignedPeerRecord.init(
1, privKey, none(IpAddress), some(port), some(port))[]
1, privKey, none(ValidIpAddress), some(port), some(port))[]
records = [recordNoAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
check test.len == 0
@ -579,7 +570,7 @@ suite "Discovery v5 Tests":
block: # Invalid address - site local
let
recordInvalidAddress = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("10.1.2.3")),
1, privKey, some(ValidIpAddress.init("10.1.2.3")),
some(port), some(port))[]
records = [recordInvalidAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -588,7 +579,7 @@ suite "Discovery v5 Tests":
block: # Invalid address - loopback
let
recordInvalidAddress = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("127.0.0.1")),
1, privKey, some(ValidIpAddress.init("127.0.0.1")),
some(port), some(port))[]
records = [recordInvalidAddress]
test = verifyNodesRecords(records, fromNode, limit, targetDistance)
@ -597,7 +588,7 @@ suite "Discovery v5 Tests":
block: # Invalid distance
let
recordInvalidDistance = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("12.13.14.15")),
1, privKey, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
records = [recordInvalidDistance]
test = verifyNodesRecords(records, fromNode, limit, @[0'u16])
@ -606,7 +597,7 @@ suite "Discovery v5 Tests":
block: # Invalid distance but distance validation is disabled
let
recordInvalidDistance = SignedPeerRecord.init(
1, privKey, some(parseIpAddress("12.13.14.15")),
1, privKey, some(ValidIpAddress.init("12.13.14.15")),
some(port), some(port))[]
records = [recordInvalidDistance]
test = verifyNodesRecords(records, fromNode, limit)
@ -633,7 +624,7 @@ suite "Discovery v5 Tests":
let
privKey = PrivateKey.example(rng)
enrRec = SignedPeerRecord.init(1, privKey,
some(parseIpAddress("127.0.0.1")), some(Port(9000)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly initialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))
@ -662,7 +653,7 @@ suite "Discovery v5 Tests":
let
privKey = PrivateKey.example(rng)
enrRec = SignedPeerRecord.init(1, privKey,
some(parseIpAddress("127.0.0.1")), some(Port(9000)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly initialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))
@ -693,7 +684,7 @@ suite "Discovery v5 Tests":
a = localAddress(20303)
privKey = PrivateKey.example(rng)
enrRec = SignedPeerRecord.init(1, privKey,
some(parseIpAddress("127.0.0.1")), some(Port(9000)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9000)),
some(Port(9000))).expect("Properly initialized private key")
sendNode = newNode(enrRec).expect("Properly initialized record")
var codec = Codec(localNode: sendNode, privKey: privKey, sessions: Sessions.init(5))

View File

@ -2,13 +2,14 @@
import
std/[options, sequtils, tables],
asynctest/chronos/unittest2,
asynctest/unittest2,
bearssl/rand,
chronos,
libp2p/crypto/secp,
codexdht/discv5/[messages, messages_encoding, encoding, spr, node, sessions],
codexdht/discv5/crypto,
stew/byteutils,
stew/shims/net,
stint,
../dht/test_helper
@ -274,11 +275,11 @@ suite "Discovery v5.1 Packet Encodings Test Vectors":
let
enrRecA = SignedPeerRecord.init(1, privKeyA,
some(parseIpAddress("127.0.0.1")), some(Port(9001)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9001)),
some(Port(9001))).expect("Properly initialized private key")
enrRecB = SignedPeerRecord.init(1, privKeyB,
some(parseIpAddress("127.0.0.1")), some(Port(9001)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9001)),
some(Port(9001))).expect("Properly initialized private key")
nodeA = newNode(enrRecA).expect("Properly initialized record")
@ -507,11 +508,11 @@ suite "Discovery v5.1 Additional Encode/Decode":
let
enrRecA = SignedPeerRecord.init(1, privKeyA,
some(parseIpAddress("127.0.0.1")), some(Port(9001)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9001)),
some(Port(9001))).expect("Properly initialized private key")
enrRecB = SignedPeerRecord.init(1, privKeyB,
some(parseIpAddress("127.0.0.1")), some(Port(9001)),
some(ValidIpAddress.init("127.0.0.1")), some(Port(9001)),
some(Port(9001))).expect("Properly initialized private key")
nodeA = newNode(enrRecA).expect("Properly initialized record")

View File

@ -1,13 +0,0 @@
# Package
version = "0.4.0"
author = "Status Research & Development GmbH"
description = "Tests for Logos Storage DHT"
license = "MIT"
installFiles = @["build.nims"]
# Dependencies
requires "asynctest >= 0.5.2 & < 0.6.0"
requires "unittest2 <= 0.0.9"
include "build.nims"

View File

@ -8,13 +8,13 @@ var cmds: seq[string]
when defined(testsPart1) or defined(testsAll):
cmds.add [
"nim c -r --hints:off --verbosity:0 dht/test_providers.nim",
"nim c -r --hints:off --verbosity:0 dht/test_providermngr.nim",
"nim c -r --hints:off --verbosity:0 tests/dht/test_providers.nim",
"nim c -r --hints:off --verbosity:0 tests/dht/test_providermngr.nim",
]
when defined(testsPart2) or defined(testsAll):
cmds.add [
"nim c -r --hints:off --verbosity:0 discv5/test_discoveryv5.nim",
"nim c -r --hints:off --verbosity:0 discv5/test_discoveryv5_encoding.nim",
"nim c -r --hints:off --verbosity:0 tests/discv5/test_discoveryv5.nim",
"nim c -r --hints:off --verbosity:0 tests/discv5/test_discoveryv5_encoding.nim",
]
echo "Running Test Commands: ", cmds

3
vendor/atlas.workspace vendored Normal file
View File

@ -0,0 +1,3 @@
deps=""
resolver="MaxVer"
overrides="urls.rules"

8
vendor/urls.rules vendored Normal file
View File

@ -0,0 +1,8 @@
https://github.com/status-im/nim-libp2p-dht.git -> https://github.com/codex-storage/nim-codex-dht.git
https://github.com/markspanbroek/questionable -> https://github.com/codex-storage/questionable
https://github.com/status-im/questionable -> https://github.com/codex-storage/questionable
https://github.com/status-im/asynctest -> https://github.com/codex-storage/asynctest
https://github.com/status-im/nim-datastore -> https://github.com/codex-storage/nim-datastore
https://github.com/cheatfate/nimcrypto -> https://github.com/status-im/nimcrypto
protobufserialization -> protobuf_serialization
protobufserialization -> https://github.com/status-im/nim-protobuf-serialization