Merge branch 'master' into feat/service-incentivization-poc

This commit is contained in:
Sergei Tikhomirov 2025-04-01 14:01:42 +02:00
commit ebf11dc318
45 changed files with 8577 additions and 7299 deletions

View File

@ -21,7 +21,7 @@ jobs:
permissions:
pull-requests: read
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
name: Checkout code
id: checkout
- uses: dorny/paths-filter@v2
@ -61,7 +61,7 @@ jobs:
name: build-${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Get submodules hash
id: submodules
@ -78,6 +78,11 @@ jobs:
- name: Build binaries
run: make V=1 QUICK_AND_DIRTY_COMPILER=1 all tools
build-windows:
uses: ./.github/workflows/windows-build.yml
with:
branch: ${{ github.ref }}
test:
needs: changes
@ -92,7 +97,7 @@ jobs:
name: test-${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Get submodules hash
id: submodules
@ -158,7 +163,7 @@ jobs:
needs: build
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Get submodules hash
id: submodules

View File

@ -44,7 +44,7 @@ jobs:
- name: Checkout code
if: ${{ steps.secrets.outcome == 'success' }}
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Get submodules hash
id: submodules

View File

@ -53,7 +53,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- uses: actions/checkout@v3
- uses: actions/checkout@v4
name: Checkout code
id: checkout
- uses: dorny/paths-filter@v2

View File

@ -20,7 +20,7 @@ jobs:
runs-on: ubuntu-22.04
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: Vars
id: vars
@ -42,7 +42,7 @@ jobs:
runs-on: ${{ matrix.os }}
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
- name: prep variables
id: vars
@ -117,7 +117,7 @@ jobs:
needs: [ tag-name, build-and-publish ]
steps:
- name: Checkout code
uses: actions/checkout@v3
uses: actions/checkout@v4
with:
fetch-depth: 0
ref: master

120
.github/workflows/windows-build.yml vendored Normal file
View File

@ -0,0 +1,120 @@
name: ci / build-windows
on:
workflow_call:
inputs:
branch:
required: true
type: string
jobs:
build:
runs-on: windows-latest
defaults:
run:
shell: msys2 {0}
env:
MSYSTEM: MINGW64
steps:
- name: Checkout code
uses: actions/checkout@v3
- name: Setup MSYS2
uses: msys2/setup-msys2@v2
with:
update: true
install: >-
git
base-devel
mingw-w64-x86_64-toolchain
make
cmake
upx
mingw-w64-x86_64-rust
mingw-w64-x86_64-postgresql
mingw-w64-x86_64-gcc
mingw-w64-x86_64-gcc-libs
mingw-w64-x86_64-libwinpthread-git
mingw-w64-x86_64-zlib
mingw-w64-x86_64-openssl
mingw-w64-x86_64-python
mingw-w64-x86_64-cmake
mingw-w64-x86_64-llvm
mingw-w64-x86_64-clang
- name: Add UPX to PATH
run: |
echo "/usr/bin:$PATH" >> $GITHUB_PATH
echo "/mingw64/bin:$PATH" >> $GITHUB_PATH
echo "/usr/lib:$PATH" >> $GITHUB_PATH
echo "/mingw64/lib:$PATH" >> $GITHUB_PATH
- name: Verify dependencies
run: |
which upx gcc g++ make cmake cargo rustc python
- name: Updating submodules
run: git submodule update --init --recursive
- name: Creating tmp directory
run: mkdir -p tmp
- name: Building Nim
run: |
cd vendor/nimbus-build-system/vendor/Nim
./build_all.bat
cd ../../../..
- name: Building libunwind
run: |
cd vendor/nim-libbacktrace
mkdir -p vendor/libunwind/build
pushd vendor/libunwind
cmake -S runtimes \
-DLLVM_ENABLE_RUNTIMES="libunwind" \
-DLIBUNWIND_ENABLE_SHARED=OFF -DLIBUNWIND_ENABLE_STATIC=ON \
-DLIBUNWIND_INCLUDE_DOCS=OFF -DLIBUNWIND_INSTALL_HEADERS=ON \
-DCMAKE_INSTALL_PREFIX="$(pwd)/../install/usr" \
-G "MinGW Makefiles" -B build
cd build
mingw32-make VERBOSE=1 clean
mingw32-make VERBOSE=1 unwind_static
mingw32-make VERBOSE=1 install-unwind
popd
mkdir -p install/usr/lib
cp -r vendor/libunwind/build/lib/libunwind.a install/usr/lib/
- name: Building miniupnpc
run: |
cd vendor/nim-nat-traversal/vendor/miniupnp/miniupnpc
git checkout little_chore_windows_support
make -f Makefile.mingw CC=gcc CXX=g++ libminiupnpc.a V=1
cd ../../../../..
- name: Building libnatpmp
run: |
cd ./vendor/nim-nat-traversal/vendor/libnatpmp-upstream
make CC="gcc -fPIC -D_WIN32_WINNT=0x0600 -DNATPMP_STATICLIB" libnatpmp.a V=1
cd ../../../../
- name: Building wakunode2
run: |
cd vendor/nim-libbacktrace
cp ./vendor/libunwind/build/lib/libunwind.a install/usr/lib
cd ../..
make wakunode2 LOG_LEVEL=DEBUG V=3 -j8
- name: Check Executable
run: |
if [ -f "./build/wakunode2.exe" ]; then
echo "wakunode2.exe build successful"
else
echo "Build failed: wakunode2.exe not found"
exit 1
fi

View File

@ -1,3 +1,90 @@
## v0.35.0 (2025-03-03)
### Notes
- Deprecated parameter
- max-relay-peers
- New parameters
- relay-service-ratio
String value with peers distribution within max-connections parameter.
This percentage ratio represents the relay peers to service peers.
For example, 60:40, tells that 60% of the max-connections will be used for relay protocol
and the other 40% of max-connections will be reserved for other service protocols (e.g.,
filter, lightpush, store, metadata, etc.)
- rendezvous
boolean attribute that optionally activates waku rendezvous discovery server.
True by default.
### Release highlights
- New filter approach to keep push stream opened within subscription period.
- Waku sync protocol.
- Libwaku async
- Lite-protocol-tester enhancements.
- New panels and metrics in RLN to control outstanding request quota.
### Features
- waku sync shard matching check ([#3259](https://github.com/waku-org/nwaku/issues/3259)) ([42fd6b827](https://github.com/waku-org/nwaku/commit/42fd6b827))
- waku store sync 2.0 config & setup ([#3217](https://github.com/waku-org/nwaku/issues/3217)) ([7f64dc03a](https://github.com/waku-org/nwaku/commit/7f64dc03a))
- waku store sync 2.0 protocols & tests ([#3216](https://github.com/waku-org/nwaku/issues/3216)) ([6ee494d90](https://github.com/waku-org/nwaku/commit/6ee494d90))
- waku store sync 2.0 storage & tests ([#3215](https://github.com/waku-org/nwaku/issues/3215)) ([54a7a6875](https://github.com/waku-org/nwaku/commit/54a7a6875))
- waku store sync 2.0 common types & codec ([#3213](https://github.com/waku-org/nwaku/issues/3213)) ([29fda2dab](https://github.com/waku-org/nwaku/commit/29fda2dab))
- add txhash-based eligibility checks for incentivization PoC ([#3166](https://github.com/waku-org/nwaku/issues/3166)) ([505ec84ce](https://github.com/waku-org/nwaku/commit/505ec84ce))
- connection change event ([#3225](https://github.com/waku-org/nwaku/issues/3225)) ([e81a5517b](https://github.com/waku-org/nwaku/commit/e81a5517b))
- libwaku add protected topic ([#3211](https://github.com/waku-org/nwaku/issues/3211)) ([d932dd10c](https://github.com/waku-org/nwaku/commit/d932dd10c))
- topic health tracking ([#3212](https://github.com/waku-org/nwaku/issues/3212)) ([6020a673b](https://github.com/waku-org/nwaku/commit/6020a673b))
- allowing configuration of application level callbacks ([#3206](https://github.com/waku-org/nwaku/issues/3206)) ([049fbeabb](https://github.com/waku-org/nwaku/commit/049fbeabb))
- waku rendezvous wrapper ([#2962](https://github.com/waku-org/nwaku/issues/2962)) ([650a9487e](https://github.com/waku-org/nwaku/commit/650a9487e))
- making dns discovery async ([#3175](https://github.com/waku-org/nwaku/issues/3175)) ([d7d00bfd7](https://github.com/waku-org/nwaku/commit/d7d00bfd7))
- remove Waku Sync 1.0 & Negentropy ([#3185](https://github.com/waku-org/nwaku/issues/3185)) ([2ab9c3d36](https://github.com/waku-org/nwaku/commit/2ab9c3d36))
- add waku_dial_peer and get_connected_peers to libwaku ([#3149](https://github.com/waku-org/nwaku/issues/3149)) ([507b1fc4d](https://github.com/waku-org/nwaku/commit/507b1fc4d))
- running periodicaly peer exchange if discv5 is disabled ([#3150](https://github.com/waku-org/nwaku/issues/3150)) ([400d7a54f](https://github.com/waku-org/nwaku/commit/400d7a54f))
### Bug Fixes
- avoid double db migration for sqlite ([#3244](https://github.com/waku-org/nwaku/issues/3244)) ([2ce245354](https://github.com/waku-org/nwaku/commit/2ce245354))
- libwaku waku_relay_unsubscribe ([#3207](https://github.com/waku-org/nwaku/issues/3207)) ([ab0c1d4aa](https://github.com/waku-org/nwaku/commit/ab0c1d4aa))
- libwaku support string and int64 for timestamps ([#3205](https://github.com/waku-org/nwaku/issues/3205)) ([2022f54f5](https://github.com/waku-org/nwaku/commit/2022f54f5))
- lite-protocol-tester receiver exit check ([#3187](https://github.com/waku-org/nwaku/issues/3187)) ([beb21c78f](https://github.com/waku-org/nwaku/commit/beb21c78f))
- linting error ([#3156](https://github.com/waku-org/nwaku/issues/3156)) ([99ac68447](https://github.com/waku-org/nwaku/commit/99ac68447))
### Changes
- more efficient metrics usage ([#3298](https://github.com/waku-org/nwaku/issues/3298)) ([6f004d5d4](https://github.com/waku-org/nwaku/commit/6f004d5d4))([c07e278d8](https://github.com/waku-org/nwaku/commit/c07e278d82c3aa771b9988e85bad7422890e4d74))
- filter refactor subscription management and react when the remote peer closes the stream. See the following commits in chronological order:
- issue: [#3281](https://github.com/waku-org/nwaku/issues/3281) commit: [5392b8ea4](https://github.com/waku-org/nwaku/commit/5392b8ea4)
- issue: [#3198](https://github.com/waku-org/nwaku/issues/3198) commit: [287e9b12c](https://github.com/waku-org/nwaku/commit/287e9b12c)
- issue: [#3267](https://github.com/waku-org/nwaku/issues/3267) commit: [46747fd49](https://github.com/waku-org/nwaku/commit/46747fd49)
- send msg hash as string on libwaku message event ([#3234](https://github.com/waku-org/nwaku/issues/3234)) ([9c209b4c3](https://github.com/waku-org/nwaku/commit/9c209b4c3))
- separate heaptrack from debug build ([#3249](https://github.com/waku-org/nwaku/issues/3249)) ([81f24cc25](https://github.com/waku-org/nwaku/commit/81f24cc25))
- capping mechanism for relay and service connections ([#3184](https://github.com/waku-org/nwaku/issues/3184)) ([2942782f9](https://github.com/waku-org/nwaku/commit/2942782f9))
- add extra migration to sqlite and improving error message ([#3240](https://github.com/waku-org/nwaku/issues/3240)) ([bfd60ceab](https://github.com/waku-org/nwaku/commit/bfd60ceab))
- optimize libwaku size ([#3242](https://github.com/waku-org/nwaku/issues/3242)) ([9c0ad8517](https://github.com/waku-org/nwaku/commit/9c0ad8517))
- golang example end using negentropy dependency plus simple readme.md ([#3235](https://github.com/waku-org/nwaku/issues/3235)) ([0e0fcfb1a](https://github.com/waku-org/nwaku/commit/0e0fcfb1a))
- enhance libwaku store protocol and more ([#3223](https://github.com/waku-org/nwaku/issues/3223)) ([22ce9ee87](https://github.com/waku-org/nwaku/commit/22ce9ee87))
- add two RLN metrics and panel ([#3181](https://github.com/waku-org/nwaku/issues/3181)) ([1b532e8ab](https://github.com/waku-org/nwaku/commit/1b532e8ab))
- libwaku async ([#3180](https://github.com/waku-org/nwaku/issues/3180)) ([47a623541](https://github.com/waku-org/nwaku/commit/47a623541))
- filter protocol in libwaku ([#3177](https://github.com/waku-org/nwaku/issues/3177)) ([f856298ca](https://github.com/waku-org/nwaku/commit/f856298ca))
- add supervisor for lite-protocol-tester infra ([#3176](https://github.com/waku-org/nwaku/issues/3176)) ([a7264d68c](https://github.com/waku-org/nwaku/commit/a7264d68c))
- libwaku better error handling and better waku thread destroy handling ([#3167](https://github.com/waku-org/nwaku/issues/3167)) ([294dd03c4](https://github.com/waku-org/nwaku/commit/294dd03c4))
- libwaku allow several multiaddresses for a single peer in store queries ([#3171](https://github.com/waku-org/nwaku/issues/3171)) ([3cb8ebdd8](https://github.com/waku-org/nwaku/commit/3cb8ebdd8))
- naming connectPeer procedure ([#3157](https://github.com/waku-org/nwaku/issues/3157)) ([b3656d6ee](https://github.com/waku-org/nwaku/commit/b3656d6ee))
This release supports the following [libp2p protocols](https://docs.libp2p.io/concepts/protocols/):
| Protocol | Spec status | Protocol id |
| ---: | :---: | :--- |
| [`11/WAKU2-RELAY`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/11/relay.md) | `stable` | `/vac/waku/relay/2.0.0` |
| [`12/WAKU2-FILTER`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/12/filter.md) | `draft` | `/vac/waku/filter/2.0.0-beta1` <br />`/vac/waku/filter-subscribe/2.0.0-beta1` <br />`/vac/waku/filter-push/2.0.0-beta1` |
| [`13/WAKU2-STORE`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/13/store.md) | `draft` | `/vac/waku/store/2.0.0-beta4` |
| [`19/WAKU2-LIGHTPUSH`](https://github.com/vacp2p/rfc-index/blob/main/waku/standards/core/19/lightpush.md) | `draft` | `/vac/waku/lightpush/2.0.0-beta1` |
| [`66/WAKU2-METADATA`](https://github.com/waku-org/specs/blob/master/standards/core/metadata.md) | `raw` | `/vac/waku/metadata/1.0.0` |
| [`WAKU-SYNC`](https://github.com/waku-org/specs/blob/master/standards/core/sync.md) | `draft` | `/vac/waku/reconciliation/1.0.0` & `/vac/waku/transfer/1.0.0` |
## v0.34.0 (2024-10-29)
### Notes:

View File

@ -27,7 +27,7 @@ RUN make -j$(nproc) ${NIM_COMMIT} $MAKE_TARGET LOG_LEVEL=${LOG_LEVEL} NIMFLAGS="
# PRODUCTION IMAGE -------------------------------------------------------------
FROM alpine:3.18 as prod
FROM alpine:3.18 AS prod
ARG MAKE_TARGET=wakunode2

View File

@ -202,13 +202,6 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
var enrBuilder = EnrBuilder.init(nodeKey)
let relayShards = RelayShards.init(conf.clusterId, conf.shards).valueOr:
error "Relay shards initialization failed", error = error
return 1
enrBuilder.withWakuRelaySharding(relayShards).isOkOr:
error "Building ENR with relay sharding failed", error = error
return 1
let recordRes = enrBuilder.build()
let record =
if recordRes.isErr():
@ -233,8 +226,6 @@ proc main(rng: ref HmacDrbgContext): Future[int] {.async.} =
)
let node = builder.build().tryGet()
node.mountMetadata(conf.clusterId).isOkOr:
error "failed to mount waku metadata protocol: ", err = error
if conf.ping:
try:

View File

@ -1,5 +1,5 @@
# Dockerfile to build a distributable container image from pre-existing binaries
FROM debian:stable-slim as prod
FROM debian:stable-slim AS prod
ARG MAKE_TARGET=wakunode2

View File

@ -0,0 +1,90 @@
---
title: Performance Benchmarks and Test Reports
---
## Introduction
This page summarises key performance metrics for nwaku and provides links to detailed test reports.
> ## TL;DR
>
> - Average Waku bandwidth usage: ~**10 KB/s** (minus discv5 Discovery) for 1KB message size and message injection rate of 1msg/s.
Confirmed for topologies of up to 2000 Relay nodes.
> - Average time for a message to propagate to 100% of nodes: **0.4s** for topologies of up to 2000 Relay nodes.
> - Average per-node bandwidth usage of the discv5 protocol: **8 KB/s** for incoming traffic and **7.4 KB/s** for outgoing traffic,
in a network with 100 continuously online nodes.
> - Future improvements: A messaging API is currently in development to streamline interactions with the Waku protocol suite.
Once completed, it will enable benchmarking at the messaging API level, allowing applications to more easily compare their
own performance results.
## Insights
### Relay Bandwidth Usage: nwaku v0.34.0
The average per-node `libp2p` bandwidth usage in a 1000-node Relay network with 1KB messages at varying injection rates.
| Message Injection Rate | Average libp2p incoming bandwidth (KB/s) | Average libp2p outgoing bandwidth (KB/s) |
|------------------------|------------------------------------------|------------------------------------------|
| 1 msg/s | ~10.1 | ~10.3 |
| 1 msg/10s | ~1.8 | ~1.9 |
### Message Propagation Latency: nwaku v0.34.0-rc1
The message propagation latency is measured as the total time for a message to reach all nodes.
We compare the latency in different network configurations for the following simulation parameters:
- Total messages published: 600
- Message size: 1KB
- Message injection rate: 1msg/s
The different network configurations tested are:
- Relay Config: 1000 nodes with relay enabled
- Mixed Config: 210 nodes, consisting of bootstrap nodes, filter clients and servers, lightpush clients and servers, store nodes
- Non-persistent Relay Config: 500 persistent relay nodes, 10 store nodes and 100 non-persistent relay nodes
Click on a specific config to see the detailed test report.
| Config | Average Message Propagation Latency (s) | Max Message Propagation Latency (s)|
|------------------------------------------------------------------------------------------------------------------------------|-----------------------------------------|------------------------------------|
| [Relay](https://www.notion.so/Waku-regression-testing-v0-34-1618f96fb65c803bb7bad6ecd6bafff9) (1000 nodes) | 0.05 | 1.6 |
| [Mixed](https://www.notion.so/Mixed-environment-analysis-1688f96fb65c809eb235c59b97d6e15b) (210 nodes) | 0.0125 | 0.007 |
| [Non-persistent Relay](https://www.notion.so/High-Churn-Relay-Store-Reliability-16c8f96fb65c8008bacaf5e86881160c) (510 nodes)| 0.0125 | 0.25 |
### Discv5 Bandwidth Usage: nwaku v0.34.0
The average bandwidth usage of discv5 for a network of 100 nodes and message injection rate of 0 or 1msg/s.
The measurements are based on a stable network where all nodes have already connected to peers to form a healthy mesh.
|Message size |Average discv5 incoming bandwidth (KB/s)|Average discv5 outgoing bandwidth (KB/s)|
|-------------------- |----------------------------------------|----------------------------------------|
| no message injection| 7.88 | 6.70 |
| 1KB | 8.04 | 7.40 |
| 10KB | 8.03 | 7.45 |
## Testing
### DST
The VAC DST team performs regression testing on all new **nwaku** releases, comparing performance with previous versions.
They simulate large Waku networks with a variety of network and protocol configurations that are representative of real-world usage.
**Test Reports**: [DST Reports](https://www.notion.so/DST-Reports-1228f96fb65c80729cd1d98a7496fe6f)
### QA
The VAC QA team performs interoperability tests for **nwaku** and **go-waku** using the latest main branch builds.
These tests run daily and verify protocol functionality by targeting specific features of each protocol.
**Test Reports**: [QA Reports](https://discord.com/channels/1110799176264056863/1196933819614363678)
### nwaku
The **nwaku** team follows a structured release procedure for all release candidates.
This involves deploying RCs to `status.staging` fleet for validation and performing sanity checks.
**Release Process**: [nwaku Release Procedure](https://github.com/waku-org/nwaku/blob/master/.github/ISSUE_TEMPLATE/prepare_release.md)
### Research
The Waku Research team conducts a variety of benchmarking, performance testing, proof-of-concept validations and debugging efforts.
They also maintain a Waku simulator designed for small-scale, single-purpose, on-demand testing.
**Test Reports**: [Waku Research Reports](https://www.notion.so/Miscellaneous-2c02516248db4a28ba8cb2797a40d1bb)
**Waku Simulator**: [Waku Simulator Book](https://waku-org.github.io/waku-simulator/)

File diff suppressed because it is too large Load Diff

View File

@ -30,7 +30,7 @@ import
const os* {.strdefine.} = ""
when os == "Linux" and
# GitHub only supports container actions on Linux
# and we need to start a postgress database in a docker container
# and we need to start a postgres database in a docker container
defined(postgres):
import
./waku_archive/test_driver_postgres_query,
@ -106,3 +106,4 @@ import
import ./waku_rln_relay/test_all
# Node Factory
import ./factory/test_config

View File

@ -0,0 +1,157 @@
{.used.}
import
std/options,
testutils/unittests,
chronos,
libp2p/crypto/[crypto, secp],
libp2p/multiaddress,
nimcrypto/utils,
secp256k1,
confutils
import
../../waku/factory/external_config,
../../waku/factory/internal_config,
../../waku/factory/networks_config,
../../waku/common/logging
suite "Waku config - apply preset":
test "Default preset is TWN":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
## Given
let preConfig = WakuNodeConf(cmd: noCommand, preset: "twn")
## When
let res = applyPresetConfiguration(preConfig)
assert res.isOk(), $res.error
## Then
let conf = res.get()
assert conf.maxMessageSize == expectedConf.maxMessageSize
assert conf.clusterId == expectedConf.clusterId
assert conf.rlnRelay == expectedConf.rlnRelay
assert conf.rlnRelayEthContractAddress == expectedConf.rlnRelayEthContractAddress
assert conf.rlnRelayDynamic == expectedConf.rlnRelayDynamic
assert conf.rlnRelayChainId == expectedConf.rlnRelayChainId
assert conf.rlnRelayBandwidthThreshold == expectedConf.rlnRelayBandwidthThreshold
assert conf.rlnEpochSizeSec == expectedConf.rlnEpochSizeSec
assert conf.rlnRelayUserMessageLimit == expectedConf.rlnRelayUserMessageLimit
assert conf.numShardsInNetwork == expectedConf.numShardsInNetwork
assert conf.discv5BootstrapNodes == expectedConf.discv5BootstrapNodes
test "Subscribes to all valid shards in twn":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
## Given
let shards: seq[uint16] = @[0, 1, 2, 3, 4, 5, 6, 7]
let preConfig = WakuNodeConf(cmd: noCommand, preset: "twn", shards: shards)
## When
let res = applyPresetConfiguration(preConfig)
assert res.isOk(), $res.error
## Then
let conf = res.get()
assert conf.shards.len == expectedConf.numShardsInNetwork.int
test "Subscribes to some valid shards in twn":
## Setup
let expectedConf = ClusterConf.TheWakuNetworkConf()
## Given
let shards: seq[uint16] = @[0, 4, 7]
let preConfig = WakuNodeConf(cmd: noCommand, preset: "twn", shards: shards)
## When
let resConf = applyPresetConfiguration(preConfig)
let res = validateShards(resConf.get())
assert res.isOk(), $res.error
## Then
let conf = resConf.get()
assert conf.shards.len() == shards.len()
for index, shard in shards:
assert shard in conf.shards
test "Subscribes to invalid shards in twn":
## Setup
## Given
let shards: seq[uint16] = @[0, 4, 7, 10]
let preConfig = WakuNodeConf(cmd: noCommand, preset: "twn", shards: shards)
let postConfig = applyPresetConfiguration(preConfig)
## When
let res = validateShards(postConfig.get())
## Then
assert res.isErr(), "Invalid shard was accepted"
suite "Waku config - node key":
test "Passed node key is used":
## Setup
let nodeKeyStr =
"0011223344556677889900aabbccddeeff0011223344556677889900aabbccddeeff"
let nodekey = block:
let key = SkPrivateKey.init(utils.fromHex(nodeKeyStr)).tryGet()
crypto.PrivateKey(scheme: Secp256k1, skkey: key)
## Given
let config = WakuNodeConf.load(version = "", cmdLine = @["--nodekey=" & nodeKeyStr])
## When
let res = getNodeKey(config)
assert res.isOk(), $res.error
## Then
let resKey = res.get()
assert utils.toHex(resKey.getRawBytes().get()) ==
utils.toHex(nodekey.getRawBytes().get())
suite "Waku config - Shards":
test "Shards are valid":
## Setup
## Given
let shards: seq[uint16] = @[0, 2, 4]
let numShardsInNetwork = 5.uint32
let config = WakuNodeConf(
cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
)
## When
let res = validateShards(config)
## Then
assert res.isOk(), $res.error
test "Shards are not in range":
## Setup
## Given
let shards: seq[uint16] = @[0, 2, 5]
let numShardsInNetwork = 5.uint32
let config = WakuNodeConf(
cmd: noCommand, shards: shards, numShardsInNetwork: numShardsInNetwork
)
## When
let res = validateShards(config)
## Then
assert res.isErr(), "Invalid shard was accepted"
test "Shard is passed without num shards":
## Setup
## Given
let config = WakuNodeConf.load(version = "", cmdLine = @["--shard=32"])
## When
let res = validateShards(config)
## Then
assert res.isOk(), $res.error

View File

@ -224,6 +224,7 @@ suite "Waku Lightpush message delivery":
## When
let res = await lightNode.lightpushPublish(some(DefaultPubsubTopic), message)
assert res.isOk(), $res.error
assert res.get() == 1, "Expected to relay the message to 1 node"
## Then
check await completionFutRelay.withTimeout(5.seconds)

View File

@ -1,188 +0,0 @@
{.used.}
import std/net, testutils/unittests, chronos, libp2p/crypto/crypto
import
../../waku/
[node/waku_node, node/peer_manager, waku_core, waku_store, waku_archive, waku_sync],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, wakunode, testasync]
suite "Store Sync - End to End":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let timeOrigin = now()
let messages =
@[
fakeWakuMessage(@[byte 00], ts = ts(-90, timeOrigin)),
fakeWakuMessage(@[byte 01], ts = ts(-80, timeOrigin)),
fakeWakuMessage(@[byte 02], ts = ts(-70, timeOrigin)),
fakeWakuMessage(@[byte 03], ts = ts(-60, timeOrigin)),
fakeWakuMessage(@[byte 04], ts = ts(-50, timeOrigin)),
fakeWakuMessage(@[byte 05], ts = ts(-40, timeOrigin)),
fakeWakuMessage(@[byte 06], ts = ts(-30, timeOrigin)),
fakeWakuMessage(@[byte 07], ts = ts(-20, timeOrigin)),
fakeWakuMessage(@[byte 08], ts = ts(-10, timeOrigin)),
fakeWakuMessage(@[byte 09], ts = ts(00, timeOrigin)),
]
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let clientArchiveDriver = newArchiveDriverWithMessages(DefaultPubsubTopic, messages)
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0, syncInterval = 1.hours, relayJitter = 0.seconds
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0, syncInterval = 2.milliseconds, relayJitter = 0.seconds
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
# messages are retreived when mounting Waku sync
# but based on interval so this is needed for client only
for msg in messages:
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await allFutures(server.start(), client.start())
let serverRemotePeerInfo = server.peerInfo.toRemotePeerInfo()
let clientRemotePeerInfo = client.peerInfo.toRemotePeerInfo()
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuSyncCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuSyncCodec)
client.peerManager.addServicePeer(serverRemotePeerInfo, WakuStoreCodec)
server.peerManager.addServicePeer(clientRemotePeerInfo, WakuStoreCodec)
asyncTeardown:
# prevent premature channel shutdown
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "no message set differences":
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "client message set differences":
let msg = fakeWakuMessage(@[byte 10])
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
asyncTest "server message set differences":
let msg = fakeWakuMessage(@[byte 10])
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
check:
client.wakuSync.storageSize() != server.wakuSync.storageSize()
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == server.wakuSync.storageSize()
suite "Waku Sync - Pruning":
var server {.threadvar.}: WakuNode
var client {.threadvar.}: WakuNode
asyncSetup:
let
serverKey = generateSecp256k1Key()
clientKey = generateSecp256k1Key()
server = newTestWakuNode(serverKey, IPv4_any(), Port(0))
client = newTestWakuNode(clientKey, IPv4_any(), Port(0))
let serverArchiveDriver = newSqliteArchiveDriver()
let clientArchiveDriver = newSqliteArchiveDriver()
let mountServerArchiveRes = server.mountArchive(serverArchiveDriver)
let mountClientArchiveRes = client.mountArchive(clientArchiveDriver)
assert mountServerArchiveRes.isOk()
assert mountClientArchiveRes.isOk()
await server.mountStore()
await client.mountStore()
client.mountStoreClient()
server.mountStoreClient()
let mountServerSync = await server.mountWakuSync(
maxFrameSize = 0,
relayJitter = 0.seconds,
syncRange = 1.hours,
syncInterval = 5.minutes,
)
let mountClientSync = await client.mountWakuSync(
maxFrameSize = 0,
syncRange = 10.milliseconds,
syncInterval = 10.milliseconds,
relayJitter = 0.seconds,
)
assert mountServerSync.isOk(), mountServerSync.error
assert mountClientSync.isOk(), mountClientSync.error
await allFutures(server.start(), client.start())
asyncTeardown:
await sleepAsync(10.milliseconds)
await allFutures(client.stop(), server.stop())
asyncTest "pruning":
for _ in 0 ..< 4:
for _ in 0 ..< 10:
let msg = fakeWakuMessage()
client.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await client.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
server.wakuSync.messageIngress(DefaultPubsubTopic, msg)
await server.wakuArchive.handleMessage(DefaultPubsubTopic, msg)
await sleepAsync(10.milliseconds)
check:
client.wakuSync.storageSize() == 10
server.wakuSync.storageSize() == 40

View File

@ -10,7 +10,7 @@ import
waku_core,
waku_core/message/digest,
waku_archive/driver/sqlite_driver,
waku_archive
waku_archive,
],
../waku_archive/archive_utils,
../testlib/wakucore
@ -113,15 +113,18 @@ suite "Waku Archive - message handling":
test "convert query to label":
check:
convertQueryToMetricLabel("SELECT version();") == "select_version"
convertQueryToMetricLabel("SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?") == "msg_hash_no_ctopic"
convertQueryToMetricLabel(""" SELECT child.relname AS partition_name
convertQueryToMetricLabel(
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?"
) == "msg_hash_no_ctopic"
convertQueryToMetricLabel(
""" SELECT child.relname AS partition_name
FROM pg_inherits
JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
JOIN pg_class child ON pg_inherits.inhrelid = child.oid
JOIN pg_namespace nmsp_parent ON nmsp_parent.oid = parent.relnamespace
JOIN pg_namespace nmsp_child ON nmsp_child.oid = child.relnamespace
WHERE parent.relname='messages""") == "get_partitions_list"
WHERE parent.relname='messages"""
) == "get_partitions_list"
procSuite "Waku Archive - find messages":
## Fixtures

View File

@ -33,7 +33,7 @@ proc testWakuNode(): WakuNode =
newTestWakuNode(privkey, bindIp, port, some(extIp), some(port))
suite "Waku v2 REST API - Debug":
asyncTest "Get node info - GET /debug/v1/info":
asyncTest "Get node info - GET /info":
# Given
let node = testWakuNode()
await node.start()
@ -62,7 +62,7 @@ suite "Waku v2 REST API - Debug":
await restServer.closeWait()
await node.stop()
asyncTest "Get node version - GET /debug/v1/version":
asyncTest "Get node version - GET /version":
# Given
let node = testWakuNode()
await node.start()

View File

@ -163,14 +163,28 @@ task libwakuStatic, "Build the cbindings waku node library":
let name = "libwaku"
buildLibrary name,
"library/",
"""-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """,
"""-d:chronicles_line_numbers \
-d:chronicles_runtime_filtering=on \
-d:chronicles_sinks="textlines,json" \
-d:chronicles_default_output_device=Dynamic \
-d:chronicles_disabled_topics="eth,dnsdisc.client" \
--warning:Deprecated:off \
--warning:UnusedImport:on \
-d:chronicles_log_level=TRACE """,
"static"
task libwakuDynamic, "Build the cbindings waku node library":
let name = "libwaku"
buildLibrary name,
"library/",
"""-d:chronicles_line_numbers -d:chronicles_runtime_filtering=on -d:chronicles_sinks="textlines,json" -d:chronicles_default_output_device=Dynamic -d:chronicles_disabled_topics="eth,dnsdisc.client" --warning:Deprecated:off --warning:UnusedImport:on """,
"""-d:chronicles_line_numbers \
-d:chronicles_runtime_filtering=on \
-d:chronicles_sinks="textlines,json" \
-d:chronicles_default_output_device=Dynamic \
-d:chronicles_disabled_topics="eth,dnsdisc.client" \
--warning:Deprecated:off \
--warning:UnusedImport:on \
-d:chronicles_log_level=TRACE """,
"dynamic"
### Mobile Android

View File

@ -7,25 +7,27 @@ declarePublicCounter query_count,
"number of times a query is being performed", labels = ["query"]
## Maps parts of the possible known queries with a fixed and shorter query label.
const QueriesToMetricMap* = toTable({
"contentTopic IN": "content_topic",
"SELECT version()": "select_version",
"WITH min_timestamp": "messages_lookup",
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_no_ctopic",
"AS partition_name": "get_partitions_list",
"SELECT COUNT(1) FROM messages": "count_msgs",
"SELECT messageHash FROM messages WHERE (timestamp, messageHash) < (?,?) AND pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_with_cursor",
"SELECT pg_database_size(current_database())": "get_database_size",
"DELETE FROM messages_lookup WHERE timestamp": "delete_from_msgs_lookup",
"DROP TABLE messages_": "drop_partition_table",
"ALTER TABLE messages DETACH PARTITION": "detach_partition",
"SELECT pg_size_pretty(pg_total_relation_size(C.oid))": "get_partition_size",
"pg_try_advisory_lock": "try_advisory_lock",
"SELECT messageHash FROM messages ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"get_all_msg_hash",
"SELECT pg_advisory_unlock": "advisory_unlock",
"ANALYZE messages": "analyze_messages",
"SELECT EXISTS": "check_version_table_exists",
})
const QueriesToMetricMap* = toTable(
{
"contentTopic IN": "content_topic",
"SELECT version()": "select_version",
"WITH min_timestamp": "messages_lookup",
"SELECT messageHash FROM messages WHERE pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_no_ctopic",
"AS partition_name": "get_partitions_list",
"SELECT COUNT(1) FROM messages": "count_msgs",
"SELECT messageHash FROM messages WHERE (timestamp, messageHash) < (?,?) AND pubsubTopic = ? AND timestamp >= ? AND timestamp <= ? ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"msg_hash_with_cursor",
"SELECT pg_database_size(current_database())": "get_database_size",
"DELETE FROM messages_lookup WHERE timestamp": "delete_from_msgs_lookup",
"DROP TABLE messages_": "drop_partition_table",
"ALTER TABLE messages DETACH PARTITION": "detach_partition",
"SELECT pg_size_pretty(pg_total_relation_size(C.oid))": "get_partition_size",
"pg_try_advisory_lock": "try_advisory_lock",
"SELECT messageHash FROM messages ORDER BY timestamp DESC, messageHash DESC LIMIT ?":
"get_all_msg_hash",
"SELECT pg_advisory_unlock": "advisory_unlock",
"ANALYZE messages": "analyze_messages",
"SELECT EXISTS": "check_version_table_exists",
}
)

View File

@ -22,6 +22,8 @@ import
libp2p/stream/connection,
libp2p/utility
import std/times except TimeInterval, Duration, seconds, minutes
import ./[single_token_limiter, service_metrics, timed_map]
export token_bucket, setting, service_metrics
@ -76,8 +78,15 @@ template checkUsageLimit*(
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto, conn):
let requestStartTime = getTime().toUnixFloat()
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
let requestDurationSec = getTime().toUnixFloat() - requestStartTime
waku_service_request_handling_duration_seconds.observe(
requestDurationSec, labelValues = [proto]
)
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])
bodyRejected

View File

@ -17,3 +17,6 @@ proc setServiceLimitMetric*(service: string, limit: Option[RateLimitSetting]) =
waku_service_requests_limit.set(
limit.get().calculateLimitPerSecond(), labelValues = [service]
)
declarePublicHistogram waku_service_request_handling_duration_seconds,
"duration of non-relay service handling", ["service"]

View File

@ -4,6 +4,8 @@
import std/[options], chronos/timer, libp2p/stream/connection, libp2p/utility
import std/times except TimeInterval, Duration
import ./[token_bucket, setting, service_metrics]
export token_bucket, setting, service_metrics
@ -43,8 +45,15 @@ template checkUsageLimit*(
bodyWithinLimit, bodyRejected: untyped,
) =
if t.checkUsage(proto):
let requestStartTime = getTime().toUnixFloat()
waku_service_requests.inc(labelValues = [proto, "served"])
bodyWithinLimit
let requestDurationSec = getTime().toUnixFloat() - requestStartTime
waku_service_request_handling_duration_seconds.observe(
requestDurationSec, labelValues = [proto]
)
else:
waku_service_requests.inc(labelValues = [proto, "rejected"])
bodyRejected

View File

@ -76,7 +76,7 @@ type WakuNodeConf* = object
.}: EthRpcUrl
rlnRelayEthContractAddress* {.
desc: "Address of membership contract on an Ethereum testnet",
desc: "Address of membership contract on an Ethereum testnet.",
defaultValue: "",
name: "rln-relay-eth-contract-address"
.}: string
@ -100,6 +100,7 @@ type WakuNodeConf* = object
name: "rln-relay-eth-private-key"
.}: string
# TODO: Remove "Default is" when it's already visible on the CLI
rlnRelayUserMessageLimit* {.
desc:
"Set a user message limit for the rln membership registration. Must be a positive integer. Default is 1.",
@ -145,6 +146,13 @@ type WakuNodeConf* = object
.}: seq[ProtectedShard]
## General node config
preset* {.
desc:
"Network preset to use. 'twn' is The RLN-protected Waku Network (cluster 1).",
defaultValue: "",
name: "preset"
.}: string
clusterId* {.
desc:
"Cluster id that the node is running in. Node in a different cluster id is disconnected.",
@ -276,7 +284,7 @@ hence would have reachability issues.""",
.}: bool
rlnRelay* {.
desc: "Enable spam protection through rln-relay: true|false",
desc: "Enable spam protection through rln-relay: true|false.",
defaultValue: false,
name: "rln-relay"
.}: bool
@ -287,7 +295,7 @@ hence would have reachability issues.""",
.}: Option[uint]
rlnRelayDynamic* {.
desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false",
desc: "Enable waku-rln-relay with on-chain dynamic group management: true|false.",
defaultValue: false,
name: "rln-relay-dynamic"
.}: bool
@ -311,7 +319,8 @@ hence would have reachability issues.""",
.}: string
rlnRelayBandwidthThreshold* {.
desc: "Message rate in bytes/sec after which verification of proofs should happen",
desc:
"Message rate in bytes/sec after which verification of proofs should happen.",
defaultValue: 0, # to maintain backwards compatibility
name: "rln-relay-bandwidth-threshold"
.}: int
@ -327,6 +336,7 @@ hence would have reachability issues.""",
name: "keep-alive"
.}: bool
# TODO: This is trying to do too much, this should only be used for autosharding, which itself should be configurable
# If numShardsInNetwork is not set, we use the number of shards configured as numShardsInNetwork
numShardsInNetwork* {.
desc: "Number of shards in the network",
@ -597,7 +607,7 @@ with the drawback of consuming some more bandwidth.""",
## Discovery v5 config
discv5Discovery* {.
desc: "Enable discovering nodes via Node Discovery v5",
desc: "Enable discovering nodes via Node Discovery v5.",
defaultValue: false,
name: "discv5-discovery"
.}: bool

View File

@ -4,7 +4,7 @@ import
libp2p/crypto/crypto,
libp2p/multiaddress,
libp2p/nameresolving/dnsresolver,
std/[options, sequtils, net],
std/[options, sequtils, strutils, net],
results
import
./external_config,
@ -12,7 +12,8 @@ import
../node/config,
../waku_enr/capabilities,
../waku_enr,
../waku_core
../waku_core,
./networks_config
proc enrConfiguration*(
conf: WakuNodeConf, netConfig: NetConfig, key: crypto.PrivateKey
@ -157,3 +158,71 @@ proc networkConfiguration*(conf: WakuNodeConf, clientId: string): NetConfigResul
)
return netConfigRes
proc applyPresetConfiguration*(srcConf: WakuNodeConf): Result[WakuNodeConf, string] =
var resConf = srcConf
if resConf.clusterId == 1:
warn(
"TWN - The Waku Network configuration will not be applied when `--cluster-id=1` is passed in future releases. Use `--preset=twn` instead."
)
resConf.preset = "twn"
case toLowerAscii(resConf.preset)
of "twn":
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
# Override configuration
resConf.maxMessageSize = twnClusterConf.maxMessageSize
resConf.clusterId = twnClusterConf.clusterId
resConf.rlnRelay = twnClusterConf.rlnRelay
resConf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
resConf.rlnRelayChainId = twnClusterConf.rlnRelayChainId
resConf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
resConf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
resConf.discv5Discovery = twnClusterConf.discv5Discovery
resConf.discv5BootstrapNodes =
resConf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
resConf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
resConf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
resConf.numShardsInNetwork = twnClusterConf.numShardsInNetwork
if resConf.relay:
resConf.rlnRelay = twnClusterConf.rlnRelay
else:
discard
return ok(resConf)
# TODO: numShardsInNetwork should be mandatory with autosharding, and unneeded otherwise
proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
if conf.numShardsInNetwork != 0:
return conf.numShardsInNetwork
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)
proc validateShards*(conf: WakuNodeConf): Result[void, string] =
let numShardsInNetwork = getNumShardsInNetwork(conf)
for shard in conf.shards:
if shard >= numShardsInNetwork:
let msg =
"validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
$numShardsInNetwork # fmt doesn't work
error "validateShards failed", error = msg
return err(msg)
return ok()
proc getNodeKey*(
conf: WakuNodeConf, rng: ref HmacDrbgContext = crypto.newRng()
): Result[PrivateKey, string] =
if conf.nodekey.isSome():
return ok(conf.nodekey.get())
warn "missing node key, generating new set"
let key = crypto.PrivateKey.random(Secp256k1, rng[]).valueOr:
error "Failed to generate key", error = error
return err("Failed to generate key: " & $error)
return ok(key)

View File

@ -30,6 +30,7 @@ proc TheWakuNetworkConf*(T: type ClusterConf): ClusterConf =
rlnRelayUserMessageLimit: 100,
numShardsInNetwork: 8,
discv5Discovery: true,
# TODO: Why is this part of the conf? eg an edge node would not have this
discv5BootstrapNodes:
@[
"enr:-QESuED0qW1BCmF-oH_ARGPr97Nv767bl_43uoy70vrbah3EaCAdK3Q0iRQ6wkSTTpdrg_dU_NC2ydO8leSlRpBX4pxiAYJpZIJ2NIJpcIRA4VDAim11bHRpYWRkcnO4XAArNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQZ2XwAtNiZub2RlLTAxLmRvLWFtczMud2FrdS5zYW5kYm94LnN0YXR1cy5pbQYfQN4DgnJzkwABCAAAAAEAAgADAAQABQAGAAeJc2VjcDI1NmsxoQOTd-h5owwj-cx7xrmbvQKU8CV3Fomfdvcv1MBc-67T5oN0Y3CCdl-DdWRwgiMohXdha3UyDw",

View File

@ -137,13 +137,6 @@ proc initNode(
## Mount protocols
proc getNumShardsInNetwork*(conf: WakuNodeConf): uint32 =
if conf.numShardsInNetwork != 0:
return conf.numShardsInNetwork
# If conf.numShardsInNetwork is not set, use 1024 - the maximum possible as per the static sharding spec
# https://github.com/waku-org/specs/blob/master/standards/core/relay-sharding.md#static-sharding
return uint32(MaxShardIndex + 1)
proc getAutoshards*(
node: WakuNode, contentTopics: seq[string]
): Result[seq[RelayShard], string] =
@ -265,6 +258,7 @@ proc setupProtocols(
if conf.numShardsInNetwork == 0:
warn "Number of shards in network not configured, setting it to",
# TODO: If not configured, it mounts 1024 shards! Make it a mandatory configuration instead
numShardsInNetwork = $numShardsInNetwork
node.mountSharding(conf.clusterId, numShardsInNetwork).isOkOr:

View File

@ -99,19 +99,6 @@ proc logConfig(conf: WakuNodeConf) =
func version*(waku: Waku): string =
waku.version
proc validateShards(conf: WakuNodeConf): Result[void, string] =
let numShardsInNetwork = getNumShardsInNetwork(conf)
for shard in conf.shards:
if shard >= numShardsInNetwork:
let msg =
"validateShards invalid shard: " & $shard & " when numShardsInNetwork: " &
$numShardsInNetwork # fmt doesn't work
error "validateShards failed", error = msg
return err(msg)
return ok()
proc setupSwitchServices(
waku: Waku, conf: WakuNodeConf, circuitRelay: Relay, rng: ref HmacDrbgContext
) =
@ -214,46 +201,28 @@ proc new*(
shards.add(shard)
confCopy.shards = shards
case confCopy.clusterId
# Why can't I replace this block with a concise `.valueOr`?
confCopy = block:
let res = applyPresetConfiguration(confCopy)
if res.isErr():
error "Failed to complete the config", error = res.error
return err("Failed to complete the config:" & $res.error)
res.get()
# cluster-id=1 (aka The Waku Network)
of 1:
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
# Override configuration
confCopy.maxMessageSize = twnClusterConf.maxMessageSize
confCopy.clusterId = twnClusterConf.clusterId
confCopy.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
confCopy.rlnRelayChainId = twnClusterConf.rlnRelayChainId
confCopy.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
confCopy.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
confCopy.discv5Discovery = twnClusterConf.discv5Discovery
confCopy.discv5BootstrapNodes =
confCopy.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
confCopy.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
confCopy.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
confCopy.numShardsInNetwork = twnClusterConf.numShardsInNetwork
# Only set rlnRelay to true if relay is configured
if confCopy.relay:
confCopy.rlnRelay = twnClusterConf.rlnRelay
else:
discard
logConfig(confCopy)
info "Running nwaku node", version = git_version
logConfig(confCopy)
let validateShardsRes = validateShards(confCopy)
if validateShardsRes.isErr():
error "Failed validating shards", error = $validateShardsRes.error
return err("Failed validating shards: " & $validateShardsRes.error)
if not confCopy.nodekey.isSome():
let keyRes = crypto.PrivateKey.random(Secp256k1, rng[])
if keyRes.isErr():
error "Failed to generate key", error = $keyRes.error
return err("Failed to generate key: " & $keyRes.error)
confCopy.nodekey = some(keyRes.get())
let keyRes = getNodeKey(confCopy, rng)
if keyRes.isErr():
error "Failed to generate key", error = $keyRes.error
return err("Failed to generate key: " & $keyRes.error)
confCopy.nodeKey = some(keyRes.get())
var relay = newCircuitRelay(confCopy.isRelayClient)
@ -284,6 +253,7 @@ proc new*(
var waku = Waku(
version: git_version,
# TODO: WakuNodeConf is re-used for too many context, `conf` here should be a dedicated subtype
conf: confCopy,
rng: rng,
key: confCopy.nodekey.get(),

View File

@ -74,6 +74,7 @@ proc getPeer*(wps: WakuPeerStore, peerId: PeerId): RemotePeerInfo =
)
proc addPeer*(wps: WakuPeerStore, peer: RemotePeerInfo) =
## Only used in tests
wps[AddressBook][peer.peerId] = peer.addrs
wps[ProtoBook][peer.peerId] = peer.protocols
wps[AgentBook][peer.peerId] = peer.agent

View File

@ -214,9 +214,9 @@ proc mountStoreSync*(
storeSyncInterval = 300,
storeSyncRelayJitter = 20,
): Future[Result[void, string]] {.async.} =
let idsChannel = newAsyncQueue[SyncID](100)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](100)
let idsChannel = newAsyncQueue[SyncID](0)
let wantsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
let needsChannel = newAsyncQueue[(PeerId, WakuMessageHash)](0)
var cluster: uint16
var shards: seq[uint16]
@ -237,12 +237,24 @@ proc mountStoreSync*(
node.wakuStoreReconciliation = recon
let reconMountRes = catch:
node.switch.mount(
node.wakuStoreReconciliation, protocolMatcher(WakuReconciliationCodec)
)
if reconMountRes.isErr():
return err(reconMountRes.error.msg)
let transfer = SyncTransfer.new(
node.peerManager, node.wakuArchive, idsChannel, wantsChannel, needsChannel
)
node.wakuStoreTransfer = transfer
let transMountRes = catch:
node.switch.mount(node.wakuStoreTransfer, protocolMatcher(WakuTransferCodec))
if transMountRes.isErr():
return err(transMountRes.error.msg)
return ok()
## Waku relay
@ -1134,7 +1146,7 @@ proc lightpushPublishHandler(
): Future[lightpush_protocol.WakuLightPushResult] {.async.} =
let msgHash = pubsubTopic.computeMessageHash(message).to0xHex()
if not node.wakuLightpushClient.isNil():
notice "publishing message with legacy lightpush",
notice "publishing message with lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,
@ -1142,7 +1154,7 @@ proc lightpushPublishHandler(
return await node.wakuLightpushClient.publish(some(pubsubTopic), message, peer)
if not node.wakuLightPush.isNil():
notice "publishing message with self hosted legacy lightpush",
notice "publishing message with self hosted lightpush",
pubsubTopic = pubsubTopic,
contentTopic = message.contentTopic,
target_peer_id = peer.peerId,

View File

@ -148,7 +148,7 @@ proc startRestServerProtocolSupport*(
"/relay endpoints are not available. Please check your configuration: --relay"
## Filter REST API
if conf.filternode != "" and node.wakuFilterClient != nil:
if node.wakuFilterClient != nil:
let filterCache = MessageCache.init()
let filterDiscoHandler =
@ -161,8 +161,7 @@ proc startRestServerProtocolSupport*(
router, node, filterCache, filterDiscoHandler
)
else:
restServerNotInstalledTab["filter"] =
"/filter endpoints are not available. Please check your configuration: --filternode"
restServerNotInstalledTab["filter"] = "/filter endpoints are not available."
## Store REST API
let storeDiscoHandler =
@ -175,9 +174,10 @@ proc startRestServerProtocolSupport*(
rest_store_legacy_api.installStoreApiHandlers(router, node, storeDiscoHandler)
## Light push API
## Install it either if lightpushnode (lightpush service node) is configured and client is mounted)
## Install it either if client is mounted)
## or install it to be used with self-hosted lightpush service
if (conf.lightpushnode != "" and node.wakuLegacyLightpushClient != nil) or
## We either get lightpushnode (lightpush service node) from config or discovered or self served
if (node.wakuLegacyLightpushClient != nil) or
(conf.lightpush and node.wakuLegacyLightPush != nil and node.wakuRelay != nil):
let lightDiscoHandler =
if not wakuDiscv5.isNil():
@ -190,8 +190,7 @@ proc startRestServerProtocolSupport*(
)
rest_lightpush_api.installLightPushRequestHandler(router, node, lightDiscoHandler)
else:
restServerNotInstalledTab["lightpush"] =
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
restServerNotInstalledTab["lightpush"] = "/lightpush endpoints are not available."
info "REST services are installed"
return ok()

View File

@ -11,10 +11,10 @@ logScope:
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc debugInfoV1*(): RestResponse[DebugWakuInfo] {.
rest, endpoint: "/debug/v1/info", meth: HttpMethod.MethodGet
rest, endpoint: "/info", meth: HttpMethod.MethodGet
.}
# TODO: Check how we can use a constant to set the method endpoint (improve "rest" pragma under nim-presto)
proc debugVersionV1*(): RestResponse[string] {.
rest, endpoint: "/debug/v1/version", meth: HttpMethod.MethodGet
rest, endpoint: "/version", meth: HttpMethod.MethodGet
.}

View File

@ -8,10 +8,12 @@ export types
logScope:
topics = "waku node rest debug_api"
const ROUTE_DEBUG_INFOV1* = "/debug/v1/info"
const ROUTE_INFOV1* = "/info"
# /debug route is deprecated, will be removed
const ROUTE_DEBUG_INFOV1 = "/debug/v1/info"
proc installDebugInfoV1Handler(router: var RestRouter, node: WakuNode) =
router.api(MethodGet, ROUTE_DEBUG_INFOV1) do() -> RestApiResponse:
let getInfo = proc(): RestApiResponse =
let info = node.info().toDebugWakuInfo()
let resp = RestApiResponse.jsonResponse(info, status = Http200)
if resp.isErr():
@ -20,11 +22,22 @@ proc installDebugInfoV1Handler(router: var RestRouter, node: WakuNode) =
return resp.get()
const ROUTE_DEBUG_VERSIONV1* = "/debug/v1/version"
# /debug route is deprecated, will be removed
router.api(MethodGet, ROUTE_DEBUG_INFOV1) do() -> RestApiResponse:
return getInfo()
router.api(MethodGet, ROUTE_INFOV1) do() -> RestApiResponse:
return getInfo()
const ROUTE_VERSIONV1* = "/version"
# /debug route is deprecated, will be removed
const ROUTE_DEBUG_VERSIONV1 = "/debug/v1/version"
proc installDebugVersionV1Handler(router: var RestRouter, node: WakuNode) =
# /debug route is deprecated, will be removed
router.api(MethodGet, ROUTE_DEBUG_VERSIONV1) do() -> RestApiResponse:
return RestApiResponse.textResponse(git_version, status = Http200)
router.api(MethodGet, ROUTE_VERSIONV1) do() -> RestApiResponse:
return RestApiResponse.textResponse(git_version, status = Http200)
proc installDebugApiHandlers*(router: var RestRouter, node: WakuNode) =
installDebugInfoV1Handler(router, node)

View File

@ -33,7 +33,7 @@ const NoPeerNoneFoundError =
RestApiResponse.serviceUnavailable("No suitable service peer & none discovered")
proc useSelfHostedLightPush(node: WakuNode): bool =
return node.wakuLegacyLightPush != nil and node.wakuLegacyLightPushClient == nil
return node.wakuLightPush != nil and node.wakuLightPushClient == nil
proc convertErrorKindToHttpStatus(statusCode: LightpushStatusCode): HttpCode =
## Lightpush status codes are matching HTTP status codes by design

View File

@ -132,6 +132,9 @@ proc syncMessageIngress*(
pubsubTopic: PubsubTopic,
msg: WakuMessage,
): Future[Result[void, string]] {.async.} =
if msg.ephemeral:
return err("ephemeral message, will not store")
let msgHashHex = msgHash.to0xHex()
trace "handling message in syncMessageIngress",

View File

@ -5,7 +5,6 @@ const
WakuFilterPushCodec* = "/vac/waku/filter-push/2.0.0-beta1"
WakuLightPushCodec* = "/vac/waku/lightpush/3.0.0"
WakuLegacyLightPushCodec* = "/vac/waku/lightpush/2.0.0-beta1"
WakuSyncCodec* = "/vac/waku/sync/1.0.0"
WakuReconciliationCodec* = "/vac/waku/reconciliation/1.0.0"
WakuTransferCodec* = "/vac/waku/transfer/1.0.0"
WakuMetadataCodec* = "/vac/waku/metadata/1.0.0"

View File

@ -26,7 +26,7 @@ const capabilityToCodec = {
Capabilities.Store: WakuStoreCodec,
Capabilities.Filter: WakuFilterSubscribeCodec,
Capabilities.Lightpush: WakuLightPushCodec,
Capabilities.Sync: WakuSyncCodec,
Capabilities.Sync: WakuReconciliationCodec,
}.toTable
func init*(

View File

@ -98,6 +98,11 @@ proc handleRequest*(
none[string]()
else:
handleRes.error.desc,
relayPeerCount:
if isSuccess:
some(handleRes.get())
else:
none[uint32](),
)
if not isSuccess:

View File

@ -229,7 +229,7 @@ proc populateEnrCache(wpx: WakuPeerExchange) =
# swap cache for new
wpx.enrCache = newEnrCache
debug "ENR cache populated"
trace "ENR cache populated"
proc updatePxEnrCache(wpx: WakuPeerExchange) {.async.} =
# try more aggressively to fill the cache at startup

View File

@ -7,6 +7,7 @@ import ../common/enr, ../waku_enr/capabilities, ../waku_enr/sharding
const DiscoverLimit* = 1000
const DefaultRegistrationTTL* = 60.seconds
const DefaultRegistrationInterval* = 10.seconds
const MaxRegistrationInterval* = 5.minutes
const PeersRequestedCount* = 12
proc computeNamespace*(clusterId: uint16, shard: uint16): string =

View File

@ -32,6 +32,7 @@ type WakuRendezVous* = ref object
relayShard: RelayShards
capabilities: seq[Capabilities]
registrationInterval: timer.Duration
periodicRegistrationFut: Future[void]
proc batchAdvertise*(
@ -42,7 +43,7 @@ proc batchAdvertise*(
): Future[Result[void, string]] {.async: (raises: []).} =
## Register with all rendezvous peers under a namespace
# rendezvous.advertise except already opened connections
# rendezvous.advertise expects already opened connections
# must dial first
var futs = collect(newSeq):
for peerId in peers:
@ -62,7 +63,7 @@ proc batchAdvertise*(
fut.read()
if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg
warn "a rendezvous dial failed", cause = catchable.error.msg
continue
let connOpt = catchable.get()
@ -91,7 +92,7 @@ proc batchRequest*(
): Future[Result[seq[PeerRecord], string]] {.async: (raises: []).} =
## Request all records from all rendezvous peers matching a namespace
# rendezvous.request except already opened connections
# rendezvous.request expects already opened connections
# must dial first
var futs = collect(newSeq):
for peerId in peers:
@ -111,7 +112,7 @@ proc batchRequest*(
fut.read()
if catchable.isErr():
error "rendezvous dial failed", error = catchable.error.msg
warn "a rendezvous dial failed", cause = catchable.error.msg
continue
let connOpt = catchable.get()
@ -143,7 +144,6 @@ proc advertiseAll(
for pubsubTopic in pubsubTopics:
# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue
let namespace = computeNamespace(pubsubTopic.clusterId, pubsubTopic.shardId)
@ -151,6 +151,9 @@ proc advertiseAll(
# Advertise yourself on that peer
self.batchAdvertise(namespace, DefaultRegistrationTTL, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch:
await allFinished(futs)
@ -159,7 +162,7 @@ proc advertiseAll(
for fut in catchable.get():
if fut.failed():
error "rendezvous advertisement failed", error = fut.error.msg
warn "a rendezvous advertisement failed", cause = fut.error.msg
debug "waku rendezvous advertisements finished"
@ -178,12 +181,14 @@ proc initialRequestAll*(
# Get a random RDV peer for that shard
let rpi = self.peerManager.selectPeer(RendezVousCodec, some($pubsubTopic)).valueOr:
error "could not get a peer supporting RendezVousCodec"
continue
# Ask for peer records for that shard
self.batchRequest(namespace, PeersRequestedCount, @[rpi.peerId])
if futs.len < 1:
return err("could not get a peer supporting RendezVousCodec")
let catchable = catch:
await allFinished(futs)
@ -192,12 +197,13 @@ proc initialRequestAll*(
for fut in catchable.get():
if fut.failed():
error "rendezvous request failed", error = fut.error.msg
warn "a rendezvous request failed", cause = fut.error.msg
elif fut.finished():
let res = fut.value()
let records = res.valueOr:
return err($res.error)
warn "a rendezvous request failed", cause = $res.error
continue
for record in records:
rendezvousPeerFoundTotal.inc()
@ -209,15 +215,23 @@ proc initialRequestAll*(
proc periodicRegistration(self: WakuRendezVous) {.async.} =
debug "waku rendezvous periodic registration started",
interval = DefaultRegistrationInterval
interval = self.registrationInterval
# infinite loop
while true:
await sleepAsync(DefaultRegistrationInterval)
await sleepAsync(self.registrationInterval)
(await self.advertiseAll()).isOkOr:
debug "waku rendezvous advertisements failed", error = error
if self.registrationInterval > MaxRegistrationInterval:
self.registrationInterval = MaxRegistrationInterval
else:
self.registrationInterval += self.registrationInterval
# Back to normal interval if no errors
self.registrationInterval = DefaultRegistrationInterval
proc new*(
T: type WakuRendezVous, switch: Switch, peerManager: PeerManager, enr: Record
): Result[T, string] {.raises: [].} =
@ -246,6 +260,7 @@ proc new*(
wrv.peerManager = peerManager
wrv.relayshard = relayshard
wrv.capabilities = capabilities
wrv.registrationInterval = DefaultRegistrationInterval
debug "waku rendezvous initialized",
cluster = relayshard.clusterId,

View File

@ -200,14 +200,10 @@ method generateProof*(
messageId = messageId,
).valueOr:
return err("proof generation failed: " & $error)
return ok(proof)
if lastProcessedEpoch != epoch:
lastProcessedEpoch = epoch
waku_rln_proof_remining.set(g.userMessageLimit.get().float64 - 1)
else:
waku_rln_proof_remining.dec()
waku_rln_proofs_generated_total.inc()
waku_rln_remaining_proofs_per_epoch.dec()
waku_rln_total_generated_proofs.inc()
return ok(proof)
method isReady*(g: GroupManager): Future[bool] {.base, async.} =
raise newException(

View File

@ -63,12 +63,12 @@ declarePublicGauge(
)
declarePublicGauge(
waku_rln_proof_remining,
waku_rln_remaining_proofs_per_epoch,
"number of proofs remaining to be generated for the current epoch",
)
declarePublicGauge(
waku_rln_proofs_generated_total,
waku_rln_total_generated_proofs,
"total number of proofs generated since the node started",
)
@ -84,6 +84,7 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
var cumulativeValidMessages = 0.float64
var cumulativeProofsVerified = 0.float64
var cumulativeProofsGenerated = 0.float64
var cumulativeProofsRemaining = 100.float64
when defined(metrics):
logMetrics = proc() =
@ -102,7 +103,10 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
waku_rln_proof_verification_total, cumulativeProofsVerified
)
let freshProofsGeneratedCount =
parseAndAccumulate(waku_rln_proofs_generated_total, cumulativeProofsGenerated)
parseAndAccumulate(waku_rln_total_generated_proofs, cumulativeProofsGenerated)
let freshProofsRemainingCount = parseAndAccumulate(
waku_rln_remaining_proofs_per_epoch, cumulativeProofsRemaining
)
info "Total messages", count = freshMsgCount
info "Total spam messages", count = freshSpamCount
@ -111,4 +115,6 @@ proc getRlnMetricsLogger*(): RLNMetricsLogger =
info "Total errors", count = freshErrorCount
info "Total proofs verified", count = freshProofsVerifiedCount
info "Total proofs generated", count = freshProofsGeneratedCount
info "Total proofs remaining", count = freshProofsRemainingCount
return logMetrics

View File

@ -89,6 +89,7 @@ type WakuRLNRelay* = ref object of RootObj
groupManager*: GroupManager
onFatalErrorAction*: OnFatalErrorHandler
nonceManager*: NonceManager
epochMonitorFuture*: Future[void]
proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
## gets time `t` as `flaot64` with subseconds resolution in the fractional part
@ -96,6 +97,18 @@ proc calcEpoch*(rlnPeer: WakuRLNRelay, t: float64): Epoch =
let e = uint64(t / rlnPeer.rlnEpochSizeSec.float64)
return toEpoch(e)
proc nextEpoch*(rlnPeer: WakuRLNRelay, time: float64): float64 =
let
currentEpoch = uint64(time / rlnPeer.rlnEpochSizeSec.float64)
nextEpochTime = float64(currentEpoch + 1) * rlnPeer.rlnEpochSizeSec.float64
currentTime = epochTime()
# Ensure we always return a future time
if nextEpochTime > currentTime:
return nextEpochTime
else:
return epochTime()
proc stop*(rlnPeer: WakuRLNRelay) {.async: (raises: [Exception]).} =
## stops the rln-relay protocol
## Throws an error if it cannot stop the rln-relay protocol
@ -392,6 +405,19 @@ proc generateRlnValidator*(
return validator
proc monitorEpochs(wakuRlnRelay: WakuRLNRelay) {.async.} =
while true:
try:
waku_rln_remaining_proofs_per_epoch.set(
wakuRlnRelay.groupManager.userMessageLimit.get().float64
)
except CatchableError:
error "Error in epoch monitoring", error = getCurrentExceptionMsg()
let nextEpochTime = wakuRlnRelay.nextEpoch(epochTime())
let sleepDuration = int((nextEpochTime - epochTime()) * 1000)
await sleepAsync(sleepDuration)
proc mount(
conf: WakuRlnConfig, registrationHandler = none(RegistrationHandler)
): Future[RlnRelayResult[WakuRlnRelay]] {.async.} =
@ -445,17 +471,19 @@ proc mount(
(await groupManager.startGroupSync()).isOkOr:
return err("could not start the group sync: " & $error)
return ok(
WakuRLNRelay(
groupManager: groupManager,
nonceManager:
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
rlnEpochSizeSec: conf.rlnEpochSizeSec,
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
onFatalErrorAction: conf.onFatalErrorAction,
)
wakuRlnRelay = WakuRLNRelay(
groupManager: groupManager,
nonceManager:
NonceManager.init(conf.rlnRelayUserMessageLimit, conf.rlnEpochSizeSec.float),
rlnEpochSizeSec: conf.rlnEpochSizeSec,
rlnMaxEpochGap: max(uint64(MaxClockGapSeconds / float64(conf.rlnEpochSizeSec)), 1),
onFatalErrorAction: conf.onFatalErrorAction,
)
# Start epoch monitoring in the background
wakuRlnRelay.epochMonitorFuture = monitorEpochs(wakuRlnRelay)
return ok(wakuRlnRelay)
proc isReady*(rlnPeer: WakuRLNRelay): Future[bool] {.async: (raises: [Exception]).} =
## returns true if the rln-relay protocol is ready to relay messages
## returns false otherwise

View File

@ -68,6 +68,9 @@ type SyncReconciliation* = ref object of LPProtocol
proc messageIngress*(
self: SyncReconciliation, pubsubTopic: PubsubTopic, msg: WakuMessage
) =
if msg.ephemeral:
return
let msgHash = computeMessageHash(pubsubTopic, msg)
let id = SyncID(time: msg.timestamp, hash: msgHash)
@ -78,6 +81,9 @@ proc messageIngress*(
proc messageIngress*(
self: SyncReconciliation, msgHash: WakuMessageHash, msg: WakuMessage
) =
if msg.ephemeral:
return
let id = SyncID(time: msg.timestamp, hash: msgHash)
self.storage.insert(id).isOkOr:
@ -129,10 +135,10 @@ proc processRequest(
sendPayload.shards = self.shards.toSeq()
for hash in hashToSend:
await self.remoteNeedsTx.addLast((conn.peerId, hash))
self.remoteNeedsTx.addLastNoWait((conn.peerId, hash))
for hash in hashToRecv:
await self.localWantstx.addLast((conn.peerId, hash))
self.localWantsTx.addLastNoWait((conn.peerId, hash))
rawPayload = sendPayload.deltaEncode()

View File

@ -118,6 +118,10 @@ proc needsReceiverLoop(self: SyncTransfer) {.async.} =
error "failed to query archive", error = error
continue
if response.messages.len < 1:
error "failed to fetch message from db"
continue
let msg =
WakuMessageAndTopic(pubsub: response.topics[0], message: response.messages[0])