mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-06-04 13:09:32 +00:00
Merge branch 'master' into chore/retrieve-roots-current
This commit is contained in:
commit
9e227fe402
6
.github/workflows/ci-daily.yml
vendored
6
.github/workflows/ci-daily.yml
vendored
@ -3,6 +3,7 @@ name: Daily logos-delivery CI
|
||||
on:
|
||||
schedule:
|
||||
- cron: '30 6 * * *'
|
||||
workflow_dispatch:
|
||||
|
||||
env:
|
||||
NPROC: 2
|
||||
@ -77,3 +78,8 @@ jobs:
|
||||
}" \
|
||||
"$DISCORD_WEBHOOK_URL"
|
||||
|
||||
# RLN end-to-end against the simulator. Defaults from tests/simulator/rln-sim.env.
|
||||
rln-simulator:
|
||||
uses: ./.github/workflows/ci-rln-simulator.yml
|
||||
secrets: inherit
|
||||
|
||||
|
||||
271
.github/workflows/ci-rln-simulator.yml
vendored
Normal file
271
.github/workflows/ci-rln-simulator.yml
vendored
Normal file
@ -0,0 +1,271 @@
|
||||
name: RLN E2E — Simulator
|
||||
|
||||
# Validates the full RLN flow end-to-end against logos-delivery-simulator:
|
||||
# keystore generation, on-chain registration, gossipsub propagation,
|
||||
# per-epoch rate-limit enforcement, and epoch-boundary recovery.
|
||||
#
|
||||
# Why this exists: logos-dev runs with RLN disabled, so there is no
|
||||
# production traffic exercising RLN. Until RLN is enabled there, this is
|
||||
# the only end-to-end coverage of the RLN + zerokit path.
|
||||
#
|
||||
# The image is built ON the runner and tested ON the same runner, so the
|
||||
# AVX-512 portability issue in container-image.yml does not apply here.
|
||||
#
|
||||
# No own schedule: ci-daily.yml is the single daily entry point and calls
|
||||
# this via workflow_call. workflow_dispatch allows manual runs.
|
||||
# Run defaults live in tests/simulator/rln-sim.env; inputs override per-run.
|
||||
|
||||
on:
|
||||
workflow_call:
|
||||
inputs:
|
||||
branch:
|
||||
type: string
|
||||
default: ''
|
||||
num_nodes:
|
||||
type: string
|
||||
default: ''
|
||||
msg_limit:
|
||||
type: string
|
||||
default: ''
|
||||
epoch_sec:
|
||||
type: string
|
||||
default: ''
|
||||
workflow_dispatch:
|
||||
inputs:
|
||||
branch:
|
||||
description: 'logos-delivery branch to build & test (blank = use rln-sim.env)'
|
||||
type: string
|
||||
default: ''
|
||||
num_nodes:
|
||||
description: 'Number of nwaku nodes (blank = use rln-sim.env)'
|
||||
type: string
|
||||
default: ''
|
||||
msg_limit:
|
||||
description: 'RLN_RELAY_MSG_LIMIT, must be >= contract min ~20 (blank = use rln-sim.env)'
|
||||
type: string
|
||||
default: ''
|
||||
epoch_sec:
|
||||
description: 'RLN_RELAY_EPOCH_SEC, large enough a burst cannot straddle an epoch (blank = use rln-sim.env)'
|
||||
type: string
|
||||
default: ''
|
||||
|
||||
env:
|
||||
NPROC: 2
|
||||
MAKEFLAGS: "-j2"
|
||||
NIM_VERSION: '2.2.4'
|
||||
NIMBLE_VERSION: '0.22.3'
|
||||
|
||||
jobs:
|
||||
rln-e2e:
|
||||
runs-on: ubuntu-22.04
|
||||
timeout-minutes: 120
|
||||
name: rln-e2e
|
||||
|
||||
steps:
|
||||
# First checkout: the ref that triggered this workflow (CI branch /
|
||||
# master). This is where the e2e test script and rln-sim.env live —
|
||||
# the build branch may not contain them.
|
||||
- name: Checkout CI ref (for the test script)
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: false
|
||||
|
||||
# Defaults come from tests/simulator/rln-sim.env (single source of truth);
|
||||
# a non-blank input (dispatch or workflow_call) overrides the matching value.
|
||||
- name: Resolve parameters
|
||||
id: cfg
|
||||
env:
|
||||
IN_BRANCH: ${{ inputs.branch }}
|
||||
IN_NUM_NODES: ${{ inputs.num_nodes }}
|
||||
IN_MSG_LIMIT: ${{ inputs.msg_limit }}
|
||||
IN_EPOCH_SEC: ${{ inputs.epoch_sec }}
|
||||
run: |
|
||||
set -euo pipefail
|
||||
set -a; . tests/simulator/rln-sim.env; set +a
|
||||
{
|
||||
echo "branch=${IN_BRANCH:-$BRANCH}"
|
||||
echo "num_nodes=${IN_NUM_NODES:-$NUM_NODES}"
|
||||
echo "msg_limit=${IN_MSG_LIMIT:-$MSG_LIMIT}"
|
||||
echo "epoch_sec=${IN_EPOCH_SEC:-$EPOCH_SEC}"
|
||||
} >> "$GITHUB_OUTPUT"
|
||||
|
||||
- name: Stash e2e test script outside the workspace
|
||||
run: |
|
||||
test -f tests/simulator/rln-e2e-test.py \
|
||||
|| { echo "tests/simulator/rln-e2e-test.py missing on CI ref"; exit 1; }
|
||||
cp tests/simulator/rln-e2e-test.py "$RUNNER_TEMP/rln-e2e-test.py"
|
||||
|
||||
# Second checkout: the branch to build & test. Overwrites the workspace;
|
||||
# the stashed test script in RUNNER_TEMP survives.
|
||||
- name: Checkout logos-delivery (${{ steps.cfg.outputs.branch }})
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
ref: ${{ steps.cfg.outputs.branch }}
|
||||
submodules: false
|
||||
clean: true
|
||||
|
||||
- name: Get submodules hash
|
||||
id: submodules
|
||||
run: echo "hash=$(git submodule status | awk '{print $1}' | sort | shasum -a 256 | sed 's/[ -]*//g')" >> $GITHUB_OUTPUT
|
||||
|
||||
- name: Cache submodules
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
vendor/
|
||||
.git/modules
|
||||
key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }}
|
||||
|
||||
- name: Install Nim ${{ env.NIM_VERSION }}
|
||||
uses: jiro4989/setup-nim-action@v2
|
||||
with:
|
||||
nim-version: ${{ env.NIM_VERSION }}
|
||||
repo-token: ${{ secrets.GITHUB_TOKEN }}
|
||||
|
||||
- name: Install Nimble ${{ env.NIMBLE_VERSION }}
|
||||
run: |
|
||||
cd /tmp && nimble install "nimble@${{ env.NIMBLE_VERSION }}" -y
|
||||
echo "$HOME/.nimble/bin" >> $GITHUB_PATH
|
||||
|
||||
- name: Cache nimble deps
|
||||
id: cache-nimbledeps
|
||||
uses: actions/cache@v3
|
||||
with:
|
||||
path: |
|
||||
nimbledeps/
|
||||
nimble.paths
|
||||
key: ${{ runner.os }}-nimbledeps-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }}
|
||||
|
||||
- name: Install nimble deps
|
||||
if: steps.cache-nimbledeps.outputs.cache-hit != 'true'
|
||||
run: |
|
||||
nimble setup --localdeps -y
|
||||
make rebuild-nat-libs-nimbledeps
|
||||
make rebuild-bearssl-nimbledeps
|
||||
touch nimbledeps/.nimble-setup
|
||||
|
||||
- name: Build wakunode2
|
||||
run: |
|
||||
make -j${NPROC} V=1 POSTGRES=1 \
|
||||
NIMFLAGS="-d:disableMarchNative -d:chronicles_colors:none" \
|
||||
wakunode2
|
||||
|
||||
- name: Build local Docker image
|
||||
run: |
|
||||
docker build -t nwaku-rln-ci:test -f docker/binaries/Dockerfile.bn.amd64 .
|
||||
|
||||
- name: Clone logos-delivery-simulator
|
||||
run: |
|
||||
git clone --depth 1 https://github.com/logos-messaging/logos-delivery-simulator.git "$RUNNER_TEMP/logos-delivery-simulator"
|
||||
|
||||
- name: Write simulator .env
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: |
|
||||
cat > .env <<EOF
|
||||
LD_IMAGE=nwaku-rln-ci:test
|
||||
NUM_LD_NODES=${{ steps.cfg.outputs.num_nodes }}
|
||||
MSG_SIZE_KBYTES=1
|
||||
TRAFFIC_DELAY_SECONDS=5
|
||||
RLN_RELAY_EPOCH_SEC=${{ steps.cfg.outputs.epoch_sec }}
|
||||
RLN_RELAY_MSG_LIMIT=${{ steps.cfg.outputs.msg_limit }}
|
||||
MAX_MESSAGE_LIMIT=100
|
||||
RPC_URL=http://foundry:8545
|
||||
PRIVATE_KEY=0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80
|
||||
ETH_FROM=0xf39fd6e51aad88f6f4ce6ab8827279cfffb92266
|
||||
RLN_CONTRACT_REPO_COMMIT=e75ac913e579ad872f54b2225eec35d1de3d98b0
|
||||
WATCHTOWER_ENABLED=false
|
||||
EOF
|
||||
|
||||
- name: Bring up simulator (RLN subset)
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: |
|
||||
docker compose up -d foundry contract-repo-deployer nwaku-token-init bootstrap nwaku
|
||||
|
||||
- name: Wait for contract deployer
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: |
|
||||
for _ in $(seq 1 60); do
|
||||
st=$(docker inspect logos-delivery-simulator-contract-repo-deployer-1 --format='{{.State.Status}}' 2>/dev/null || echo missing)
|
||||
[ "$st" = "exited" ] && break
|
||||
echo "deployer status: $st"; sleep 15
|
||||
done
|
||||
ec=$(docker inspect logos-delivery-simulator-contract-repo-deployer-1 --format='{{.State.ExitCode}}')
|
||||
echo "deployer exit code: $ec"
|
||||
if [ "$ec" != "0" ]; then
|
||||
docker logs logos-delivery-simulator-contract-repo-deployer-1 2>&1 | tail -50
|
||||
exit 1
|
||||
fi
|
||||
|
||||
- name: Wait for nwaku fleet to register
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: |
|
||||
N=${{ steps.cfg.outputs.num_nodes }}
|
||||
for _ in $(seq 1 60); do
|
||||
up=$(docker ps --filter 'name=logos-delivery-simulator-nwaku-' --filter 'status=running' --format '{{.Names}}' | wc -l)
|
||||
echo "nwaku running: $up/$N"
|
||||
[ "$up" -ge "$N" ] && break
|
||||
sleep 15
|
||||
done
|
||||
# nwaku-1 must reach the "registered + started" marker
|
||||
timeout 300 docker logs -f logos-delivery-simulator-nwaku-1 2>&1 \
|
||||
| grep -m1 -E "Segmentation fault|Illegal instruction|Failed to register on-chain|I am a nwaku node" \
|
||||
| tee /tmp/nwaku1.verdict
|
||||
grep -q "I am a nwaku node" /tmp/nwaku1.verdict
|
||||
|
||||
- name: Run RLN e2e scenarios
|
||||
run: |
|
||||
TEST_SCRIPT="$RUNNER_TEMP/rln-e2e-test.py"
|
||||
test -f "$TEST_SCRIPT" \
|
||||
|| { echo "stashed test script missing at $TEST_SCRIPT"; exit 1; }
|
||||
docker run --rm \
|
||||
--network logos-delivery-simulator_simulation \
|
||||
-v "$TEST_SCRIPT:/test.py:ro" \
|
||||
python:3.11-slim \
|
||||
sh -c "pip install --quiet --disable-pip-version-check requests && \
|
||||
python /test.py \
|
||||
--hostname-prefix logos-delivery-simulator-nwaku- \
|
||||
--num-nodes ${{ steps.cfg.outputs.num_nodes }} \
|
||||
--msg-limit ${{ steps.cfg.outputs.msg_limit }} \
|
||||
--epoch-sec ${{ steps.cfg.outputs.epoch_sec }} \
|
||||
--health-deadline-sec 600"
|
||||
|
||||
- name: Collect logs on failure
|
||||
if: failure()
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: |
|
||||
mkdir -p "$RUNNER_TEMP/logs"
|
||||
for c in $(docker ps -a --filter 'name=logos-delivery-simulator-' --format '{{.Names}}'); do
|
||||
docker logs "$c" > "$RUNNER_TEMP/logs/$c.log" 2>&1 || true
|
||||
done
|
||||
|
||||
- name: Upload logs
|
||||
if: failure()
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: simulator-logs
|
||||
path: ${{ runner.temp }}/logs
|
||||
retention-days: 7
|
||||
|
||||
- name: Tear down
|
||||
if: always()
|
||||
working-directory: ${{ runner.temp }}/logos-delivery-simulator
|
||||
run: docker compose down -v || true
|
||||
|
||||
- name: Notify Discord
|
||||
if: always()
|
||||
env:
|
||||
DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }}
|
||||
run: |
|
||||
[ -z "$DISCORD_WEBHOOK_URL" ] && exit 0
|
||||
STATUS="${{ job.status }}"
|
||||
BRANCH="${{ steps.cfg.outputs.branch }}"
|
||||
RUN_URL="https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}"
|
||||
if [ "$STATUS" = "success" ]; then COLOR=3066993; TITLE="✅ RLN E2E passed"; else COLOR=15158332; TITLE="❌ RLN E2E failed"; fi
|
||||
curl -H "Content-Type: application/json" -X POST -d "{
|
||||
\"embeds\":[{\"title\":\"$TITLE\",\"color\":$COLOR,
|
||||
\"fields\":[
|
||||
{\"name\":\"Branch\",\"value\":\"$BRANCH\",\"inline\":true},
|
||||
{\"name\":\"Status\",\"value\":\"$STATUS\",\"inline\":true}],
|
||||
\"url\":\"$RUN_URL\",
|
||||
\"footer\":{\"text\":\"Daily RLN simulator E2E\"}}]}" \
|
||||
"$DISCORD_WEBHOOK_URL"
|
||||
1
.github/workflows/ci.yml
vendored
1
.github/workflows/ci.yml
vendored
@ -43,6 +43,7 @@ jobs:
|
||||
- 'tools/**'
|
||||
- 'tests/all_tests_v2.nim'
|
||||
- 'tests/**'
|
||||
- 'channels/**'
|
||||
docker:
|
||||
- 'docker/**'
|
||||
|
||||
|
||||
4
.gitignore
vendored
4
.gitignore
vendored
@ -86,3 +86,7 @@ nimbledeps
|
||||
|
||||
**/anvil_state/state-deployed-contracts-mint-and-approved.json
|
||||
.gitnexus
|
||||
|
||||
# Python bytecode from tests/simulator
|
||||
__pycache__/
|
||||
*.pyc
|
||||
|
||||
53
AGENTS.md
53
AGENTS.md
@ -16,7 +16,7 @@ Key architectural decisions:
|
||||
|
||||
Resource-restricted first: Protocols differentiate between full nodes (relay) and light clients (filter, lightpush, store). Light clients can participate without maintaining full message history or relay capabilities. This explains the client/server split in protocol implementations.
|
||||
|
||||
Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic sharding across 8 shards. Code prioritizes metadata privacy alongside content encryption.
|
||||
Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic content-topic-based sharding (shard count is configurable; generation-zero defaults to 8 shards on cluster 0). Code prioritizes metadata privacy alongside content encryption.
|
||||
|
||||
Scalability via sharding: The network uses automatic content-topic-based sharding to distribute traffic. This is why you'll see sharding logic throughout the codebase and why pubsub topic selection is protocol-level, not application-level.
|
||||
|
||||
@ -36,7 +36,10 @@ See [documentation](https://docs.waku.org/learn/) for architectural details.
|
||||
### Key Terminology
|
||||
- ENR (Ethereum Node Record): Node identity and capability advertisement
|
||||
- Multiaddr: libp2p addressing format (e.g., `/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2...`)
|
||||
- PubsubTopic: Gossipsub topic for message routing (e.g., `/waku/2/default-waku/proto`)
|
||||
- PubsubTopic: Gossipsub topic for message routing (shard-based, e.g., `/waku/2/rs/<cluster-id>/<shard-id>`; the default is `/waku/2/rs/0/0`)
|
||||
- cluster-id: network id
|
||||
- shard-id: shard differentiator inside the network - drivers mesh forming.
|
||||
- autosharding: network supports n (configured) shards [0..n-1], shard derived from ContentTopic
|
||||
- ContentTopic: Application-level message categorization (e.g., `/my-app/1/chat/proto`)
|
||||
- Sharding: Partitioning network traffic across topics (static or auto-sharding)
|
||||
- RLN (Rate Limiting Nullifier): Zero-knowledge proof system for spam prevention
|
||||
@ -77,29 +80,29 @@ type WakuFilter* = ref object of LPProtocol
|
||||
### Build Requirements
|
||||
- Nim 2.x (check `waku.nimble` for minimum version)
|
||||
- Rust toolchain (required for RLN dependencies)
|
||||
- Build system: Make with nimbus-build-system
|
||||
- Build system: Make driven by Nimble (dependencies pinned in `nimble.lock`)
|
||||
|
||||
### Build System
|
||||
The project uses Makefile with nimbus-build-system (Status's Nim build framework):
|
||||
The project uses a Makefile that drives Nimble. Dependencies are resolved from
|
||||
`nimble.lock` into a local `nimbledeps/` directory (tracked by the
|
||||
`NIMBLEDEPS_STAMP` target).
|
||||
```bash
|
||||
# Initial build (updates submodules)
|
||||
# Initial build (resolves Nimble deps automatically)
|
||||
make wakunode2
|
||||
|
||||
# After git pull, update submodules
|
||||
make update
|
||||
|
||||
# Build with custom flags
|
||||
make wakunode2 NIMFLAGS="-d:chronicles_log_level=DEBUG"
|
||||
```
|
||||
|
||||
Note: The build system uses `--mm:refc` memory management (automatically enforced). Only relevant if compiling outside the standard build system.
|
||||
Note: The build uses `--mm:refc` memory management (passed automatically by the Nimble tasks in `waku.nimble`). Only relevant if compiling outside the standard build system.
|
||||
|
||||
### Common Make Targets
|
||||
```bash
|
||||
make wakunode2 # Build main node binary
|
||||
make test # Run all tests
|
||||
make testcommon # Run common tests only
|
||||
make libwakuStatic # Build static C library
|
||||
make libwaku # Build the legacy C library (libwaku)
|
||||
make liblogosdelivery. # Build actual C FFI library
|
||||
make chat2 # Build chat example
|
||||
make install-nph # Install git hook for auto-formatting
|
||||
```
|
||||
@ -127,7 +130,7 @@ suite "Waku ENR - Capabilities":
|
||||
test "check capabilities support":
|
||||
## Given
|
||||
let bitfield: CapabilitiesBitfield = 0b0000_1101u8
|
||||
|
||||
|
||||
## Then
|
||||
check:
|
||||
bitfield.supportsCapability(Capabilities.Relay)
|
||||
@ -135,7 +138,7 @@ suite "Waku ENR - Capabilities":
|
||||
```
|
||||
|
||||
### Code Formatting
|
||||
Mandatory: All code must be formatted with `nph` (vendored in `vendor/nph`)
|
||||
Mandatory: All code must be formatted with `nph` (installed via `make build-nph`, which fetches a pinned `nph` version with Nimble)
|
||||
```bash
|
||||
# Format specific file
|
||||
make nph/waku/waku_core.nim
|
||||
@ -162,7 +165,6 @@ Compile with log level:
|
||||
nim c -d:chronicles_log_level=TRACE myfile.nim
|
||||
```
|
||||
|
||||
|
||||
## Code Conventions
|
||||
|
||||
Common pitfalls:
|
||||
@ -181,8 +183,13 @@ Common pitfalls:
|
||||
- Exceptions: `XxxError` for CatchableError, `XxxDefect` for Defect
|
||||
- ref object types: `XxxRef` suffix
|
||||
|
||||
### Calls and Member Access
|
||||
- Prefer dot call syntax for predicates: `x.isNil()` instead of `isNil(x)`
|
||||
- Use parentheses for "verbs" (operations/actions): `isSome()`, `handleRequest()`
|
||||
- Omit parentheses for "nouns" (properties/values): `.len`, `.high`
|
||||
|
||||
### Imports Organization
|
||||
Group imports: stdlib, external libs, internal modules:
|
||||
Stdlib + external in one `import` block, internal modules in a separate block:
|
||||
```nim
|
||||
import
|
||||
std/[options, sequtils], # stdlib
|
||||
@ -214,11 +221,11 @@ proc subscribe(
|
||||
): Future[FilterSubscribeResult] {.async.} =
|
||||
if contentTopics.len > MaxContentTopicsPerRequest:
|
||||
return err(FilterSubscribeError.badRequest("exceeds maximum"))
|
||||
|
||||
|
||||
# Handle Result with isOkOr
|
||||
(await wf.subscriptions.addSubscription(peerId, criteria)).isOkOr:
|
||||
return err(FilterSubscribeError.serviceUnavailable(error))
|
||||
|
||||
|
||||
ok()
|
||||
```
|
||||
|
||||
@ -460,8 +467,7 @@ nim c -r \
|
||||
|
||||
### Vendor Directory
|
||||
- Never edit files directly in vendor - it is auto-generated from git submodules
|
||||
- Always run `make update` after pulling changes
|
||||
- Managed by `nimbus-build-system`
|
||||
- Nimble dependencies are resolved from `nimble.lock` into `nimbledeps/`
|
||||
|
||||
### Chronicles Performance
|
||||
- Log levels are configured at compile time for performance
|
||||
@ -475,7 +481,7 @@ nim c -r \
|
||||
|
||||
### RLN Dependencies
|
||||
- RLN code requires a Rust toolchain, which explains Rust imports in some modules
|
||||
- Pre-built `librln` libraries are checked into the repository
|
||||
- `librln` is built from the vendored `zerokit` submodule via the `librln`/`rln-deps` Make targets
|
||||
|
||||
## Quick Reference
|
||||
|
||||
@ -483,18 +489,19 @@ Language: Nim 2.x | License: MIT or Apache 2.0
|
||||
|
||||
### Important Files
|
||||
- `Makefile` - Primary build interface
|
||||
- `waku.nimble` - Package definition and build tasks (called via nimbus-build-system)
|
||||
- `vendor/nimbus-build-system/` - Status's build framework
|
||||
- `waku.nimble` - Package definition and build tasks (invoked by the Makefile via Nimble)
|
||||
- `nimble.lock` - Pinned dependency versions resolved into `nimbledeps/`
|
||||
- `waku/node/waku_node.nim` - Core node implementation
|
||||
- `apps/wakunode2/wakunode2.nim` - Main CLI application
|
||||
- `waku/factory/waku_conf.nim` - Configuration types
|
||||
- `library/libwaku.nim` - C bindings entry point
|
||||
- `liblogosdelivery/liblogosdelivery.nim` - C bindings entry point
|
||||
|
||||
### Testing Entry Points
|
||||
- `tests/all_tests_waku.nim` - All Waku protocol tests
|
||||
- `tests/all_tests_wakunode2.nim` - Node application tests
|
||||
- `tests/all_tests_common.nim` - Common utilities tests
|
||||
|
||||
#### in-flight testing
|
||||
- any test can be run separately by issuing `make test tests/<relativepath>/<unit-test-source>.nim`
|
||||
### Key Dependencies
|
||||
- `chronos` - Async framework
|
||||
- `nim-results` - Result type for error handling
|
||||
|
||||
15
BearSSL.mk
15
BearSSL.mk
@ -9,7 +9,7 @@
|
||||
## bearssl (nimbledeps) ##
|
||||
###########################
|
||||
# Rebuilds libbearssl.a from the package installed by nimble under
|
||||
# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP).
|
||||
# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps.
|
||||
#
|
||||
# BEARSSL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that
|
||||
# depend on it must be invoked via a recursive $(MAKE) call so the sub-make
|
||||
@ -29,18 +29,11 @@ else
|
||||
PORTABLE_BEARSSL_CFLAGS := -W -Wall -Os -fPIC
|
||||
endif
|
||||
|
||||
.PHONY: clean-bearssl-nimbledeps rebuild-bearssl-nimbledeps
|
||||
.PHONY: rebuild-bearssl-nimbledeps
|
||||
|
||||
clean-bearssl-nimbledeps:
|
||||
rebuild-bearssl-nimbledeps:
|
||||
ifeq ($(BEARSSL_NIMBLEDEPS_DIR),)
|
||||
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first)
|
||||
endif
|
||||
+ [ -e "$(BEARSSL_CSOURCES_DIR)/build" ] && \
|
||||
"$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" clean || true
|
||||
|
||||
rebuild-bearssl-nimbledeps: | clean-bearssl-nimbledeps
|
||||
ifeq ($(BEARSSL_NIMBLEDEPS_DIR),)
|
||||
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first)
|
||||
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make build-deps' first)
|
||||
endif
|
||||
@echo "Rebuilding bearssl from $(BEARSSL_CSOURCES_DIR)"
|
||||
+ "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" CFLAGS="$(PORTABLE_BEARSSL_CFLAGS)" lib
|
||||
71
Makefile
71
Makefile
@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH)
|
||||
# NIM binary location
|
||||
NIM_BINARY := $(shell which nim 2>/dev/null)
|
||||
NPH := $(HOME)/.nimble/bin/nph
|
||||
NIMBLE := $(HOME)/.nimble/bin/nimble
|
||||
NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup
|
||||
|
||||
# Compilation parameters
|
||||
@ -42,7 +43,8 @@ endif
|
||||
##########
|
||||
## Main ##
|
||||
##########
|
||||
.PHONY: all test update clean examples deps nimble install-nim install-nimble
|
||||
# The Makefile automatically bootstraps dependency setup when needed for build and test targets.
|
||||
.PHONY: all test clean examples deps nimble install-nim install-nimble
|
||||
|
||||
# default target
|
||||
all: | wakunode2 libwaku liblogosdelivery
|
||||
@ -69,18 +71,16 @@ endif
|
||||
waku.nims:
|
||||
ln -s waku.nimble $@
|
||||
|
||||
$(NIMBLEDEPS_STAMP): nimble.lock | waku.nims
|
||||
$(MAKE) install-nimble
|
||||
nimble setup --localdeps
|
||||
$(MAKE) build-nph
|
||||
$(MAKE) rebuild-bearssl-nimbledeps
|
||||
$(MAKE) rebuild-nat-libs-nimbledeps
|
||||
$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims
|
||||
$(NIMBLE) setup --localdeps
|
||||
touch $@
|
||||
|
||||
update:
|
||||
rm -f $(NIMBLEDEPS_STAMP)
|
||||
$(MAKE) $(NIMBLEDEPS_STAMP)
|
||||
nimble lock
|
||||
# Must be phony so the recipe always runs and the sub-make re-evaluates
|
||||
# BEARSSL_NIMBLEDEPS_DIR / NAT_TRAVERSAL_NIMBLEDEPS_DIR (parse-time variables)
|
||||
# after nimble setup has populated nimbledeps/.
|
||||
.PHONY: build-deps
|
||||
build-deps: | $(NIMBLEDEPS_STAMP)
|
||||
$(MAKE) rebuild-bearssl-nimbledeps rebuild-nat-libs-nimbledeps
|
||||
|
||||
clean:
|
||||
rm -rf build 2> /dev/null || true
|
||||
@ -93,15 +93,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku.
|
||||
REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
|
||||
|
||||
install-nim:
|
||||
ifneq ($(detected_OS),Windows)
|
||||
scripts/install_nim.sh $(REQUIRED_NIM_VERSION)
|
||||
endif
|
||||
|
||||
install-nimble: install-nim
|
||||
@nimble_ver=$$(nimble --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1); \
|
||||
if [ "$$nimble_ver" = "$(REQUIRED_NIMBLE_VERSION)" ]; then \
|
||||
echo "nimble $(REQUIRED_NIMBLE_VERSION) already installed, skipping."; \
|
||||
else \
|
||||
cd $$(mktemp -d) && nimble install "nimble@$(REQUIRED_NIMBLE_VERSION)" -y; \
|
||||
fi
|
||||
ifneq ($(detected_OS),Windows)
|
||||
scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION)
|
||||
endif
|
||||
|
||||
build:
|
||||
mkdir -p build
|
||||
@ -203,7 +202,7 @@ clean: | clean-librln
|
||||
#################
|
||||
.PHONY: testcommon
|
||||
|
||||
testcommon: | $(NIMBLEDEPS_STAMP) build
|
||||
testcommon: | build-deps build
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble testcommon
|
||||
|
||||
@ -212,59 +211,59 @@ testcommon: | $(NIMBLEDEPS_STAMP) build
|
||||
##########
|
||||
.PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester
|
||||
|
||||
testwaku: | $(NIMBLEDEPS_STAMP) build rln-deps librln
|
||||
testwaku: | build-deps build rln-deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble test
|
||||
|
||||
wakunode2: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
wakunode2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble wakunode2
|
||||
|
||||
benchmarks: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
benchmarks: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble benchmarks
|
||||
|
||||
testwakunode2: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
testwakunode2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble testwakunode2
|
||||
|
||||
example2: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
example2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble example2
|
||||
|
||||
chat2: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
chat2: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2
|
||||
|
||||
chat2mix: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
chat2mix: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2mix
|
||||
|
||||
rln-db-inspector: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
rln-db-inspector: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble rln_db_inspector
|
||||
|
||||
chat2bridge: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
chat2bridge: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble chat2bridge
|
||||
|
||||
liteprotocoltester: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
liteprotocoltester: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble liteprotocoltester
|
||||
|
||||
lightpushwithmix: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
lightpushwithmix: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble lightpushwithmix
|
||||
|
||||
api_example: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
api_example: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
$(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims
|
||||
|
||||
build/%: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
build/%: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$*" && \
|
||||
nimble buildone $*
|
||||
|
||||
compile-test: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
compile-test: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \
|
||||
nimble buildTest $(TEST_FILE) && \
|
||||
nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\""
|
||||
@ -276,11 +275,11 @@ compile-test: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
|
||||
tools: networkmonitor wakucanary
|
||||
|
||||
wakucanary: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
wakucanary: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble wakucanary
|
||||
|
||||
networkmonitor: | $(NIMBLEDEPS_STAMP) build deps librln
|
||||
networkmonitor: | build-deps build deps librln
|
||||
echo -e $(BUILD_MSG) "build/$@" && \
|
||||
nimble networkmonitor
|
||||
|
||||
@ -424,10 +423,10 @@ else ifeq ($(detected_OS),Linux)
|
||||
BUILD_COMMAND := $(BUILD_COMMAND)Linux
|
||||
endif
|
||||
|
||||
libwaku: | $(NIMBLEDEPS_STAMP) librln
|
||||
libwaku: | build-deps librln
|
||||
nimble --verbose libwaku$(BUILD_COMMAND) waku.nimble
|
||||
|
||||
liblogosdelivery: | $(NIMBLEDEPS_STAMP) librln
|
||||
liblogosdelivery: | build-deps librln
|
||||
nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble
|
||||
|
||||
logosdelivery_example: | build liblogosdelivery
|
||||
|
||||
19
Nat.mk
19
Nat.mk
@ -9,7 +9,7 @@
|
||||
## nat-libs (nimbledeps) ##
|
||||
###########################
|
||||
# Builds miniupnpc and libnatpmp from the package installed by nimble under
|
||||
# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP).
|
||||
# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps.
|
||||
#
|
||||
# NAT_TRAVERSAL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that
|
||||
# depend on it must be invoked via a recursive $(MAKE) call so the sub-make
|
||||
@ -28,20 +28,11 @@ else
|
||||
PORTABLE_NAT_MARCH :=
|
||||
endif
|
||||
|
||||
.PHONY: clean-cross-nimbledeps rebuild-nat-libs-nimbledeps
|
||||
.PHONY: rebuild-nat-libs-nimbledeps
|
||||
|
||||
clean-cross-nimbledeps:
|
||||
rebuild-nat-libs-nimbledeps:
|
||||
ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),)
|
||||
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first)
|
||||
endif
|
||||
+ [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" ] && \
|
||||
"$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" CC=$(CC) clean $(HANDLE_OUTPUT) || true
|
||||
+ [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" ] && \
|
||||
"$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" CC=$(CC) clean $(HANDLE_OUTPUT) || true
|
||||
|
||||
rebuild-nat-libs-nimbledeps: | clean-cross-nimbledeps
|
||||
ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),)
|
||||
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first)
|
||||
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make build-deps' first)
|
||||
endif
|
||||
@echo "Rebuilding nat-libs from $(NAT_TRAVERSAL_NIMBLEDEPS_DIR)"
|
||||
ifeq ($(OS), Windows_NT)
|
||||
@ -58,4 +49,4 @@ else
|
||||
+ "$(MAKE)" CFLAGS="-Wall -Wno-cpp -Os -fPIC $(PORTABLE_NAT_MARCH) -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4 $(CFLAGS)" \
|
||||
-C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" \
|
||||
CC=$(CC) libnatpmp.a $(HANDLE_OUTPUT)
|
||||
endif
|
||||
endif
|
||||
@ -1,7 +1,7 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import
|
||||
std/[tables, times, strutils, hashes, sequtils, json],
|
||||
std/[tables, times, strutils, hashes, sequtils, json, options],
|
||||
chronos,
|
||||
confutils,
|
||||
chronicles,
|
||||
@ -267,10 +267,16 @@ when isMainModule:
|
||||
else:
|
||||
nodev2ExtPort
|
||||
|
||||
let nodev2Key =
|
||||
if conf.nodekey.isSome():
|
||||
conf.nodekey.get()
|
||||
else:
|
||||
crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
|
||||
|
||||
let bridge = Chat2Matterbridge.new(
|
||||
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
|
||||
mbGateway = conf.mbGateway,
|
||||
nodev2Key = conf.nodekey,
|
||||
nodev2Key = nodev2Key,
|
||||
nodev2BindIp = conf.listenAddress,
|
||||
nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
|
||||
nodev2ExtIp = nodev2ExtIp,
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
import
|
||||
std/options,
|
||||
confutils,
|
||||
confutils/defs,
|
||||
confutils/std/net,
|
||||
@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object
|
||||
|
||||
metricsServerAddress* {.
|
||||
desc: "Listening address of the metrics server",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||
name: "metrics-server-address"
|
||||
.}: IpAddress
|
||||
|
||||
@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object
|
||||
.}: seq[string]
|
||||
|
||||
nodekey* {.
|
||||
desc: "P2P node private key as hex",
|
||||
defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(),
|
||||
name: "nodekey"
|
||||
.}: crypto.PrivateKey
|
||||
desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey"
|
||||
.}: Option[crypto.PrivateKey]
|
||||
|
||||
store* {.
|
||||
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
|
||||
@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object
|
||||
# Matterbridge options
|
||||
mbHostAddress* {.
|
||||
desc: "Listening address of the Matterbridge host",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||
name: "mb-host-address"
|
||||
.}: IpAddress
|
||||
|
||||
|
||||
@ -162,7 +162,8 @@ type
|
||||
|
||||
metricsServerAddress* {.
|
||||
desc: "Listening address of the metrics server.",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
defaultValue:
|
||||
IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||
name: "metrics-server-address"
|
||||
.}: IpAddress
|
||||
|
||||
@ -194,7 +195,10 @@ type
|
||||
|
||||
dnsDiscoveryNameServers* {.
|
||||
desc: "DNS name server IPs to query. Argument may be repeated.",
|
||||
defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
|
||||
defaultValue: @[
|
||||
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]),
|
||||
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]),
|
||||
],
|
||||
name: "dns-discovery-name-server"
|
||||
.}: seq[IpAddress]
|
||||
|
||||
|
||||
@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object
|
||||
## Tester REST service configuration
|
||||
restAddress* {.
|
||||
desc: "Listening address of the REST HTTP server.",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||
name: "rest-address"
|
||||
.}: IpAddress
|
||||
|
||||
|
||||
@ -116,7 +116,7 @@ type NetworkMonitorConf* = object
|
||||
|
||||
metricsServerAddress* {.
|
||||
desc: "Listening address of the metrics server.",
|
||||
defaultValue: parseIpAddress("127.0.0.1"),
|
||||
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
|
||||
name: "metrics-server-address"
|
||||
.}: IpAddress
|
||||
|
||||
|
||||
25
channels/encryption/encryption.nim
Normal file
25
channels/encryption/encryption.nim
Normal file
@ -0,0 +1,25 @@
|
||||
## Optional encryption hooks for the Reliable Channel API.
|
||||
##
|
||||
## Modelled as `RequestBroker`s: the broker pattern lets the channel
|
||||
## delegate work to a provider that may live in any module without
|
||||
## introducing a direct dependency. If no provider is registered the
|
||||
## broker returns an error, so installing the noop providers from
|
||||
## `noop_encryption` is required when the application does not want
|
||||
## actual encryption.
|
||||
##
|
||||
## Applied per-segment after SDS processing on outgoing, and before
|
||||
## SDS processing on incoming. No specific scheme is mandated.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import brokers/request_broker
|
||||
|
||||
export request_broker
|
||||
|
||||
RequestBroker:
|
||||
type Encrypt* = seq[byte]
|
||||
proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.}
|
||||
|
||||
RequestBroker:
|
||||
type Decrypt* = seq[byte]
|
||||
proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.}
|
||||
18
channels/encryption/noop_encryption.nim
Normal file
18
channels/encryption/noop_encryption.nim
Normal file
@ -0,0 +1,18 @@
|
||||
## No-op encryption providers. Install these when the application does
|
||||
## not want actual encryption so the `Encrypt` / `Decrypt` brokers have
|
||||
## something to dispatch to.
|
||||
|
||||
import results
|
||||
import chronos
|
||||
import ./encryption
|
||||
|
||||
proc setNoopEncryption*() =
|
||||
discard Encrypt.setProvider(
|
||||
proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} =
|
||||
return ok(Encrypt(payload))
|
||||
)
|
||||
|
||||
discard Decrypt.setProvider(
|
||||
proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} =
|
||||
return ok(Decrypt(payload))
|
||||
)
|
||||
23
channels/events.nim
Normal file
23
channels/events.nim
Normal file
@ -0,0 +1,23 @@
|
||||
## Reliable Channel event types emitted to API consumers.
|
||||
##
|
||||
## Lifecycle events for individual segments (sent / propagated / errored)
|
||||
## are the same as the network-level ones the DeliveryService already
|
||||
## emits — `requestId` is shared across layers — so we just re-export
|
||||
## `waku/events/message_events` and avoid declaring duplicates.
|
||||
##
|
||||
## Only the channel-level `MessageReceivedEvent` carries data that has
|
||||
## no analogue in the lower layer (reassembled application payload,
|
||||
## senderId, channelId), so it lives here.
|
||||
|
||||
import waku/events/message_events as waku_message_events
|
||||
import brokers/event_broker
|
||||
|
||||
import ./types as channel_types
|
||||
|
||||
export waku_message_events, channel_types, event_broker
|
||||
|
||||
EventBroker:
|
||||
type ChannelMessageReceivedEvent* = object
|
||||
channelId*: ChannelId
|
||||
senderId*: SdsParticipantID
|
||||
payload*: seq[byte]
|
||||
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
80
channels/rate_limit_manager/rate_limit_manager.nim
Normal file
@ -0,0 +1,80 @@
|
||||
## Rate Limit Manager for the Reliable Channel API.
|
||||
##
|
||||
## Tracks messages sent per RLN epoch and delays dispatch when the
|
||||
## limit is approached, ensuring RLN compliance on enforcing relays.
|
||||
##
|
||||
## For the skeleton this is a pass-through: messages are immediately
|
||||
## released as ready-to-send. Real epoch budgeting will be added later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/times
|
||||
import message
|
||||
import brokers/event_broker
|
||||
import brokers/broker_context
|
||||
|
||||
export event_broker, broker_context
|
||||
export message.SdsChannelID
|
||||
|
||||
const
|
||||
DefaultEpochPeriodSec* = 600
|
||||
DefaultMessagesPerEpoch* = 1
|
||||
|
||||
EventBroker:
|
||||
## Emitted by `enqueueToSend` carrying the batch of opaque message
|
||||
## blobs that may now leave the rate limiter and continue down the
|
||||
## outgoing pipeline (encryption -> dispatch). Bytes only: the rate
|
||||
## limiter is intentionally agnostic of SDS, so anything serialisable
|
||||
## can flow through it.
|
||||
##
|
||||
## `channelId` lets listeners filter to their own channel, since all
|
||||
## reliable channels share the underlying Waku node's broker context.
|
||||
type ReadyToSendEvent* = object
|
||||
channelId*: SdsChannelID
|
||||
msgs*: seq[seq[byte]]
|
||||
|
||||
type
|
||||
RateLimitConfig* = object
|
||||
enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active
|
||||
epochPeriodSec*: int
|
||||
messagesPerEpoch*: int
|
||||
|
||||
RateLimitManager* = ref object
|
||||
config*: RateLimitConfig
|
||||
queue*: seq[seq[byte]]
|
||||
currentEpochStart*: Time
|
||||
sentInCurrentEpoch*: int
|
||||
channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent`
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
proc new*(
|
||||
T: type RateLimitManager,
|
||||
config: RateLimitConfig,
|
||||
channelId: SdsChannelID,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): T =
|
||||
return T(
|
||||
config: config,
|
||||
queue: @[],
|
||||
currentEpochStart: getTime(),
|
||||
sentInCurrentEpoch: 0,
|
||||
channelId: channelId,
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) =
|
||||
## Skeleton behaviour: enqueue and immediately release as a single
|
||||
## ready batch. Real per-epoch budgeting will park messages on
|
||||
## `self.queue` and emit only when the budget allows.
|
||||
ReadyToSendEvent.emit(
|
||||
self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg])
|
||||
)
|
||||
|
||||
proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] =
|
||||
## Returns the set of queued messages that may be dispatched now
|
||||
## without exceeding the configured rate limit.
|
||||
discard
|
||||
|
||||
proc resetEpoch*(self: RateLimitManager) =
|
||||
self.currentEpochStart = getTime()
|
||||
self.sentInCurrentEpoch = 0
|
||||
264
channels/reliable_channel.nim
Normal file
264
channels/reliable_channel.nim
Normal file
@ -0,0 +1,264 @@
|
||||
## Reliable Channel type.
|
||||
##
|
||||
## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end
|
||||
## reliability), optional encryption, and rate-limited dispatch on top
|
||||
## of the Messaging API for a single channel.
|
||||
##
|
||||
## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch
|
||||
## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event
|
||||
##
|
||||
## Channels are owned by a `ReliableChannelManager`. Lifecycle and send
|
||||
## operations are addressed by `ChannelId`, so callers only need to keep
|
||||
## an opaque handle around.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/[options, tables]
|
||||
import results
|
||||
import chronos
|
||||
import bearssl/rand
|
||||
import stew/byteutils
|
||||
import libp2p/crypto/crypto as libp2p_crypto
|
||||
|
||||
import waku/node/delivery_service/delivery_service
|
||||
import waku/node/delivery_service/send_service
|
||||
import waku/waku_core/topics
|
||||
|
||||
import ./events
|
||||
import ./segmentation/segmentation
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
import ./rate_limit_manager/rate_limit_manager
|
||||
import ./encryption/encryption
|
||||
|
||||
export
|
||||
delivery_service, send_service, events, segmentation, scalable_data_sync,
|
||||
rate_limit_manager, encryption
|
||||
|
||||
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
|
||||
## Wire-format spec marker for the Reliable Channel layer, as defined
|
||||
## in the reliable-channel-api LIP (`Wire Format / Spec Marker`).
|
||||
## A `WakuMessage` whose `meta` field does not equal these bytes is
|
||||
## not addressed to this layer and is silently dropped on ingress.
|
||||
## The trailing `/N` is the wire-format version and is bumped only
|
||||
## on breaking on-the-wire changes; implementations pin one version.
|
||||
|
||||
type ReliableChannel* = ref object
|
||||
## Spec-defined public type. Fields are private so callers cannot
|
||||
## mutate internals and break invariants. Getters are added below
|
||||
## for the few values consumers may need.
|
||||
deliveryService: DeliveryService
|
||||
channelId: ChannelId
|
||||
contentTopic: ContentTopic
|
||||
senderId: SdsParticipantID
|
||||
rng: ref HmacDrbgContext
|
||||
segmentation: SegmentationHandler
|
||||
sdsHandler: SdsHandler
|
||||
rateLimit: RateLimitManager
|
||||
|
||||
requestIds: Table[RequestId, seq[RequestId]]
|
||||
pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]]
|
||||
brokerCtx: BrokerContext
|
||||
## Captured here so the channel emits `ChannelMessageReceivedEvent`
|
||||
## on the same broker context the owning manager registered its
|
||||
## listeners on. Without this, an emit via `globalBrokerContext()`
|
||||
## would land on whatever context happens to be thread-local at
|
||||
## emit time, which is not necessarily the manager's.
|
||||
|
||||
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
|
||||
self.channelId
|
||||
|
||||
func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
|
||||
self.contentTopic
|
||||
|
||||
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
|
||||
self.senderId
|
||||
|
||||
proc onReadyToSend(
|
||||
self: ReliableChannel, msgs: seq[seq[byte]]
|
||||
) {.async: (raises: []).} =
|
||||
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
|
||||
## listener once `rate_limit_manager` releases a batch of opaque
|
||||
## blobs (already-encoded SDS messages):
|
||||
##
|
||||
## ... -> rate_limit_manager -> [encryption] -> dispatch
|
||||
for m in msgs:
|
||||
## Each `m` was preceded by exactly one push onto `pendingRequests`
|
||||
## in `send`, so this pop is always safe in the current skeleton.
|
||||
let pending = self.pendingRequests[0]
|
||||
self.pendingRequests.delete(0)
|
||||
|
||||
## TODO: revisit which fields of the SDS message must be encrypted.
|
||||
## Encrypting the whole encoded blob forces every receiver to attempt
|
||||
## decryption before it can route, which breaks selective dispatch.
|
||||
## Leave routing metadata (channelId, causal-history references) in
|
||||
## clear and encrypt only the application payload.
|
||||
let encRes = await Encrypt.request(m)
|
||||
let encrypted = encRes.valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: pending.parent,
|
||||
messageHash: "",
|
||||
error: "encryption failed: " & error,
|
||||
),
|
||||
)
|
||||
continue
|
||||
let wireBytes = seq[byte](encrypted)
|
||||
|
||||
let envelope = MessageEnvelope(
|
||||
contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral
|
||||
)
|
||||
|
||||
let deliveryReqId = RequestId.new(self.rng)
|
||||
let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr:
|
||||
## TODO: emit waku `MessageErrorEvent` for the parent request id.
|
||||
continue
|
||||
|
||||
## Stamp the Reliable Channel wire-format spec marker so the ingress
|
||||
## side of any peer can route this WakuMessage to its Reliable
|
||||
## Channel layer. Done on the constructed WakuMessage rather than
|
||||
## via the envelope because `MessageEnvelope` does not expose a
|
||||
## `meta` field.
|
||||
deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes()
|
||||
|
||||
asyncSpawn self.deliveryService.sendService.send(deliveryTask)
|
||||
self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId)
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
|
||||
): Result[RequestId, string] =
|
||||
## Single application-level send. The first three stages of the
|
||||
## outgoing pipeline are chained explicitly so the flow is visible
|
||||
## at a glance:
|
||||
##
|
||||
## segmentation -> sds -> rate_limit_manager
|
||||
##
|
||||
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
|
||||
## the SDS messages cleared for transmission; the channel's listener
|
||||
## then runs the final stage (encryption -> dispatch). The `ephemeral`
|
||||
## flag is carried alongside each segment in `pendingRequests` and
|
||||
## stamped onto the eventual `MessageEnvelope`.
|
||||
##
|
||||
## The returned `RequestId` is the parent of one-or-more
|
||||
## delivery-service `RequestId`s; the mapping is recorded in
|
||||
## `self.requestIds`.
|
||||
if payload.len == 0:
|
||||
return err("empty payload")
|
||||
|
||||
let parentReqId = RequestId.new(self.rng)
|
||||
self.requestIds[parentReqId] = @[]
|
||||
|
||||
for segmentBytes in self.segmentation.performSegmentation(payload):
|
||||
## Segments arrive already encoded; the segmentation module owns
|
||||
## the wire format so SDS only ever sees opaque bytes.
|
||||
let sdsBytes = self.sdsHandler.wrapOutgoing(
|
||||
self.channelId, self.senderId, segmentBytes
|
||||
).valueOr:
|
||||
return err("SDS wrap failed: " & error)
|
||||
self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral))
|
||||
self.rateLimit.enqueueToSend(sdsBytes)
|
||||
|
||||
return ok(parentReqId)
|
||||
|
||||
proc onMessageReceived(
|
||||
self: ReliableChannel, messageHash: string, payload: seq[byte]
|
||||
) {.async: (raises: []).} =
|
||||
## Ingress pipeline made visible:
|
||||
##
|
||||
## payload -> decrypt -> sds -> reassemble -> emit
|
||||
##
|
||||
## Invoked from this channel's `MessageReceivedEvent` listener, which
|
||||
## already filtered on the spec marker and on `contentTopic`. The
|
||||
## channel only sees the raw payload bytes for itself.
|
||||
|
||||
## Notice that the following "request" is implemented implicitly as a broker call to
|
||||
## the `Decrypt` request broker.
|
||||
let decRes = await Decrypt.request(payload)
|
||||
let plaintext = decRes.valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: RequestId(""),
|
||||
messageHash: messageHash,
|
||||
error: "decryption failed: " & error,
|
||||
),
|
||||
)
|
||||
return
|
||||
let plaintextBytes = seq[byte](plaintext)
|
||||
|
||||
let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes)
|
||||
if unwrapped.isErr():
|
||||
return
|
||||
|
||||
let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content)
|
||||
if reassembled.isSome():
|
||||
## Emit on the captured `brokerCtx` (the manager's), so the
|
||||
## application listener that the manager has set up on that same
|
||||
## context picks the event up.
|
||||
ChannelMessageReceivedEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageReceivedEvent(
|
||||
channelId: self.channelId,
|
||||
senderId: self.senderId,
|
||||
payload: reassembled.get().payload,
|
||||
),
|
||||
)
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannel,
|
||||
deliveryService: DeliveryService,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
segConfig: SegmentationConfig,
|
||||
sdsConfig: SdsConfig,
|
||||
rateConfig: RateLimitConfig,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): T =
|
||||
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
|
||||
## inside the channel rather than handed in by the caller — they are
|
||||
## implementation details of the channel, not knobs the API consumer
|
||||
## should be wiring up. Encryption is delegated to the `Encrypt`/
|
||||
## `Decrypt` request brokers, so the channel keeps no per-instance
|
||||
## encryption state either.
|
||||
let chn = T(
|
||||
deliveryService: deliveryService,
|
||||
channelId: channelId,
|
||||
contentTopic: contentTopic,
|
||||
senderId: senderId,
|
||||
rng: libp2p_crypto.newRng(),
|
||||
segmentation: SegmentationHandler.new(segConfig),
|
||||
sdsHandler: SdsHandler.new(sdsConfig, senderId),
|
||||
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
|
||||
requestIds: initTable[RequestId, seq[RequestId]](),
|
||||
pendingRequests: @[],
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
## Each channel owns its own egress + ingress listeners on
|
||||
## `chn.brokerCtx`, filtered to traffic addressed to this channel.
|
||||
## Keeping the listeners (and the procs they call) inside the
|
||||
## channel lets `onReadyToSend` and `onMessageReceived` stay private
|
||||
## — the manager doesn't need to know about them.
|
||||
discard ReadyToSendEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
|
||||
if evt.channelId == chn.channelId:
|
||||
await chn.onReadyToSend(evt.msgs)
|
||||
,
|
||||
)
|
||||
|
||||
discard MessageReceivedEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} =
|
||||
## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic
|
||||
## for other channels before doing any decode work.
|
||||
if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion:
|
||||
return
|
||||
if evt.message.contentTopic != chn.contentTopic:
|
||||
return
|
||||
await chn.onMessageReceived(evt.messageHash, evt.message.payload)
|
||||
,
|
||||
)
|
||||
|
||||
return chn
|
||||
138
channels/reliable_channel_manager.nim
Normal file
138
channels/reliable_channel_manager.nim
Normal file
@ -0,0 +1,138 @@
|
||||
## Reliable Channel API entry point.
|
||||
##
|
||||
## Owns the set of `ReliableChannel` instances and exposes lifecycle and
|
||||
## send/receive operations addressed by `ChannelId`.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/tables
|
||||
import results
|
||||
import chronos
|
||||
import stew/byteutils
|
||||
|
||||
import waku/api/api
|
||||
import waku/api/api_conf
|
||||
import waku/events/message_events as waku_message_events
|
||||
import waku/factory/waku as waku_factory
|
||||
import waku/waku_core/topics
|
||||
|
||||
import ./reliable_channel
|
||||
import ./encryption/noop_encryption
|
||||
|
||||
export reliable_channel
|
||||
|
||||
type ReliableChannelManager* = ref object
|
||||
channels: Table[ChannelId, ReliableChannel]
|
||||
deliveryService: DeliveryService
|
||||
## Owned by the manager. The ownership chain is
|
||||
## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode.
|
||||
## Hidden so callers can't substitute their own and bypass the
|
||||
## manager's pipeline.
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannelManager,
|
||||
conf: WakuNodeConf,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
): Future[Result[T, string]] {.async.} =
|
||||
## TODO !! The proper ownership chain is:
|
||||
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode,
|
||||
## and this will be implemented in the future. For now, `createNode`
|
||||
## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded.
|
||||
## This is a temporary workaround to get the API
|
||||
|
||||
let waku = ?(await createNode(conf))
|
||||
|
||||
let manager = T(
|
||||
channels: initTable[ChannelId, ReliableChannel](),
|
||||
deliveryService: waku.deliveryService,
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
return ok(manager)
|
||||
|
||||
proc start*(self: ReliableChannelManager): Result[void, string] =
|
||||
## Bring the owned DeliveryService up. Separated from `new` so callers
|
||||
## can register encryption providers / create channels before traffic
|
||||
## starts flowing.
|
||||
self.deliveryService.startDeliveryService()
|
||||
|
||||
proc stop*(self: ReliableChannelManager) {.async.} =
|
||||
if not self.deliveryService.isNil():
|
||||
await self.deliveryService.stopDeliveryService()
|
||||
|
||||
proc createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
): Result[ChannelId, string] =
|
||||
## Spec entry point. The `DeliveryService` and `rng` the channel needs
|
||||
## are sourced from the owning `ReliableChannelManager` rather than
|
||||
## passed per call. Encryption is wired up through the `Encrypt`/
|
||||
## `Decrypt` request brokers — the application installs its own
|
||||
## providers (or `setNoopEncryption()`) before traffic flows.
|
||||
##
|
||||
## Segmentation, SDS and rate-limit configs will eventually be read
|
||||
## from the node's `NodeConfig`. Defaults for now.
|
||||
if self.channels.hasKey(channelId):
|
||||
return err("channel already exists: " & channelId)
|
||||
|
||||
let segConfig = SegmentationConfig(
|
||||
segmentSizeBytes: DefaultSegmentSizeBytes,
|
||||
enableReedSolomon: false,
|
||||
persistence: nil,
|
||||
)
|
||||
let sdsConfig = SdsConfig(
|
||||
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
|
||||
maxRetransmissions: DefaultMaxRetransmissions,
|
||||
causalHistorySize: DefaultCausalHistorySize,
|
||||
persistence: nil,
|
||||
)
|
||||
let rateConfig = RateLimitConfig(
|
||||
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
|
||||
)
|
||||
|
||||
let chn = ReliableChannel.new(
|
||||
deliveryService = self.deliveryService,
|
||||
channelId = channelId,
|
||||
contentTopic = contentTopic,
|
||||
senderId = senderId,
|
||||
segConfig = segConfig,
|
||||
sdsConfig = sdsConfig,
|
||||
rateConfig = rateConfig,
|
||||
brokerCtx = self.brokerCtx,
|
||||
)
|
||||
|
||||
self.channels[channelId] = chn
|
||||
return ok(channelId)
|
||||
|
||||
proc closeChannel*(
|
||||
self: ReliableChannelManager, channelId: ChannelId
|
||||
): Result[void, string] =
|
||||
## Flush state, persist outstanding SDS buffers, release resources.
|
||||
if not self.channels.hasKey(channelId):
|
||||
return err("unknown channel: " & channelId)
|
||||
self.channels.del(channelId)
|
||||
return ok()
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
appPayload: seq[byte],
|
||||
ephemeral: bool = false,
|
||||
): Result[RequestId, string] =
|
||||
## Spec-level entry point. Looks the channel up by id and delegates
|
||||
## to `ReliableChannel.send`, which exposes the visible pipeline
|
||||
## segmentation -> sds -> rate_limit_manager -> encryption.
|
||||
let chn = self.channels.getOrDefault(channelId)
|
||||
if chn.isNil():
|
||||
return err("unknown channel: " & channelId)
|
||||
return chn.send(appPayload, ephemeral)
|
||||
|
||||
## Inbound messages are not handed to the manager by direct call. Each
|
||||
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
|
||||
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
|
||||
## and routes to its private `onMessageReceived`. This keeps the lower
|
||||
## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel
|
||||
## and keeps the manager out of per-channel event dispatch.
|
||||
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
62
channels/scalable_data_sync/scalable_data_sync.nim
Normal file
@ -0,0 +1,62 @@
|
||||
## Scalable Data Sync (SDS) component for the Reliable Channel API.
|
||||
##
|
||||
## Provides end-to-end delivery guarantees via causal history tracking,
|
||||
## acknowledgements, and retransmission of unacknowledged segments.
|
||||
##
|
||||
## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so
|
||||
## the send/receive circuit can exercise the surrounding pipeline.
|
||||
## Real SDS wrapping will plug in via `nim-sds` later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import results
|
||||
import message as sds_message
|
||||
|
||||
import ./sds_persistence
|
||||
|
||||
export sds_message, sds_persistence
|
||||
|
||||
const
|
||||
DefaultAcknowledgementTimeoutMs* = 5_000
|
||||
DefaultMaxRetransmissions* = 5
|
||||
DefaultCausalHistorySize* = 2
|
||||
|
||||
type
|
||||
SdsConfig* = object
|
||||
acknowledgementTimeoutMs*: int
|
||||
maxRetransmissions*: int
|
||||
causalHistorySize*: int
|
||||
persistence*: SdsPersistence
|
||||
|
||||
SdsHandler* = ref object
|
||||
config*: SdsConfig
|
||||
participantId*: SdsParticipantID
|
||||
|
||||
proc new*(
|
||||
T: type SdsHandler,
|
||||
config: SdsConfig,
|
||||
participantId: SdsParticipantID = SdsParticipantID(""),
|
||||
): T =
|
||||
return T(config: config, participantId: participantId)
|
||||
|
||||
proc wrapOutgoing*(
|
||||
self: SdsHandler,
|
||||
channelId: SdsChannelID,
|
||||
senderId: SdsParticipantID,
|
||||
payload: seq[byte],
|
||||
): Result[seq[byte], string] =
|
||||
## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption).
|
||||
## Skeleton: pass the encoded segment through unchanged. Real causal
|
||||
## history / lamport / bloom-filter population will replace this.
|
||||
return ok(payload)
|
||||
|
||||
proc handleIncoming*(
|
||||
self: SdsHandler, msg: seq[byte]
|
||||
): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] =
|
||||
## Skeleton: pass the bytes through; channel id is left empty until
|
||||
## the real wire format provides it.
|
||||
return ok((content: msg, channelId: SdsChannelID("")))
|
||||
|
||||
proc tickRetransmissions*(self: SdsHandler) =
|
||||
## Drives retransmissions of unacknowledged messages.
|
||||
discard
|
||||
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
25
channels/scalable_data_sync/sds_persistence.nim
Normal file
@ -0,0 +1,25 @@
|
||||
## Persistence backend for SDS outgoing buffer and causal history.
|
||||
##
|
||||
## TODO (raised in PR review): this surface is duplicating concerns that
|
||||
## should come from the SDS module itself. Once the SDS module exposes a
|
||||
## complete persistence contract, drop this file and import that surface
|
||||
## instead of re-declaring it here.
|
||||
|
||||
import message
|
||||
|
||||
type
|
||||
SdsPersistenceKind* {.pure.} = enum
|
||||
InMemory
|
||||
Sqlite
|
||||
|
||||
SdsPersistence* = ref object of RootObj
|
||||
kind*: SdsPersistenceKind
|
||||
|
||||
method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} =
|
||||
discard
|
||||
|
||||
method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} =
|
||||
discard
|
||||
|
||||
method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} =
|
||||
discard
|
||||
34
channels/segmentation/segment_message_proto.nim
Normal file
34
channels/segmentation/segment_message_proto.nim
Normal file
@ -0,0 +1,34 @@
|
||||
## Wire format for a single segment, per the Reliable Channel API spec.
|
||||
##
|
||||
## Skeleton: encode/decode treat the segment as just its payload bytes,
|
||||
## since for now we only ever produce a single segment per send.
|
||||
|
||||
type SegmentMessageProto* = object
|
||||
entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes
|
||||
dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments
|
||||
dataSegmentCount*: uint32 ## number of data segments (>= 1)
|
||||
payload*: seq[byte] ## segment payload (data or parity shard)
|
||||
paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments
|
||||
paritySegmentCount*: uint32 ## number of parity segments
|
||||
isParity*: bool ## true for parity segments, false (default) for data segments
|
||||
|
||||
proc isParityMessage*(self: SegmentMessageProto): bool =
|
||||
self.isParity
|
||||
|
||||
proc isValid*(self: SegmentMessageProto): bool =
|
||||
## Validates hash length (32 bytes), segment indices and counts.
|
||||
discard
|
||||
|
||||
proc encode*(self: SegmentMessageProto): seq[byte] =
|
||||
self.payload
|
||||
|
||||
proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T =
|
||||
T(
|
||||
entireMessageHash: @[],
|
||||
dataSegmentIndex: 0,
|
||||
dataSegmentCount: 1,
|
||||
payload: buf,
|
||||
paritySegmentIndex: 0,
|
||||
paritySegmentCount: 0,
|
||||
isParity: false,
|
||||
)
|
||||
70
channels/segmentation/segmentation.nim
Normal file
70
channels/segmentation/segmentation.nim
Normal file
@ -0,0 +1,70 @@
|
||||
## Segmentation component for the Reliable Channel API.
|
||||
##
|
||||
## Splits large application payloads into transmittable segments and
|
||||
## reassembles them on reception. Supports optional Reed-Solomon parity
|
||||
## segments for loss recovery, as per the Reliable Channel API spec.
|
||||
##
|
||||
## For the skeleton everything fits in a single segment: real chunking
|
||||
## and Reed-Solomon parity will be plugged in later.
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/options
|
||||
import ./segment_message_proto
|
||||
import ./segmentation_persistence
|
||||
|
||||
export segment_message_proto, segmentation_persistence
|
||||
|
||||
const
|
||||
DefaultSegmentSizeBytes* = 102_400
|
||||
SegmentsParityRate* = 0.125
|
||||
SegmentsReedSolomonMaxCount* = 256
|
||||
|
||||
type
|
||||
SegmentationConfig* = object
|
||||
segmentSizeBytes*: int
|
||||
enableReedSolomon*: bool
|
||||
persistence*: SegmentationPersistence
|
||||
|
||||
SegmentationHandler* = ref object
|
||||
config*: SegmentationConfig
|
||||
|
||||
ReassemblyResult* = object
|
||||
payload*: seq[byte]
|
||||
entireMessageHash*: seq[byte]
|
||||
|
||||
proc new*(T: type SegmentationHandler, config: SegmentationConfig): T =
|
||||
return T(config: config)
|
||||
|
||||
proc performSegmentation*(
|
||||
self: SegmentationHandler, payload: seq[byte]
|
||||
): seq[seq[byte]] =
|
||||
## Skeleton behaviour: emit exactly one segment carrying the whole
|
||||
## payload. Real chunking and Reed-Solomon parity will replace this.
|
||||
let segment = SegmentMessageProto(
|
||||
entireMessageHash: @[],
|
||||
dataSegmentIndex: 0,
|
||||
dataSegmentCount: 1,
|
||||
payload: payload,
|
||||
paritySegmentIndex: 0,
|
||||
paritySegmentCount: 0,
|
||||
isParity: false,
|
||||
)
|
||||
return @[segment.encode()]
|
||||
|
||||
proc handleIncomingSegment*(
|
||||
self: SegmentationHandler, segmentBytes: seq[byte]
|
||||
): Option[ReassemblyResult] =
|
||||
## Skeleton behaviour: every segment is already a complete message
|
||||
## (since `performSegmentation` always emits one), so just hand the
|
||||
## payload straight back.
|
||||
let segment = SegmentMessageProto.decode(segmentBytes)
|
||||
return some(
|
||||
ReassemblyResult(
|
||||
payload: segment.payload, entireMessageHash: segment.entireMessageHash
|
||||
)
|
||||
)
|
||||
|
||||
proc cleanupSegments*(self: SegmentationHandler) =
|
||||
## Drop expired partial-reassembly state.
|
||||
discard
|
||||
20
channels/segmentation/segmentation_persistence.nim
Normal file
20
channels/segmentation/segmentation_persistence.nim
Normal file
@ -0,0 +1,20 @@
|
||||
## Persistence backend interface for segmentation reassembly state.
|
||||
##
|
||||
## Allows partial reassembly state to survive process restarts.
|
||||
|
||||
type
|
||||
SegmentationPersistenceKind* {.pure.} = enum
|
||||
InMemory
|
||||
Sqlite
|
||||
|
||||
SegmentationPersistence* = ref object of RootObj
|
||||
kind*: SegmentationPersistenceKind
|
||||
|
||||
method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} =
|
||||
discard
|
||||
|
||||
method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} =
|
||||
discard
|
||||
|
||||
method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} =
|
||||
discard
|
||||
15
channels/types.nim
Normal file
15
channels/types.nim
Normal file
@ -0,0 +1,15 @@
|
||||
## Core identifier types for the Reliable Channel API.
|
||||
|
||||
import std/hashes
|
||||
import waku/api/types as api_types
|
||||
|
||||
import ./scalable_data_sync/scalable_data_sync
|
||||
|
||||
export scalable_data_sync
|
||||
export api_types
|
||||
|
||||
type ChannelId* = SdsChannelID
|
||||
|
||||
proc hash*(r: RequestId): Hash =
|
||||
## Allows `RequestId` to be used as a `Table` key.
|
||||
hash(string(r))
|
||||
@ -89,8 +89,15 @@
|
||||
inherit zerokitRln;
|
||||
gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}";
|
||||
};
|
||||
|
||||
wakucanary = pkgs.callPackage ./nix/default.nix {
|
||||
inherit pkgs;
|
||||
src = ./.;
|
||||
targets = ["wakucanary"];
|
||||
zerokitRln = zerokit.packages.${system}.rln;
|
||||
};
|
||||
in {
|
||||
inherit liblogosdelivery;
|
||||
inherit liblogosdelivery wakucanary;
|
||||
# Expose the cargoHash-corrected librln so downstream consumers
|
||||
# (e.g. logos-delivery-module) bundle the exact same librln this
|
||||
# build links, instead of pulling zerokit's rln directly — whose
|
||||
|
||||
@ -250,7 +250,7 @@
|
||||
},
|
||||
"confutils": {
|
||||
"version": "0.1.0",
|
||||
"vcsRevision": "7728f6bd81a1eedcfe277d02ea85fdb805bcc05a",
|
||||
"vcsRevision": "36f3115ca350f40841ac0eecc7dfa5fe7790c864",
|
||||
"url": "https://github.com/status-im/nim-confutils",
|
||||
"downloadMethod": "git",
|
||||
"dependencies": [
|
||||
@ -260,7 +260,7 @@
|
||||
"results"
|
||||
],
|
||||
"checksums": {
|
||||
"sha1": "8bc8c30b107fdba73b677e5f257c6c42ae1cdc8e"
|
||||
"sha1": "2fbe6418ddd9f79fb11a0addd7666a3e787adbe0"
|
||||
}
|
||||
},
|
||||
"cbor_serialization": {
|
||||
|
||||
149
nix/default.nix
149
nix/default.nix
@ -1,6 +1,7 @@
|
||||
{ pkgs
|
||||
, src
|
||||
, zerokitRln
|
||||
, targets ? []
|
||||
, gitVersion ? "n/a"
|
||||
, enablePostgres ? true
|
||||
, enableNimDebugDlOpen ? true
|
||||
@ -10,6 +11,8 @@
|
||||
let
|
||||
deps = import ./deps.nix { inherit pkgs; };
|
||||
|
||||
buildWakucanary = builtins.elem "wakucanary" targets;
|
||||
|
||||
nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " (
|
||||
[ "--define:disable_libbacktrace"
|
||||
"--define:git_version=${gitVersion}" ]
|
||||
@ -34,9 +37,29 @@ let
|
||||
if pkgs.stdenv.hostPlatform.isWindows then "dll"
|
||||
else if pkgs.stdenv.hostPlatform.isDarwin then "dylib"
|
||||
else "so";
|
||||
|
||||
# Shared `nim c` invocation. Callers vary the output, the source file and a
|
||||
# few mode-specific flags (e.g. --app:lib, --noMain, --header); everything
|
||||
# else (paths, defines, threading, gc, nimcache, rln linkage) is constant.
|
||||
# $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase.
|
||||
nimCompile = { outFile, sourceFile, extraArgs ? [] }: ''
|
||||
nim c \
|
||||
--noNimblePath \
|
||||
${pathArgs} \
|
||||
--path:$NAT_TRAV \
|
||||
--path:$NAT_TRAV/src \
|
||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
||||
${nimDefineArgs} \
|
||||
--threads:on \
|
||||
--mm:refc \
|
||||
--nimcache:$NIMCACHE \
|
||||
--out:${outFile} \
|
||||
${pkgs.lib.concatStringsSep " \\\n " extraArgs} \
|
||||
${sourceFile}
|
||||
'';
|
||||
in
|
||||
pkgs.stdenv.mkDerivation {
|
||||
pname = "liblogosdelivery";
|
||||
pname = if buildWakucanary then "wakucanary" else "liblogosdelivery";
|
||||
version = "dev";
|
||||
|
||||
inherit src;
|
||||
@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation {
|
||||
make -C $NAT_TRAV/vendor/libnatpmp-upstream \
|
||||
CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a
|
||||
|
||||
${if buildWakucanary then ''
|
||||
echo "== Building wakucanary =="
|
||||
${nimCompile {
|
||||
outFile = "build/wakucanary";
|
||||
sourceFile = "apps/wakucanary/wakucanary.nim";
|
||||
extraArgs = [ "--path:." ];
|
||||
}}
|
||||
'' else ''
|
||||
echo "== Building liblogosdelivery (dynamic) =="
|
||||
nim c \
|
||||
--noNimblePath \
|
||||
${pathArgs} \
|
||||
--path:$NAT_TRAV \
|
||||
--path:$NAT_TRAV/src \
|
||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
||||
${nimDefineArgs} \
|
||||
--out:build/liblogosdelivery.${libExt} \
|
||||
--app:lib \
|
||||
--threads:on \
|
||||
--opt:size \
|
||||
--noMain \
|
||||
--mm:refc \
|
||||
--header \
|
||||
--nimMainPrefix:liblogosdelivery \
|
||||
--nimcache:$NIMCACHE \
|
||||
liblogosdelivery/liblogosdelivery.nim
|
||||
${nimCompile {
|
||||
outFile = "build/liblogosdelivery.${libExt}";
|
||||
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
|
||||
extraArgs = [
|
||||
"--app:lib"
|
||||
"--opt:size"
|
||||
"--noMain"
|
||||
"--header"
|
||||
"--nimMainPrefix:liblogosdelivery"
|
||||
];
|
||||
}}
|
||||
|
||||
echo "== Building liblogosdelivery (static) =="
|
||||
nim c \
|
||||
--noNimblePath \
|
||||
${pathArgs} \
|
||||
--path:$NAT_TRAV \
|
||||
--path:$NAT_TRAV/src \
|
||||
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
|
||||
${nimDefineArgs} \
|
||||
--out:build/liblogosdelivery.a \
|
||||
--app:staticlib \
|
||||
--threads:on \
|
||||
--opt:size \
|
||||
--noMain \
|
||||
--mm:refc \
|
||||
--nimMainPrefix:liblogosdelivery \
|
||||
--nimcache:$NIMCACHE \
|
||||
liblogosdelivery/liblogosdelivery.nim
|
||||
${nimCompile {
|
||||
outFile = "build/liblogosdelivery.a";
|
||||
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
|
||||
extraArgs = [
|
||||
"--app:staticlib"
|
||||
"--opt:size"
|
||||
"--noMain"
|
||||
"--nimMainPrefix:liblogosdelivery"
|
||||
];
|
||||
}}
|
||||
''}
|
||||
'';
|
||||
|
||||
installPhase = ''
|
||||
installPhase = if buildWakucanary then ''
|
||||
runHook preInstall
|
||||
mkdir -p $out/bin $out/lib
|
||||
cp build/wakucanary $out/bin/
|
||||
runHook postInstall
|
||||
'' else ''
|
||||
runHook preInstall
|
||||
mkdir -p $out/lib $out/include
|
||||
cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true
|
||||
@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation {
|
||||
runHook postInstall
|
||||
'';
|
||||
|
||||
# Bundle librln alongside liblogosdelivery so the output is self-contained.
|
||||
# Bundle librln alongside the produced artifact so the output is self-contained.
|
||||
# Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection
|
||||
# for libstdc++ is preserved.
|
||||
postInstall =
|
||||
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
||||
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
||||
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
||||
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
|
||||
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
|
||||
''
|
||||
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
||||
cp ${zerokitRln}/lib/librln.so $out/lib/
|
||||
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
|
||||
'';
|
||||
if buildWakucanary then
|
||||
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
||||
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
||||
chmod +w $out/lib/librln.dylib $out/bin/wakucanary
|
||||
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
||||
old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true)
|
||||
if [ -n "$old" ]; then
|
||||
install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary
|
||||
fi
|
||||
install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary
|
||||
''
|
||||
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
||||
cp ${zerokitRln}/lib/librln.so $out/lib/
|
||||
patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary
|
||||
''
|
||||
else
|
||||
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
|
||||
cp ${zerokitRln}/lib/librln.dylib $out/lib/
|
||||
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
|
||||
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
|
||||
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
|
||||
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
|
||||
''
|
||||
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
|
||||
cp ${zerokitRln}/lib/librln.so $out/lib/
|
||||
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
|
||||
'';
|
||||
|
||||
meta = with pkgs.lib; {
|
||||
description =
|
||||
if buildWakucanary
|
||||
then "Waku network canary tool"
|
||||
else "logos-delivery shared/static library";
|
||||
homepage = "https://github.com/logos-messaging/logos-delivery";
|
||||
license = licenses.mit;
|
||||
platforms = platforms.unix;
|
||||
};
|
||||
}
|
||||
|
||||
@ -124,8 +124,8 @@
|
||||
|
||||
confutils = pkgs.fetchgit {
|
||||
url = "https://github.com/status-im/nim-confutils";
|
||||
rev = "7728f6bd81a1eedcfe277d02ea85fdb805bcc05a";
|
||||
sha256 = "18bj1ilx10jm2vmqx2wy2xl9rzy7alymi2m4n9jgpa4sbxnfh0x3";
|
||||
rev = "36f3115ca350f40841ac0eecc7dfa5fe7790c864";
|
||||
sha256 = "1vppqplwlpl7a61r8iki5hlzvhd8lnq41ixpqslv35dnm482c55j";
|
||||
fetchSubmodules = true;
|
||||
};
|
||||
|
||||
|
||||
@ -17,26 +17,36 @@ if [ -z "${NIM_VERSION}" ]; then
|
||||
exit 1
|
||||
fi
|
||||
|
||||
# Check if the right version is already installed
|
||||
NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}"
|
||||
|
||||
# 1. A matching Nim is already on PATH (e.g. provided by CI's setup-nim-action,
|
||||
# choosenim, or a previous run of this script). Use it as-is: installing over it
|
||||
# would symlink a freshly downloaded Nim into ~/.nimble/bin (first on PATH) and
|
||||
# shadow a known-good toolchain, which has caused C-backend build failures.
|
||||
nim_ver=$(nim --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true)
|
||||
if [ "${nim_ver}" = "${NIM_VERSION}" ]; then
|
||||
echo "Nim ${NIM_VERSION} already installed, skipping."
|
||||
echo "Nim ${NIM_VERSION} already on PATH ($(command -v nim)), skipping install."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# 2. Already installed at our expected location from a previous run, but not on PATH.
|
||||
# Re-link binaries into ~/.nimble/bin.
|
||||
if [ -f "${NIM_DEST}/lib/system.nim" ]; then
|
||||
echo "Nim ${NIM_VERSION} already installed at ${NIM_DEST}, re-linking binaries."
|
||||
mkdir -p "${HOME}/.nimble/bin"
|
||||
for bin_path in "${NIM_DEST}/bin/"*; do
|
||||
ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")"
|
||||
done
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ -n "${nim_ver}" ]; then
|
||||
newer=$(printf '%s\n%s\n' "${NIM_VERSION}" "${nim_ver}" | sort -V | tail -1)
|
||||
if [ "${newer}" = "${nim_ver}" ]; then
|
||||
echo "WARNING: Nim ${nim_ver} is installed; this repo is validated against ${NIM_VERSION}." >&2
|
||||
echo "WARNING: The build will proceed but may behave differently." >&2
|
||||
exit 0
|
||||
fi
|
||||
echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2
|
||||
fi
|
||||
|
||||
OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/')
|
||||
ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/')
|
||||
|
||||
NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}"
|
||||
BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz"
|
||||
WORK_DIR=$(mktemp -d)
|
||||
trap 'rm -rf "${WORK_DIR}"' EXIT
|
||||
@ -48,9 +58,7 @@ if [ "${HTTP_STATUS}" = "200" ]; then
|
||||
echo "Downloading pre-built binary from ${BINARY_URL}..."
|
||||
curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz"
|
||||
tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}"
|
||||
rm -rf "${NIM_DEST}"
|
||||
mkdir -p "${HOME}/.nim"
|
||||
cp -r "${WORK_DIR}/nim-${NIM_VERSION}" "${NIM_DEST}"
|
||||
SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}"
|
||||
else
|
||||
echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..."
|
||||
SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz"
|
||||
@ -58,15 +66,19 @@ else
|
||||
tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}"
|
||||
cd "${WORK_DIR}/Nim-${NIM_VERSION}"
|
||||
sh build_all.sh
|
||||
rm -rf "${NIM_DEST}"
|
||||
mkdir -p "${HOME}/.nim"
|
||||
cp -r "${WORK_DIR}/Nim-${NIM_VERSION}" "${NIM_DEST}"
|
||||
SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}"
|
||||
fi
|
||||
|
||||
# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker).
|
||||
# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present.
|
||||
rm -rf "${NIM_DEST}" 2>/dev/null || true
|
||||
mkdir -p "${NIM_DEST}"
|
||||
cp -r "${SRC_DIR}/." "${NIM_DEST}/"
|
||||
|
||||
mkdir -p "${HOME}/.nimble/bin"
|
||||
for bin_path in "${NIM_DEST}/bin/"*; do
|
||||
ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")"
|
||||
done
|
||||
|
||||
echo "Nim ${NIM_VERSION} installed to ${NIM_DEST}"
|
||||
echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH."
|
||||
echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH."
|
||||
70
scripts/install_nimble.sh
Executable file
70
scripts/install_nimble.sh
Executable file
@ -0,0 +1,70 @@
|
||||
#!/usr/bin/env bash
|
||||
# Installs a specific nimble version without using `nimble install nimble`.
|
||||
#
|
||||
# `nimble install nimble` is inherently fragile:
|
||||
# - ETXTBSY: overwriting the running nimble binary in pkgs2/
|
||||
# - JSON parse failures with older nimble versions reading packages_official.json
|
||||
#
|
||||
# Strategy:
|
||||
# 1. If the right version is already at ~/.nimble/bin/nimble → done.
|
||||
# 2. If a previously-compiled binary exists in pkgs2/ → re-link it.
|
||||
# 3. Otherwise: clone the nimble git repo, init submodules, build with nim,
|
||||
# and atomically replace the target (mv avoids ETXTBSY on the old binary).
|
||||
|
||||
set -e
|
||||
|
||||
NIMBLE_VERSION="${1:-}"
|
||||
if [ -z "${NIMBLE_VERSION}" ]; then
|
||||
echo "Usage: $0 <nimble-version>" >&2
|
||||
exit 1
|
||||
fi
|
||||
|
||||
NIMBLE_BIN="${HOME}/.nimble/bin/nimble"
|
||||
|
||||
# 1. Already installed at the right version?
|
||||
if [ -x "${NIMBLE_BIN}" ]; then
|
||||
nimble_ver=$("${NIMBLE_BIN}" --version 2>/dev/null \
|
||||
| head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true)
|
||||
if [ "${nimble_ver}" = "${NIMBLE_VERSION}" ]; then
|
||||
echo "Nimble ${NIMBLE_VERSION} already installed, skipping."
|
||||
exit 0
|
||||
fi
|
||||
fi
|
||||
|
||||
# 2. Already compiled into pkgs2/ from a previous (possibly partial) run?
|
||||
PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \
|
||||
2>/dev/null | head -1 || true)
|
||||
if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then
|
||||
echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}."
|
||||
mkdir -p "${HOME}/.nimble/bin"
|
||||
ln -sf "${PKGS2_NIMBLE}" "${NIMBLE_BIN}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
# 3. Build from source.
|
||||
NIM_BIN="${HOME}/.nimble/bin/nim"
|
||||
if [ ! -x "${NIM_BIN}" ]; then
|
||||
NIM_BIN="$(command -v nim)"
|
||||
fi
|
||||
|
||||
WORK_DIR="$(mktemp -d)"
|
||||
trap 'rm -rf "${WORK_DIR}"' EXIT
|
||||
|
||||
echo "Cloning nimble v${NIMBLE_VERSION} with submodules..."
|
||||
git clone --depth=1 --branch "v${NIMBLE_VERSION}" \
|
||||
--recurse-submodules --shallow-submodules \
|
||||
https://github.com/nim-lang/nimble.git \
|
||||
"${WORK_DIR}/nimble"
|
||||
|
||||
echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)..."
|
||||
cd "${WORK_DIR}/nimble"
|
||||
# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths.
|
||||
"${NIM_BIN}" c -d:release --path:src \
|
||||
-o:"${WORK_DIR}/nimble_new" src/nimble.nim
|
||||
|
||||
mkdir -p "${HOME}/.nimble/bin"
|
||||
# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running.
|
||||
cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$"
|
||||
mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}"
|
||||
|
||||
echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}"
|
||||
@ -88,3 +88,6 @@ import ./tools/test_all
|
||||
|
||||
# Persistency library tests
|
||||
import ./persistency/test_all
|
||||
|
||||
# Reliable Channel API tests
|
||||
import ./channels/test_all
|
||||
|
||||
3
tests/channels/test_all.nim
Normal file
3
tests/channels/test_all.nim
Normal file
@ -0,0 +1,3 @@
|
||||
{.used.}
|
||||
|
||||
import ./test_reliable_channel_send_receive
|
||||
149
tests/channels/test_reliable_channel_send_receive.nim
Normal file
149
tests/channels/test_reliable_channel_send_receive.nim
Normal file
@ -0,0 +1,149 @@
|
||||
{.used.}
|
||||
|
||||
import std/[net]
|
||||
import chronos, testutils/unittests, stew/byteutils
|
||||
import brokers/broker_context
|
||||
|
||||
import ../testlib/[common, wakucore, wakunode, testasync]
|
||||
|
||||
import waku
|
||||
import waku/[waku_node, waku_core]
|
||||
import waku/factory/waku_conf
|
||||
import waku/events/message_events as waku_message_events
|
||||
import tools/confutils/cli_args
|
||||
|
||||
import channels/reliable_channel_manager
|
||||
import channels/encryption/noop_encryption
|
||||
|
||||
const TestTimeout = chronos.seconds(15)
|
||||
|
||||
proc createApiNodeConf(): WakuNodeConf =
|
||||
var conf = defaultWakuNodeConf().valueOr:
|
||||
raiseAssert error
|
||||
conf.mode = cli_args.WakuMode.Core
|
||||
conf.listenAddress = parseIpAddress("0.0.0.0")
|
||||
conf.tcpPort = Port(0)
|
||||
conf.discv5UdpPort = Port(0)
|
||||
conf.clusterId = 3'u16
|
||||
conf.numShardsInNetwork = 1
|
||||
conf.reliabilityEnabled = true
|
||||
conf.rest = false
|
||||
return conf
|
||||
|
||||
suite "Reliable Channel - ingress":
|
||||
asyncTest "manager dispatches marked WakuMessage to the right channel":
|
||||
## Unit test for the receive side of the API: instead of standing
|
||||
## up two libp2p nodes and a relay mesh, we drive the manager
|
||||
## directly by emitting a `MessageReceivedEvent` (the exact event
|
||||
## the DeliveryService emits when a `WakuMessage` arrives off the
|
||||
## wire). The manager must:
|
||||
## - drop traffic missing the Reliable Channel spec marker
|
||||
## - dispatch the matching channel's `onMessageReceived`
|
||||
## - emit `ChannelMessageReceivedEvent` with the payload
|
||||
const
|
||||
channelId = ChannelId("test-channel")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||
let appPayload = "hello reliable channel".toBytes()
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
## Noop encryption providers so the Encrypt/Decrypt brokers have
|
||||
## something to dispatch to; without this the channel falls back to
|
||||
## plaintext anyway, but installing them is the documented setup.
|
||||
setNoopEncryption()
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||
.expect("createReliableChannel")
|
||||
|
||||
let received = newFuture[seq[byte]]("channel-message-received")
|
||||
discard ChannelMessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||
if not received.finished() and evt.channelId == channelId:
|
||||
received.complete(evt.payload)
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageReceivedEvent")
|
||||
|
||||
## Build a `WakuMessage` that looks like one that came in off the
|
||||
## wire from a peer: the spec marker on `meta` plus the right content
|
||||
## topic. The manager's ingress listener should pick it up,
|
||||
## decrypt (noop), unwrap SDS (pass-through), reassemble (one
|
||||
## segment), and finally emit `ChannelMessageReceivedEvent`.
|
||||
let inboundMsg = WakuMessage(
|
||||
payload: appPayload,
|
||||
contentTopic: contentTopic,
|
||||
version: 0,
|
||||
meta: LipWireReliableChannelVersion.toBytes(),
|
||||
)
|
||||
|
||||
waku_message_events.MessageReceivedEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||
)
|
||||
|
||||
let arrived = await received.withTimeout(TestTimeout)
|
||||
check arrived
|
||||
if arrived:
|
||||
check received.read() == appPayload
|
||||
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "manager drops unmarked WakuMessage":
|
||||
## Mirror of the above: same content topic, but `meta` is empty
|
||||
## (i.e. foreign traffic). The channel-level event must NOT fire.
|
||||
const
|
||||
channelId = ChannelId("test-channel-2")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/proto")
|
||||
let appPayload = "foreign payload".toBytes()
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
setNoopEncryption()
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
|
||||
.expect("createReliableChannel")
|
||||
|
||||
var fired = false
|
||||
discard ChannelMessageReceivedEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
|
||||
if evt.channelId == channelId:
|
||||
fired = true
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageReceivedEvent")
|
||||
|
||||
let inboundMsg = WakuMessage(
|
||||
payload: appPayload,
|
||||
contentTopic: contentTopic,
|
||||
version: 0,
|
||||
meta: @[], ## no Reliable Channel spec marker
|
||||
)
|
||||
|
||||
waku_message_events.MessageReceivedEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
|
||||
)
|
||||
|
||||
## Give the event broker a chance to fan out.
|
||||
await sleepAsync(100.milliseconds)
|
||||
check not fired
|
||||
|
||||
await manager.stop()
|
||||
@ -1,11 +1,11 @@
|
||||
import std/[options], stew/results, testutils/unittests
|
||||
import std/[options], results, testutils/unittests
|
||||
|
||||
import
|
||||
waku/node/peer_manager/peer_store/migrations,
|
||||
../../waku_archive/archive_utils,
|
||||
../../testlib/[simple_mock]
|
||||
|
||||
import std/[tables, strutils, os], stew/results, chronicles
|
||||
import std/[tables, strutils, os], results, chronicles
|
||||
|
||||
import waku/common/databases/db_sqlite, waku/common/databases/common
|
||||
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
import stew/results, testutils/unittests
|
||||
import results, testutils/unittests
|
||||
|
||||
import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers
|
||||
|
||||
|
||||
@ -2,7 +2,7 @@
|
||||
|
||||
import
|
||||
std/[tempfiles, strutils, options],
|
||||
stew/results,
|
||||
results,
|
||||
testutils/unittests,
|
||||
chronos,
|
||||
libp2p/switch,
|
||||
|
||||
388
tests/simulator/rln-e2e-test.py
Executable file
388
tests/simulator/rln-e2e-test.py
Executable file
@ -0,0 +1,388 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
RLN end-to-end test against a running logos-delivery-simulator stack.
|
||||
|
||||
Designed to run as a sidecar container on the simulator's Docker network so
|
||||
hostnames like `logos-delivery-simulator-nwaku-1` resolve via Docker DNS.
|
||||
|
||||
Scenarios covered (in order):
|
||||
1. HEALTH - every node responds to /debug/v1/info with an enrUri
|
||||
2. SUBSCRIBE - every node REST-subscribes to the pubsub topic
|
||||
3. WITHIN_LIMIT - every node concurrently sends msg_limit messages -> 200
|
||||
4. PROPAGATION - one sender's message lands in all peers' inboxes
|
||||
5. OVER_LIMIT - one extra message per node -> 500 (rate-limit hit)
|
||||
6. EPOCH_RESET - after epoch_sec, every node can send 1 more -> 200
|
||||
7. SAME_MESSAGE_ID - sending same message_id twice in same epoch is the
|
||||
slashable signal (verified by checking node logs)
|
||||
|
||||
Exit code:
|
||||
0 = all scenarios passed
|
||||
N = number of scenarios that failed
|
||||
|
||||
Usage (typical):
|
||||
docker run --rm \\
|
||||
--network logos-delivery-simulator_simulation \\
|
||||
-v /path/to/rln-e2e-test.py:/test.py \\
|
||||
python:3.11-slim \\
|
||||
sh -c 'pip install --quiet requests && python /test.py \\
|
||||
--hostname-prefix logos-delivery-simulator-nwaku- \\
|
||||
--num-nodes 30 --msg-limit 30 --epoch-sec 15'
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import concurrent.futures as cf
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
import urllib.parse
|
||||
from dataclasses import dataclass
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
|
||||
PUBSUB_TOPIC = "/waku/2/rs/66/0"
|
||||
CONTENT_TOPIC = "/rln-test/1/probe/proto"
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def url_of(host: str, port: int = 8645) -> str:
|
||||
return f"http://{host}:{port}"
|
||||
|
||||
|
||||
def waku_publish(node_url: str, payload: bytes, timeout: float = 5.0) -> int:
|
||||
body = {
|
||||
"payload": base64.b64encode(payload).decode("ascii"),
|
||||
"contentTopic": CONTENT_TOPIC,
|
||||
"version": 1,
|
||||
"timestamp": time.time_ns(),
|
||||
}
|
||||
enc = urllib.parse.quote(PUBSUB_TOPIC, safe="")
|
||||
try:
|
||||
r = requests.post(
|
||||
f"{node_url}/relay/v1/messages/{enc}",
|
||||
json=body,
|
||||
timeout=timeout,
|
||||
headers={"content-type": "application/json"},
|
||||
)
|
||||
return r.status_code
|
||||
except requests.RequestException:
|
||||
return -1
|
||||
|
||||
|
||||
def waku_subscribe(node_url: str, timeout: float = 5.0) -> int:
|
||||
try:
|
||||
r = requests.post(
|
||||
f"{node_url}/relay/v1/subscriptions",
|
||||
json=[PUBSUB_TOPIC],
|
||||
timeout=timeout,
|
||||
headers={"content-type": "application/json"},
|
||||
)
|
||||
return r.status_code
|
||||
except requests.RequestException:
|
||||
return -1
|
||||
|
||||
|
||||
def waku_get_messages(node_url: str, timeout: float = 5.0) -> Optional[list]:
|
||||
enc = urllib.parse.quote(PUBSUB_TOPIC, safe="")
|
||||
try:
|
||||
r = requests.get(
|
||||
f"{node_url}/relay/v1/messages/{enc}",
|
||||
timeout=timeout,
|
||||
)
|
||||
if r.status_code != 200:
|
||||
return None
|
||||
return r.json()
|
||||
except (requests.RequestException, json.JSONDecodeError):
|
||||
return None
|
||||
|
||||
|
||||
def node_healthy(node_url: str, timeout: float = 3.0) -> bool:
|
||||
try:
|
||||
r = requests.get(f"{node_url}/debug/v1/info", timeout=timeout)
|
||||
return r.status_code == 200 and "enrUri" in r.json()
|
||||
except (requests.RequestException, json.JSONDecodeError):
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# scenarios
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class Result:
|
||||
name: str
|
||||
ok: bool
|
||||
detail: str = ""
|
||||
|
||||
def __str__(self) -> str:
|
||||
status = "PASS" if self.ok else "FAIL"
|
||||
s = f"[{status}] {self.name}"
|
||||
if self.detail:
|
||||
s += f" — {self.detail}"
|
||||
return s
|
||||
|
||||
|
||||
def scenario_health(nodes: list[str], deadline_sec: int = 120) -> Result:
|
||||
"""Every node must be reachable within deadline_sec."""
|
||||
start = time.time()
|
||||
unhealthy = list(nodes)
|
||||
while time.time() - start < deadline_sec and unhealthy:
|
||||
with cf.ThreadPoolExecutor(max_workers=min(32, len(unhealthy))) as ex:
|
||||
results = list(ex.map(node_healthy, [url_of(n) for n in unhealthy]))
|
||||
unhealthy = [n for n, ok in zip(unhealthy, results) if not ok]
|
||||
if unhealthy:
|
||||
time.sleep(3)
|
||||
return Result(
|
||||
"HEALTH",
|
||||
not unhealthy,
|
||||
f"{len(nodes) - len(unhealthy)}/{len(nodes)} healthy"
|
||||
+ (f"; failing: {unhealthy[:5]}" if unhealthy else ""),
|
||||
)
|
||||
|
||||
|
||||
def scenario_subscribe(nodes: list[str]) -> Result:
|
||||
"""REST-subscribe every node to the pubsub topic so GETs return cached msgs."""
|
||||
with cf.ThreadPoolExecutor(max_workers=min(32, len(nodes))) as ex:
|
||||
codes = list(ex.map(waku_subscribe, [url_of(n) for n in nodes]))
|
||||
bad = [(n, c) for n, c in zip(nodes, codes) if c != 200]
|
||||
return Result(
|
||||
"SUBSCRIBE",
|
||||
not bad,
|
||||
f"{len(nodes) - len(bad)}/{len(nodes)} subscribed"
|
||||
+ (f"; failing: {bad[:5]}" if bad else ""),
|
||||
)
|
||||
|
||||
|
||||
def _send_n(node_url: str, n: int) -> list[int]:
|
||||
codes = []
|
||||
for i in range(n):
|
||||
codes.append(waku_publish(node_url, f"probe-{i}".encode()))
|
||||
return codes
|
||||
|
||||
|
||||
def _burst_until_blocked(node_url: str, msg_limit: int, overshoot: int = 3):
|
||||
"""Send msg_limit+overshoot messages back-to-back, fast, recording codes.
|
||||
Designed to complete inside a single epoch — keep epoch_sec large enough
|
||||
that this burst can't straddle an epoch boundary.
|
||||
|
||||
Returns (n_200, n_500, n_transport_err, two_hundred_after_block) where
|
||||
two_hundred_after_block flags a 200 appearing AFTER the first 500 (i.e.
|
||||
quota reset mid-burst => epoch straddle)."""
|
||||
codes = []
|
||||
for i in range(msg_limit + overshoot):
|
||||
codes.append(waku_publish(node_url, f"burst-{i}".encode(), timeout=10.0))
|
||||
n_200 = sum(c == 200 for c in codes)
|
||||
n_500 = sum(c == 500 for c in codes)
|
||||
n_err = sum(c not in (200, 500) for c in codes) # -1, 4xx transient, etc.
|
||||
first_block_idx = next((i for i, c in enumerate(codes) if c == 500), None)
|
||||
two_hundred_after_block = (
|
||||
first_block_idx is not None
|
||||
and any(c == 200 for c in codes[first_block_idx + 1:])
|
||||
)
|
||||
return n_200, n_500, n_err, two_hundred_after_block
|
||||
|
||||
|
||||
def _publish_until_ok(node_url: str, attempts: int = 20, spacing: float = 5.0) -> bool:
|
||||
"""Retry a single publish until it returns 200 or attempts run out.
|
||||
Tolerates the post-startup window where discv5/gossipsub mesh is still
|
||||
forming and the RLN publish path transiently 500s."""
|
||||
for _ in range(attempts):
|
||||
if waku_publish(node_url, b"warmup", timeout=10.0) == 200:
|
||||
return True
|
||||
time.sleep(spacing)
|
||||
return False
|
||||
|
||||
|
||||
def scenario_warmup(nodes: list[str], attempts: int = 20) -> Result:
|
||||
"""Readiness gate: every node must successfully publish at least once.
|
||||
This absorbs mesh-formation churn so PROPAGATION/RATE_LIMIT aren't
|
||||
judging a not-yet-connected fleet. Consumes 1 nonce/node — well within
|
||||
msg_limit, and RATE_LIMIT's tolerance accounts for it."""
|
||||
with cf.ThreadPoolExecutor(max_workers=min(8, len(nodes))) as ex:
|
||||
ready = list(ex.map(lambda n: _publish_until_ok(url_of(n), attempts), nodes))
|
||||
not_ready = [n for n, ok in zip(nodes, ready) if not ok]
|
||||
return Result(
|
||||
"WARMUP",
|
||||
not not_ready,
|
||||
f"{len(nodes) - len(not_ready)}/{len(nodes)} nodes publishing"
|
||||
+ (f"; never ready: {not_ready[:5]}" if not_ready else ""),
|
||||
)
|
||||
|
||||
|
||||
def scenario_rate_limit(nodes: list[str], msg_limit: int, tolerance: int = 3) -> Result:
|
||||
"""Per-node burst of msg_limit+3 messages within one epoch.
|
||||
|
||||
The RLN invariant being checked:
|
||||
(a) a node must NEVER publish more than msg_limit in one epoch, and
|
||||
(b) the node must enforce a 500 ceiling once the quota is exhausted.
|
||||
|
||||
Transient HTTP errors under concurrent load can lower the accepted count
|
||||
below msg_limit — that does NOT violate the invariant, so we accept
|
||||
successes in [msg_limit - tolerance, msg_limit]. successes > msg_limit OR
|
||||
a 200 after the first 500 means the epoch rolled mid-burst (raise
|
||||
RLN_RELAY_EPOCH_SEC) — reported as a timing skew, not an RLN failure."""
|
||||
# Cap concurrency: firing len(nodes)*(msg_limit+3) publishes all at once
|
||||
# saturates small CI runners (2 vCPU) and causes publish-path timeouts
|
||||
# that masquerade as rate-limit failures.
|
||||
with cf.ThreadPoolExecutor(max_workers=min(5, len(nodes))) as ex:
|
||||
per_node = list(
|
||||
ex.map(lambda n: _burst_until_blocked(url_of(n), msg_limit), nodes)
|
||||
)
|
||||
|
||||
rate_failures = [] # genuine RLN misbehaviour
|
||||
timing_skews = [] # epoch straddled mid-burst — inconclusive
|
||||
for node, (n_200, n_500, n_err, after_block) in zip(nodes, per_node):
|
||||
if n_200 > msg_limit or after_block:
|
||||
timing_skews.append(
|
||||
(node, f"{n_200} ok, epoch rolled mid-burst (raise epoch_sec)")
|
||||
)
|
||||
elif n_500 == 0:
|
||||
rate_failures.append((node, f"no 500 ceiling ({n_200} ok, {n_err} err)"))
|
||||
elif n_200 < msg_limit - tolerance:
|
||||
rate_failures.append(
|
||||
(node, f"only {n_200}/{msg_limit} ok ({n_err} transport err)")
|
||||
)
|
||||
|
||||
if timing_skews and not rate_failures:
|
||||
return Result(
|
||||
"RATE_LIMIT",
|
||||
False,
|
||||
f"INCONCLUSIVE (timing) — raise RLN_RELAY_EPOCH_SEC; "
|
||||
f"{len(timing_skews)} node(s) straddled an epoch: {timing_skews[:3]}",
|
||||
)
|
||||
ok = not rate_failures and not timing_skews
|
||||
good = len(nodes) - len(rate_failures) - len(timing_skews)
|
||||
return Result(
|
||||
"RATE_LIMIT",
|
||||
ok,
|
||||
f"{good}/{len(nodes)} nodes enforced <= {msg_limit} then 500 "
|
||||
f"(tolerance {tolerance} for transport noise)"
|
||||
+ (f"; rate failures: {rate_failures[:3]}" if rate_failures else "")
|
||||
+ (f"; timing skews: {timing_skews[:3]}" if timing_skews else ""),
|
||||
)
|
||||
|
||||
|
||||
def scenario_propagation(
|
||||
sender: str, receivers: list[str], settle_sec: int = 5
|
||||
) -> Result:
|
||||
"""Send one message on `sender`, expect it visible in every receiver's
|
||||
REST inbox within settle_sec."""
|
||||
marker = f"propagation-marker-{time.time_ns()}".encode()
|
||||
code = waku_publish(url_of(sender), marker)
|
||||
if code != 200:
|
||||
return Result("PROPAGATION", False, f"sender publish returned {code}")
|
||||
|
||||
time.sleep(settle_sec)
|
||||
missing = []
|
||||
with cf.ThreadPoolExecutor(max_workers=min(32, len(receivers))) as ex:
|
||||
inboxes = list(ex.map(waku_get_messages, [url_of(r) for r in receivers]))
|
||||
|
||||
encoded_marker = base64.b64encode(marker).decode().rstrip("=")
|
||||
for r, inbox in zip(receivers, inboxes):
|
||||
if inbox is None:
|
||||
missing.append((r, "GET failed"))
|
||||
continue
|
||||
# Look for our marker payload in any message
|
||||
found = any(
|
||||
(m.get("payload") or "").rstrip("=") == encoded_marker
|
||||
for m in inbox
|
||||
)
|
||||
if not found:
|
||||
missing.append((r, f"{len(inbox)} msgs, marker not present"))
|
||||
|
||||
return Result(
|
||||
"PROPAGATION",
|
||||
not missing,
|
||||
f"{len(receivers) - len(missing)}/{len(receivers)} receivers got the message"
|
||||
+ (f"; missing on {missing[:3]}" if missing else ""),
|
||||
)
|
||||
|
||||
|
||||
def scenario_epoch_reset(nodes: list[str], epoch_sec: int) -> Result:
|
||||
"""After epoch_sec + slack, each node can send 1 more message — expect 200."""
|
||||
sleep_s = epoch_sec + 3
|
||||
print(f" sleeping {sleep_s}s for epoch reset...")
|
||||
time.sleep(sleep_s)
|
||||
with cf.ThreadPoolExecutor(max_workers=len(nodes)) as ex:
|
||||
codes = list(
|
||||
ex.map(
|
||||
lambda n: waku_publish(url_of(n), b"post-epoch"),
|
||||
nodes,
|
||||
)
|
||||
)
|
||||
bad = [(n, c) for n, c in zip(nodes, codes) if c != 200]
|
||||
return Result(
|
||||
"EPOCH_RESET",
|
||||
not bad,
|
||||
f"{sum(c == 200 for c in codes)}/{len(nodes)} returned 200 after epoch reset"
|
||||
+ (f"; failing: {bad[:3]}" if bad else ""),
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# main
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def main() -> int:
|
||||
ap = argparse.ArgumentParser(description=__doc__)
|
||||
ap.add_argument("--hostname-prefix", default="logos-delivery-simulator-nwaku-")
|
||||
ap.add_argument("--num-nodes", type=int, default=30)
|
||||
ap.add_argument("--msg-limit", type=int, default=30,
|
||||
help="Must match RLN_RELAY_MSG_LIMIT in simulator .env")
|
||||
ap.add_argument("--epoch-sec", type=int, default=15,
|
||||
help="Must match RLN_RELAY_EPOCH_SEC in simulator .env")
|
||||
ap.add_argument("--health-deadline-sec", type=int, default=180)
|
||||
args = ap.parse_args()
|
||||
|
||||
nodes = [f"{args.hostname_prefix}{i}" for i in range(1, args.num_nodes + 1)]
|
||||
print(f"Testing {len(nodes)} nodes: {nodes[0]} … {nodes[-1]}")
|
||||
print(f"Config: msg_limit={args.msg_limit}, epoch_sec={args.epoch_sec}")
|
||||
print()
|
||||
|
||||
results: list[Result] = []
|
||||
|
||||
def run(scenario_fn, *fn_args, **fn_kwargs) -> bool:
|
||||
r = scenario_fn(*fn_args, **fn_kwargs)
|
||||
results.append(r)
|
||||
print(r)
|
||||
return r.ok
|
||||
|
||||
if not run(scenario_health, nodes, deadline_sec=args.health_deadline_sec):
|
||||
print("\nABORTING — nodes never reached healthy state.")
|
||||
return _summarize(results)
|
||||
|
||||
if not run(scenario_subscribe, nodes):
|
||||
print("\nABORTING — could not subscribe nodes to pubsub topic.")
|
||||
return _summarize(results)
|
||||
|
||||
# Readiness gate: wait out mesh-formation churn before judging behaviour.
|
||||
if not run(scenario_warmup, nodes):
|
||||
print("\nABORTING — fleet never reached a publishable state.")
|
||||
return _summarize(results)
|
||||
|
||||
run(scenario_propagation, nodes[0], nodes[1:])
|
||||
# Rate limit: per-node burst, asserts exactly msg_limit then 500.
|
||||
# Requires epoch_sec large enough that the burst can't straddle an epoch.
|
||||
run(scenario_rate_limit, nodes, args.msg_limit)
|
||||
run(scenario_epoch_reset, nodes, args.epoch_sec)
|
||||
|
||||
return _summarize(results)
|
||||
|
||||
|
||||
def _summarize(results: list[Result]) -> int:
|
||||
print()
|
||||
print("=" * 64)
|
||||
passed = sum(r.ok for r in results)
|
||||
print(f" {passed}/{len(results)} scenarios passed")
|
||||
for r in results:
|
||||
print(f" {r}")
|
||||
print("=" * 64)
|
||||
return len(results) - passed
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
6
tests/simulator/rln-sim.env
Normal file
6
tests/simulator/rln-sim.env
Normal file
@ -0,0 +1,6 @@
|
||||
# Source of truth for the RLN simulator E2E run (ci-rln-simulator.yml).
|
||||
# workflow_dispatch inputs override any value here per-run (blank input = use this file).
|
||||
BRANCH=master
|
||||
NUM_NODES=6
|
||||
MSG_LIMIT=30
|
||||
EPOCH_SEC=120
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import testutils/unittests
|
||||
import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
|
||||
import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
|
||||
|
||||
suite "Waku Payload":
|
||||
test "Encode/Decode waku message with timestamp":
|
||||
|
||||
@ -1,7 +1,7 @@
|
||||
{.used.}
|
||||
|
||||
import
|
||||
stew/results,
|
||||
results,
|
||||
chronos,
|
||||
testutils/unittests,
|
||||
libp2p/crypto/crypto as libp2p_keys,
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user