From 67eebe3a024b5361f763f18476c3f4c1dba35185 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Fri, 22 May 2026 17:15:31 +0530 Subject: [PATCH 1/7] ci: add daily rln simulator e2e workflow (#3885) --- .github/workflows/ci-daily.yml | 5 + .github/workflows/ci-rln-simulator.yml | 271 +++++++++++++++++ .gitignore | 4 + tests/simulator/rln-e2e-test.py | 388 +++++++++++++++++++++++++ tests/simulator/rln-sim.env | 6 + 5 files changed, 674 insertions(+) create mode 100644 .github/workflows/ci-rln-simulator.yml create mode 100755 tests/simulator/rln-e2e-test.py create mode 100644 tests/simulator/rln-sim.env diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index a4cf39340..d52775ae2 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -77,3 +77,8 @@ jobs: }" \ "$DISCORD_WEBHOOK_URL" + # RLN end-to-end against the simulator. Defaults from tests/simulator/rln-sim.env. + rln-simulator: + uses: ./.github/workflows/ci-rln-simulator.yml + secrets: inherit + diff --git a/.github/workflows/ci-rln-simulator.yml b/.github/workflows/ci-rln-simulator.yml new file mode 100644 index 000000000..c49ae1815 --- /dev/null +++ b/.github/workflows/ci-rln-simulator.yml @@ -0,0 +1,271 @@ +name: RLN E2E — Simulator + +# Validates the full RLN flow end-to-end against logos-delivery-simulator: +# keystore generation, on-chain registration, gossipsub propagation, +# per-epoch rate-limit enforcement, and epoch-boundary recovery. +# +# Why this exists: logos-dev runs with RLN disabled, so there is no +# production traffic exercising RLN. Until RLN is enabled there, this is +# the only end-to-end coverage of the RLN + zerokit path. +# +# The image is built ON the runner and tested ON the same runner, so the +# AVX-512 portability issue in container-image.yml does not apply here. +# +# No own schedule: ci-daily.yml is the single daily entry point and calls +# this via workflow_call. 
workflow_dispatch allows manual runs. +# Run defaults live in tests/simulator/rln-sim.env; inputs override per-run. + +on: + workflow_call: + inputs: + branch: + type: string + default: '' + num_nodes: + type: string + default: '' + msg_limit: + type: string + default: '' + epoch_sec: + type: string + default: '' + workflow_dispatch: + inputs: + branch: + description: 'logos-delivery branch to build & test (blank = use rln-sim.env)' + type: string + default: '' + num_nodes: + description: 'Number of nwaku nodes (blank = use rln-sim.env)' + type: string + default: '' + msg_limit: + description: 'RLN_RELAY_MSG_LIMIT, must be >= contract min ~20 (blank = use rln-sim.env)' + type: string + default: '' + epoch_sec: + description: 'RLN_RELAY_EPOCH_SEC, large enough a burst cannot straddle an epoch (blank = use rln-sim.env)' + type: string + default: '' + +env: + NPROC: 2 + MAKEFLAGS: "-j2" + NIM_VERSION: '2.2.4' + NIMBLE_VERSION: '0.22.3' + +jobs: + rln-e2e: + runs-on: ubuntu-22.04 + timeout-minutes: 120 + name: rln-e2e + + steps: + # First checkout: the ref that triggered this workflow (CI branch / + # master). This is where the e2e test script and rln-sim.env live — + # the build branch may not contain them. + - name: Checkout CI ref (for the test script) + uses: actions/checkout@v4 + with: + submodules: false + + # Defaults come from tests/simulator/rln-sim.env (single source of truth); + # a non-blank input (dispatch or workflow_call) overrides the matching value. + - name: Resolve parameters + id: cfg + env: + IN_BRANCH: ${{ inputs.branch }} + IN_NUM_NODES: ${{ inputs.num_nodes }} + IN_MSG_LIMIT: ${{ inputs.msg_limit }} + IN_EPOCH_SEC: ${{ inputs.epoch_sec }} + run: | + set -euo pipefail + set -a; . 
tests/simulator/rln-sim.env; set +a + { + echo "branch=${IN_BRANCH:-$BRANCH}" + echo "num_nodes=${IN_NUM_NODES:-$NUM_NODES}" + echo "msg_limit=${IN_MSG_LIMIT:-$MSG_LIMIT}" + echo "epoch_sec=${IN_EPOCH_SEC:-$EPOCH_SEC}" + } >> "$GITHUB_OUTPUT" + + - name: Stash e2e test script outside the workspace + run: | + test -f tests/simulator/rln-e2e-test.py \ + || { echo "tests/simulator/rln-e2e-test.py missing on CI ref"; exit 1; } + cp tests/simulator/rln-e2e-test.py "$RUNNER_TEMP/rln-e2e-test.py" + + # Second checkout: the branch to build & test. Overwrites the workspace; + # the stashed test script in RUNNER_TEMP survives. + - name: Checkout logos-delivery (${{ steps.cfg.outputs.branch }}) + uses: actions/checkout@v4 + with: + ref: ${{ steps.cfg.outputs.branch }} + submodules: false + clean: true + + - name: Get submodules hash + id: submodules + run: echo "hash=$(git submodule status | awk '{print $1}' | sort | shasum -a 256 | sed 's/[ -]*//g')" >> $GITHUB_OUTPUT + + - name: Cache submodules + uses: actions/cache@v3 + with: + path: | + vendor/ + .git/modules + key: ${{ runner.os }}-vendor-modules-${{ steps.submodules.outputs.hash }} + + - name: Install Nim ${{ env.NIM_VERSION }} + uses: jiro4989/setup-nim-action@v2 + with: + nim-version: ${{ env.NIM_VERSION }} + repo-token: ${{ secrets.GITHUB_TOKEN }} + + - name: Install Nimble ${{ env.NIMBLE_VERSION }} + run: | + cd /tmp && nimble install "nimble@${{ env.NIMBLE_VERSION }}" -y + echo "$HOME/.nimble/bin" >> $GITHUB_PATH + + - name: Cache nimble deps + id: cache-nimbledeps + uses: actions/cache@v3 + with: + path: | + nimbledeps/ + nimble.paths + key: ${{ runner.os }}-nimbledeps-nimble${{ env.NIMBLE_VERSION }}-${{ hashFiles('nimble.lock', 'BearSSL.mk', 'Nat.mk') }} + + - name: Install nimble deps + if: steps.cache-nimbledeps.outputs.cache-hit != 'true' + run: | + nimble setup --localdeps -y + make rebuild-nat-libs-nimbledeps + make rebuild-bearssl-nimbledeps + touch nimbledeps/.nimble-setup + + - name: Build wakunode2 + 
run: | + make -j${NPROC} V=1 POSTGRES=1 \ + NIMFLAGS="-d:disableMarchNative -d:chronicles_colors:none" \ + wakunode2 + + - name: Build local Docker image + run: | + docker build -t nwaku-rln-ci:test -f docker/binaries/Dockerfile.bn.amd64 . + + - name: Clone logos-delivery-simulator + run: | + git clone --depth 1 https://github.com/logos-messaging/logos-delivery-simulator.git "$RUNNER_TEMP/logos-delivery-simulator" + + - name: Write simulator .env + working-directory: ${{ runner.temp }}/logos-delivery-simulator + run: | + cat > .env </dev/null || echo missing) + [ "$st" = "exited" ] && break + echo "deployer status: $st"; sleep 15 + done + ec=$(docker inspect logos-delivery-simulator-contract-repo-deployer-1 --format='{{.State.ExitCode}}') + echo "deployer exit code: $ec" + if [ "$ec" != "0" ]; then + docker logs logos-delivery-simulator-contract-repo-deployer-1 2>&1 | tail -50 + exit 1 + fi + + - name: Wait for nwaku fleet to register + working-directory: ${{ runner.temp }}/logos-delivery-simulator + run: | + N=${{ steps.cfg.outputs.num_nodes }} + for _ in $(seq 1 60); do + up=$(docker ps --filter 'name=logos-delivery-simulator-nwaku-' --filter 'status=running' --format '{{.Names}}' | wc -l) + echo "nwaku running: $up/$N" + [ "$up" -ge "$N" ] && break + sleep 15 + done + # nwaku-1 must reach the "registered + started" marker + timeout 300 docker logs -f logos-delivery-simulator-nwaku-1 2>&1 \ + | grep -m1 -E "Segmentation fault|Illegal instruction|Failed to register on-chain|I am a nwaku node" \ + | tee /tmp/nwaku1.verdict + grep -q "I am a nwaku node" /tmp/nwaku1.verdict + + - name: Run RLN e2e scenarios + run: | + TEST_SCRIPT="$RUNNER_TEMP/rln-e2e-test.py" + test -f "$TEST_SCRIPT" \ + || { echo "stashed test script missing at $TEST_SCRIPT"; exit 1; } + docker run --rm \ + --network logos-delivery-simulator_simulation \ + -v "$TEST_SCRIPT:/test.py:ro" \ + python:3.11-slim \ + sh -c "pip install --quiet --disable-pip-version-check requests && \ + python /test.py \ 
+ --hostname-prefix logos-delivery-simulator-nwaku- \ + --num-nodes ${{ steps.cfg.outputs.num_nodes }} \ + --msg-limit ${{ steps.cfg.outputs.msg_limit }} \ + --epoch-sec ${{ steps.cfg.outputs.epoch_sec }} \ + --health-deadline-sec 600" + + - name: Collect logs on failure + if: failure() + working-directory: ${{ runner.temp }}/logos-delivery-simulator + run: | + mkdir -p "$RUNNER_TEMP/logs" + for c in $(docker ps -a --filter 'name=logos-delivery-simulator-' --format '{{.Names}}'); do + docker logs "$c" > "$RUNNER_TEMP/logs/$c.log" 2>&1 || true + done + + - name: Upload logs + if: failure() + uses: actions/upload-artifact@v4 + with: + name: simulator-logs + path: ${{ runner.temp }}/logs + retention-days: 7 + + - name: Tear down + if: always() + working-directory: ${{ runner.temp }}/logos-delivery-simulator + run: docker compose down -v || true + + - name: Notify Discord + if: always() + env: + DISCORD_WEBHOOK_URL: ${{ secrets.DISCORD_WEBHOOK_URL }} + run: | + [ -z "$DISCORD_WEBHOOK_URL" ] && exit 0 + STATUS="${{ job.status }}" + BRANCH="${{ steps.cfg.outputs.branch }}" + RUN_URL="https://github.com/${{ github.repository }}/actions/runs/${{ github.run_id }}" + if [ "$STATUS" = "success" ]; then COLOR=3066993; TITLE="✅ RLN E2E passed"; else COLOR=15158332; TITLE="❌ RLN E2E failed"; fi + curl -H "Content-Type: application/json" -X POST -d "{ + \"embeds\":[{\"title\":\"$TITLE\",\"color\":$COLOR, + \"fields\":[ + {\"name\":\"Branch\",\"value\":\"$BRANCH\",\"inline\":true}, + {\"name\":\"Status\",\"value\":\"$STATUS\",\"inline\":true}], + \"url\":\"$RUN_URL\", + \"footer\":{\"text\":\"Daily RLN simulator E2E\"}}]}" \ + "$DISCORD_WEBHOOK_URL" diff --git a/.gitignore b/.gitignore index 750d0a00b..0f9751f9b 100644 --- a/.gitignore +++ b/.gitignore @@ -86,3 +86,7 @@ nimbledeps **/anvil_state/state-deployed-contracts-mint-and-approved.json .gitnexus + +# Python bytecode from tests/simulator +__pycache__/ +*.pyc diff --git a/tests/simulator/rln-e2e-test.py 
b/tests/simulator/rln-e2e-test.py new file mode 100755 index 000000000..4248ee1f6 --- /dev/null +++ b/tests/simulator/rln-e2e-test.py @@ -0,0 +1,388 @@ +#!/usr/bin/env python3 +""" +RLN end-to-end test against a running logos-delivery-simulator stack. + +Designed to run as a sidecar container on the simulator's Docker network so +hostnames like `logos-delivery-simulator-nwaku-1` resolve via Docker DNS. + +Scenarios covered (in order): + 1. HEALTH - every node responds to /debug/v1/info with an enrUri + 2. SUBSCRIBE - every node REST-subscribes to the pubsub topic + 3. WITHIN_LIMIT - every node concurrently sends msg_limit messages -> 200 + 4. PROPAGATION - one sender's message lands in all peers' inboxes + 5. OVER_LIMIT - one extra message per node -> 500 (rate-limit hit) + 6. EPOCH_RESET - after epoch_sec, every node can send 1 more -> 200 + 7. SAME_MESSAGE_ID - sending same message_id twice in same epoch is the + slashable signal (verified by checking node logs) + +Exit code: + 0 = all scenarios passed + N = number of scenarios that failed + +Usage (typical): + docker run --rm \\ + --network logos-delivery-simulator_simulation \\ + -v /path/to/rln-e2e-test.py:/test.py \\ + python:3.11-slim \\ + sh -c 'pip install --quiet requests && python /test.py \\ + --hostname-prefix logos-delivery-simulator-nwaku- \\ + --num-nodes 30 --msg-limit 30 --epoch-sec 15' +""" + +import argparse +import base64 +import concurrent.futures as cf +import json +import os +import sys +import time +import urllib.parse +from dataclasses import dataclass +from typing import Optional + +import requests + +PUBSUB_TOPIC = "/waku/2/rs/66/0" +CONTENT_TOPIC = "/rln-test/1/probe/proto" + +# --------------------------------------------------------------------------- +# helpers +# --------------------------------------------------------------------------- + +def url_of(host: str, port: int = 8645) -> str: + return f"http://{host}:{port}" + + +def waku_publish(node_url: str, payload: bytes, timeout: 
float = 5.0) -> int: + body = { + "payload": base64.b64encode(payload).decode("ascii"), + "contentTopic": CONTENT_TOPIC, + "version": 1, + "timestamp": time.time_ns(), + } + enc = urllib.parse.quote(PUBSUB_TOPIC, safe="") + try: + r = requests.post( + f"{node_url}/relay/v1/messages/{enc}", + json=body, + timeout=timeout, + headers={"content-type": "application/json"}, + ) + return r.status_code + except requests.RequestException: + return -1 + + +def waku_subscribe(node_url: str, timeout: float = 5.0) -> int: + try: + r = requests.post( + f"{node_url}/relay/v1/subscriptions", + json=[PUBSUB_TOPIC], + timeout=timeout, + headers={"content-type": "application/json"}, + ) + return r.status_code + except requests.RequestException: + return -1 + + +def waku_get_messages(node_url: str, timeout: float = 5.0) -> Optional[list]: + enc = urllib.parse.quote(PUBSUB_TOPIC, safe="") + try: + r = requests.get( + f"{node_url}/relay/v1/messages/{enc}", + timeout=timeout, + ) + if r.status_code != 200: + return None + return r.json() + except (requests.RequestException, json.JSONDecodeError): + return None + + +def node_healthy(node_url: str, timeout: float = 3.0) -> bool: + try: + r = requests.get(f"{node_url}/debug/v1/info", timeout=timeout) + return r.status_code == 200 and "enrUri" in r.json() + except (requests.RequestException, json.JSONDecodeError): + return False + + +# --------------------------------------------------------------------------- +# scenarios +# --------------------------------------------------------------------------- + +@dataclass +class Result: + name: str + ok: bool + detail: str = "" + + def __str__(self) -> str: + status = "PASS" if self.ok else "FAIL" + s = f"[{status}] {self.name}" + if self.detail: + s += f" — {self.detail}" + return s + + +def scenario_health(nodes: list[str], deadline_sec: int = 120) -> Result: + """Every node must be reachable within deadline_sec.""" + start = time.time() + unhealthy = list(nodes) + while time.time() - start < 
deadline_sec and unhealthy: + with cf.ThreadPoolExecutor(max_workers=min(32, len(unhealthy))) as ex: + results = list(ex.map(node_healthy, [url_of(n) for n in unhealthy])) + unhealthy = [n for n, ok in zip(unhealthy, results) if not ok] + if unhealthy: + time.sleep(3) + return Result( + "HEALTH", + not unhealthy, + f"{len(nodes) - len(unhealthy)}/{len(nodes)} healthy" + + (f"; failing: {unhealthy[:5]}" if unhealthy else ""), + ) + + +def scenario_subscribe(nodes: list[str]) -> Result: + """REST-subscribe every node to the pubsub topic so GETs return cached msgs.""" + with cf.ThreadPoolExecutor(max_workers=min(32, len(nodes))) as ex: + codes = list(ex.map(waku_subscribe, [url_of(n) for n in nodes])) + bad = [(n, c) for n, c in zip(nodes, codes) if c != 200] + return Result( + "SUBSCRIBE", + not bad, + f"{len(nodes) - len(bad)}/{len(nodes)} subscribed" + + (f"; failing: {bad[:5]}" if bad else ""), + ) + + +def _send_n(node_url: str, n: int) -> list[int]: + codes = [] + for i in range(n): + codes.append(waku_publish(node_url, f"probe-{i}".encode())) + return codes + + +def _burst_until_blocked(node_url: str, msg_limit: int, overshoot: int = 3): + """Send msg_limit+overshoot messages back-to-back, fast, recording codes. + Designed to complete inside a single epoch — keep epoch_sec large enough + that this burst can't straddle an epoch boundary. + + Returns (n_200, n_500, n_transport_err, two_hundred_after_block) where + two_hundred_after_block flags a 200 appearing AFTER the first 500 (i.e. + quota reset mid-burst => epoch straddle).""" + codes = [] + for i in range(msg_limit + overshoot): + codes.append(waku_publish(node_url, f"burst-{i}".encode(), timeout=10.0)) + n_200 = sum(c == 200 for c in codes) + n_500 = sum(c == 500 for c in codes) + n_err = sum(c not in (200, 500) for c in codes) # -1, 4xx transient, etc. 
+ first_block_idx = next((i for i, c in enumerate(codes) if c == 500), None) + two_hundred_after_block = ( + first_block_idx is not None + and any(c == 200 for c in codes[first_block_idx + 1:]) + ) + return n_200, n_500, n_err, two_hundred_after_block + + +def _publish_until_ok(node_url: str, attempts: int = 20, spacing: float = 5.0) -> bool: + """Retry a single publish until it returns 200 or attempts run out. + Tolerates the post-startup window where discv5/gossipsub mesh is still + forming and the RLN publish path transiently 500s.""" + for _ in range(attempts): + if waku_publish(node_url, b"warmup", timeout=10.0) == 200: + return True + time.sleep(spacing) + return False + + +def scenario_warmup(nodes: list[str], attempts: int = 20) -> Result: + """Readiness gate: every node must successfully publish at least once. + This absorbs mesh-formation churn so PROPAGATION/RATE_LIMIT aren't + judging a not-yet-connected fleet. Consumes 1 nonce/node — well within + msg_limit, and RATE_LIMIT's tolerance accounts for it.""" + with cf.ThreadPoolExecutor(max_workers=min(8, len(nodes))) as ex: + ready = list(ex.map(lambda n: _publish_until_ok(url_of(n), attempts), nodes)) + not_ready = [n for n, ok in zip(nodes, ready) if not ok] + return Result( + "WARMUP", + not not_ready, + f"{len(nodes) - len(not_ready)}/{len(nodes)} nodes publishing" + + (f"; never ready: {not_ready[:5]}" if not_ready else ""), + ) + + +def scenario_rate_limit(nodes: list[str], msg_limit: int, tolerance: int = 3) -> Result: + """Per-node burst of msg_limit+3 messages within one epoch. + + The RLN invariant being checked: + (a) a node must NEVER publish more than msg_limit in one epoch, and + (b) the node must enforce a 500 ceiling once the quota is exhausted. + + Transient HTTP errors under concurrent load can lower the accepted count + below msg_limit — that does NOT violate the invariant, so we accept + successes in [msg_limit - tolerance, msg_limit]. 
successes > msg_limit OR + a 200 after the first 500 means the epoch rolled mid-burst (raise + RLN_RELAY_EPOCH_SEC) — reported as a timing skew, not an RLN failure.""" + # Cap concurrency: firing len(nodes)*(msg_limit+3) publishes all at once + # saturates small CI runners (2 vCPU) and causes publish-path timeouts + # that masquerade as rate-limit failures. + with cf.ThreadPoolExecutor(max_workers=min(5, len(nodes))) as ex: + per_node = list( + ex.map(lambda n: _burst_until_blocked(url_of(n), msg_limit), nodes) + ) + + rate_failures = [] # genuine RLN misbehaviour + timing_skews = [] # epoch straddled mid-burst — inconclusive + for node, (n_200, n_500, n_err, after_block) in zip(nodes, per_node): + if n_200 > msg_limit or after_block: + timing_skews.append( + (node, f"{n_200} ok, epoch rolled mid-burst (raise epoch_sec)") + ) + elif n_500 == 0: + rate_failures.append((node, f"no 500 ceiling ({n_200} ok, {n_err} err)")) + elif n_200 < msg_limit - tolerance: + rate_failures.append( + (node, f"only {n_200}/{msg_limit} ok ({n_err} transport err)") + ) + + if timing_skews and not rate_failures: + return Result( + "RATE_LIMIT", + False, + f"INCONCLUSIVE (timing) — raise RLN_RELAY_EPOCH_SEC; " + f"{len(timing_skews)} node(s) straddled an epoch: {timing_skews[:3]}", + ) + ok = not rate_failures and not timing_skews + good = len(nodes) - len(rate_failures) - len(timing_skews) + return Result( + "RATE_LIMIT", + ok, + f"{good}/{len(nodes)} nodes enforced <= {msg_limit} then 500 " + f"(tolerance {tolerance} for transport noise)" + + (f"; rate failures: {rate_failures[:3]}" if rate_failures else "") + + (f"; timing skews: {timing_skews[:3]}" if timing_skews else ""), + ) + + +def scenario_propagation( + sender: str, receivers: list[str], settle_sec: int = 5 +) -> Result: + """Send one message on `sender`, expect it visible in every receiver's + REST inbox within settle_sec.""" + marker = f"propagation-marker-{time.time_ns()}".encode() + code = waku_publish(url_of(sender), 
marker) + if code != 200: + return Result("PROPAGATION", False, f"sender publish returned {code}") + + time.sleep(settle_sec) + missing = [] + with cf.ThreadPoolExecutor(max_workers=min(32, len(receivers))) as ex: + inboxes = list(ex.map(waku_get_messages, [url_of(r) for r in receivers])) + + encoded_marker = base64.b64encode(marker).decode().rstrip("=") + for r, inbox in zip(receivers, inboxes): + if inbox is None: + missing.append((r, "GET failed")) + continue + # Look for our marker payload in any message + found = any( + (m.get("payload") or "").rstrip("=") == encoded_marker + for m in inbox + ) + if not found: + missing.append((r, f"{len(inbox)} msgs, marker not present")) + + return Result( + "PROPAGATION", + not missing, + f"{len(receivers) - len(missing)}/{len(receivers)} receivers got the message" + + (f"; missing on {missing[:3]}" if missing else ""), + ) + + +def scenario_epoch_reset(nodes: list[str], epoch_sec: int) -> Result: + """After epoch_sec + slack, each node can send 1 more message — expect 200.""" + sleep_s = epoch_sec + 3 + print(f" sleeping {sleep_s}s for epoch reset...") + time.sleep(sleep_s) + with cf.ThreadPoolExecutor(max_workers=len(nodes)) as ex: + codes = list( + ex.map( + lambda n: waku_publish(url_of(n), b"post-epoch"), + nodes, + ) + ) + bad = [(n, c) for n, c in zip(nodes, codes) if c != 200] + return Result( + "EPOCH_RESET", + not bad, + f"{sum(c == 200 for c in codes)}/{len(nodes)} returned 200 after epoch reset" + + (f"; failing: {bad[:3]}" if bad else ""), + ) + + +# --------------------------------------------------------------------------- +# main +# --------------------------------------------------------------------------- + +def main() -> int: + ap = argparse.ArgumentParser(description=__doc__) + ap.add_argument("--hostname-prefix", default="logos-delivery-simulator-nwaku-") + ap.add_argument("--num-nodes", type=int, default=30) + ap.add_argument("--msg-limit", type=int, default=30, + help="Must match RLN_RELAY_MSG_LIMIT 
in simulator .env") + ap.add_argument("--epoch-sec", type=int, default=15, + help="Must match RLN_RELAY_EPOCH_SEC in simulator .env") + ap.add_argument("--health-deadline-sec", type=int, default=180) + args = ap.parse_args() + + nodes = [f"{args.hostname_prefix}{i}" for i in range(1, args.num_nodes + 1)] + print(f"Testing {len(nodes)} nodes: {nodes[0]} … {nodes[-1]}") + print(f"Config: msg_limit={args.msg_limit}, epoch_sec={args.epoch_sec}") + print() + + results: list[Result] = [] + + def run(scenario_fn, *fn_args, **fn_kwargs) -> bool: + r = scenario_fn(*fn_args, **fn_kwargs) + results.append(r) + print(r) + return r.ok + + if not run(scenario_health, nodes, deadline_sec=args.health_deadline_sec): + print("\nABORTING — nodes never reached healthy state.") + return _summarize(results) + + if not run(scenario_subscribe, nodes): + print("\nABORTING — could not subscribe nodes to pubsub topic.") + return _summarize(results) + + # Readiness gate: wait out mesh-formation churn before judging behaviour. + if not run(scenario_warmup, nodes): + print("\nABORTING — fleet never reached a publishable state.") + return _summarize(results) + + run(scenario_propagation, nodes[0], nodes[1:]) + # Rate limit: per-node burst, asserts exactly msg_limit then 500. + # Requires epoch_sec large enough that the burst can't straddle an epoch. 
+ run(scenario_rate_limit, nodes, args.msg_limit) + run(scenario_epoch_reset, nodes, args.epoch_sec) + + return _summarize(results) + + +def _summarize(results: list[Result]) -> int: + print() + print("=" * 64) + passed = sum(r.ok for r in results) + print(f" {passed}/{len(results)} scenarios passed") + for r in results: + print(f" {r}") + print("=" * 64) + return len(results) - passed + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/tests/simulator/rln-sim.env b/tests/simulator/rln-sim.env new file mode 100644 index 000000000..ec86f1eea --- /dev/null +++ b/tests/simulator/rln-sim.env @@ -0,0 +1,6 @@ +# Source of truth for the RLN simulator E2E run (ci-rln-simulator.yml). +# workflow_dispatch inputs override any value here per-run (blank input = use this file). +BRANCH=master +NUM_NODES=6 +MSG_LIMIT=30 +EPOCH_SEC=120 From c738c7b65ed0a3b7ce45c201c1d83038aabc0231 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 22 May 2026 14:32:54 +0100 Subject: [PATCH 2/7] fix: accept port 0 in JSON config (ephemeral port support) (#3895) * chore: pin confutils to merged upstream commit status-im/nim-confutils#146 is merged; move the confutils pin from the PR fork back to status-im/nim-confutils master (36f3115). Content is identical to the fork commit, so nimble sha1 and nix sha256 are unchanged. 
Co-Authored-By: Claude Opus 4.7 (1M context) --- nimble.lock | 4 ++-- nix/deps.nix | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/nimble.lock b/nimble.lock index 7a36d72c4..cd533001e 100644 --- a/nimble.lock +++ b/nimble.lock @@ -250,7 +250,7 @@ }, "confutils": { "version": "0.1.0", - "vcsRevision": "7728f6bd81a1eedcfe277d02ea85fdb805bcc05a", + "vcsRevision": "36f3115ca350f40841ac0eecc7dfa5fe7790c864", "url": "https://github.com/status-im/nim-confutils", "downloadMethod": "git", "dependencies": [ @@ -260,7 +260,7 @@ "results" ], "checksums": { - "sha1": "8bc8c30b107fdba73b677e5f257c6c42ae1cdc8e" + "sha1": "2fbe6418ddd9f79fb11a0addd7666a3e787adbe0" } }, "cbor_serialization": { diff --git a/nix/deps.nix b/nix/deps.nix index 63eeb597a..8e4453675 100644 --- a/nix/deps.nix +++ b/nix/deps.nix @@ -124,8 +124,8 @@ confutils = pkgs.fetchgit { url = "https://github.com/status-im/nim-confutils"; - rev = "7728f6bd81a1eedcfe277d02ea85fdb805bcc05a"; - sha256 = "18bj1ilx10jm2vmqx2wy2xl9rzy7alymi2m4n9jgpa4sbxnfh0x3"; + rev = "36f3115ca350f40841ac0eecc7dfa5fe7790c864"; + sha256 = "1vppqplwlpl7a61r8iki5hlzvhd8lnq41ixpqslv35dnm482c55j"; fetchSubmodules = true; }; From 4b80c7762d28d0d2142cff329bc747e56948d44b Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Mon, 25 May 2026 18:22:04 -0300 Subject: [PATCH 3/7] Point CLAUDE.md to (updated) AGENTS.md * Point CLAUDE.md to (updated) AGENTS.md * add cluster id, shard id, autosharding * add in-flight testing section Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> --- AGENTS.md | 53 ++++++++++++++++++++++++++++++----------------------- CLAUDE.md | 1 + 2 files changed, 31 insertions(+), 23 deletions(-) create mode 100644 CLAUDE.md diff --git a/AGENTS.md b/AGENTS.md index 28e455f47..ff7d29a39 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -16,7 +16,7 @@ Key architectural decisions: Resource-restricted first: Protocols differentiate between full nodes (relay) and light clients (filter, 
lightpush, store). Light clients can participate without maintaining full message history or relay capabilities. This explains the client/server split in protocol implementations. -Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic sharding across 8 shards. Code prioritizes metadata privacy alongside content encryption. +Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic content-topic-based sharding (shard count is configurable; generation-zero defaults to 8 shards on cluster 0). Code prioritizes metadata privacy alongside content encryption. Scalability via sharding: The network uses automatic content-topic-based sharding to distribute traffic. This is why you'll see sharding logic throughout the codebase and why pubsub topic selection is protocol-level, not application-level. @@ -36,7 +36,10 @@ See [documentation](https://docs.waku.org/learn/) for architectural details. ### Key Terminology - ENR (Ethereum Node Record): Node identity and capability advertisement - Multiaddr: libp2p addressing format (e.g., `/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2...`) -- PubsubTopic: Gossipsub topic for message routing (e.g., `/waku/2/default-waku/proto`) +- PubsubTopic: Gossipsub topic for message routing (shard-based, e.g., `/waku/2/rs/<cluster-id>/<shard-id>`; the default is `/waku/2/rs/0/0`) + - cluster-id: network id + - shard-id: shard differentiator inside the network - drives mesh forming. 
+ - autosharding: network supports n (configured) shards [0..n-1], shard derived from ContentTopic - ContentTopic: Application-level message categorization (e.g., `/my-app/1/chat/proto`) - Sharding: Partitioning network traffic across topics (static or auto-sharding) - RLN (Rate Limiting Nullifier): Zero-knowledge proof system for spam prevention @@ -77,29 +80,29 @@ type WakuFilter* = ref object of LPProtocol ### Build Requirements - Nim 2.x (check `waku.nimble` for minimum version) - Rust toolchain (required for RLN dependencies) -- Build system: Make with nimbus-build-system +- Build system: Make driven by Nimble (dependencies pinned in `nimble.lock`) ### Build System -The project uses Makefile with nimbus-build-system (Status's Nim build framework): +The project uses a Makefile that drives Nimble. Dependencies are resolved from +`nimble.lock` into a local `nimbledeps/` directory (tracked by the +`NIMBLEDEPS_STAMP` target). ```bash -# Initial build (updates submodules) +# Initial build (resolves Nimble deps automatically) make wakunode2 -# After git pull, update submodules -make update - # Build with custom flags make wakunode2 NIMFLAGS="-d:chronicles_log_level=DEBUG" ``` -Note: The build system uses `--mm:refc` memory management (automatically enforced). Only relevant if compiling outside the standard build system. +Note: The build uses `--mm:refc` memory management (passed automatically by the Nimble tasks in `waku.nimble`). Only relevant if compiling outside the standard build system. ### Common Make Targets ```bash make wakunode2 # Build main node binary make test # Run all tests make testcommon # Run common tests only -make libwakuStatic # Build static C library +make libwaku # Build the legacy C library (libwaku) +make liblogosdelivery. 
# Build actual C FFI library make chat2 # Build chat example make install-nph # Install git hook for auto-formatting ``` @@ -127,7 +130,7 @@ suite "Waku ENR - Capabilities": test "check capabilities support": ## Given let bitfield: CapabilitiesBitfield = 0b0000_1101u8 - + ## Then check: bitfield.supportsCapability(Capabilities.Relay) @@ -135,7 +138,7 @@ suite "Waku ENR - Capabilities": ``` ### Code Formatting -Mandatory: All code must be formatted with `nph` (vendored in `vendor/nph`) +Mandatory: All code must be formatted with `nph` (installed via `make build-nph`, which fetches a pinned `nph` version with Nimble) ```bash # Format specific file make nph/waku/waku_core.nim @@ -162,7 +165,6 @@ Compile with log level: nim c -d:chronicles_log_level=TRACE myfile.nim ``` - ## Code Conventions Common pitfalls: @@ -181,8 +183,13 @@ Common pitfalls: - Exceptions: `XxxError` for CatchableError, `XxxDefect` for Defect - ref object types: `XxxRef` suffix +### Calls and Member Access +- Prefer dot call syntax for predicates: `x.isNil()` instead of `isNil(x)` +- Use parentheses for "verbs" (operations/actions): `isSome()`, `handleRequest()` +- Omit parentheses for "nouns" (properties/values): `.len`, `.high` + ### Imports Organization -Group imports: stdlib, external libs, internal modules: +Stdlib + external in one `import` block, internal modules in a separate block: ```nim import std/[options, sequtils], # stdlib @@ -214,11 +221,11 @@ proc subscribe( ): Future[FilterSubscribeResult] {.async.} = if contentTopics.len > MaxContentTopicsPerRequest: return err(FilterSubscribeError.badRequest("exceeds maximum")) - + # Handle Result with isOkOr (await wf.subscriptions.addSubscription(peerId, criteria)).isOkOr: return err(FilterSubscribeError.serviceUnavailable(error)) - + ok() ``` @@ -460,8 +467,7 @@ nim c -r \ ### Vendor Directory - Never edit files directly in vendor - it is auto-generated from git submodules -- Always run `make update` after pulling changes -- Managed by 
`nimbus-build-system` +- Nimble dependencies are resolved from `nimble.lock` into `nimbledeps/` ### Chronicles Performance - Log levels are configured at compile time for performance @@ -475,7 +481,7 @@ nim c -r \ ### RLN Dependencies - RLN code requires a Rust toolchain, which explains Rust imports in some modules -- Pre-built `librln` libraries are checked into the repository +- `librln` is built from the vendored `zerokit` submodule via the `librln`/`rln-deps` Make targets ## Quick Reference @@ -483,18 +489,19 @@ Language: Nim 2.x | License: MIT or Apache 2.0 ### Important Files - `Makefile` - Primary build interface -- `waku.nimble` - Package definition and build tasks (called via nimbus-build-system) -- `vendor/nimbus-build-system/` - Status's build framework +- `waku.nimble` - Package definition and build tasks (invoked by the Makefile via Nimble) +- `nimble.lock` - Pinned dependency versions resolved into `nimbledeps/` - `waku/node/waku_node.nim` - Core node implementation - `apps/wakunode2/wakunode2.nim` - Main CLI application - `waku/factory/waku_conf.nim` - Configuration types -- `library/libwaku.nim` - C bindings entry point +- `liblogosdelivery/liblogosdelivery.nim` - C bindings entry point ### Testing Entry Points - `tests/all_tests_waku.nim` - All Waku protocol tests - `tests/all_tests_wakunode2.nim` - Node application tests - `tests/all_tests_common.nim` - Common utilities tests - +#### in-flight testing +- any test can be run separately by issuing `make test tests/<test_dir>/<test_file>.nim` ### Key Dependencies - `chronos` - Async framework - `nim-results` - Result type for error handling diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..43c994c2d --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md From 79dda6375807624cd1ba04b562cf40e6aeaf6807 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Tue, 26 May 2026 16:22:10 +0200 Subject: [PATCH 4/7] Recover wakucanary in nix output (#3892) --- 
flake.nix | 9 ++- nix/default.nix | 149 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 50 deletions(-) diff --git a/flake.nix b/flake.nix index 077668b9a..6c283780d 100644 --- a/flake.nix +++ b/flake.nix @@ -89,8 +89,15 @@ inherit zerokitRln; gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}"; }; + + wakucanary = pkgs.callPackage ./nix/default.nix { + inherit pkgs; + src = ./.; + targets = ["wakucanary"]; + zerokitRln = zerokit.packages.${system}.rln; + }; in { - inherit liblogosdelivery; + inherit liblogosdelivery wakucanary; # Expose the cargoHash-corrected librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this # build links, instead of pulling zerokit's rln directly — whose diff --git a/nix/default.nix b/nix/default.nix index 7b7989e1a..ec9e0542c 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -1,6 +1,7 @@ { pkgs , src , zerokitRln +, targets ? [] , gitVersion ? "n/a" , enablePostgres ? true , enableNimDebugDlOpen ? true @@ -10,6 +11,8 @@ let deps = import ./deps.nix { inherit pkgs; }; + buildWakucanary = builtins.elem "wakucanary" targets; + nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " ( [ "--define:disable_libbacktrace" "--define:git_version=${gitVersion}" ] @@ -34,9 +37,29 @@ let if pkgs.stdenv.hostPlatform.isWindows then "dll" else if pkgs.stdenv.hostPlatform.isDarwin then "dylib" else "so"; + + # Shared `nim c` invocation. Callers vary the output, the source file and a + # few mode-specific flags (e.g. --app:lib, --noMain, --header); everything + # else (paths, defines, threading, gc, nimcache, rln linkage) is constant. + # $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase. + nimCompile = { outFile, sourceFile, extraArgs ? 
[] }: '' + nim c \ + --noNimblePath \ + ${pathArgs} \ + --path:$NAT_TRAV \ + --path:$NAT_TRAV/src \ + --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ + ${nimDefineArgs} \ + --threads:on \ + --mm:refc \ + --nimcache:$NIMCACHE \ + --out:${outFile} \ + ${pkgs.lib.concatStringsSep " \\\n " extraArgs} \ + ${sourceFile} + ''; in pkgs.stdenv.mkDerivation { - pname = "liblogosdelivery"; + pname = if buildWakucanary then "wakucanary" else "liblogosdelivery"; version = "dev"; inherit src; @@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation { make -C $NAT_TRAV/vendor/libnatpmp-upstream \ CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a + ${if buildWakucanary then '' + echo "== Building wakucanary ==" + ${nimCompile { + outFile = "build/wakucanary"; + sourceFile = "apps/wakucanary/wakucanary.nim"; + extraArgs = [ "--path:." ]; + }} + '' else '' echo "== Building liblogosdelivery (dynamic) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.${libExt} \ - --app:lib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --header \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.${libExt}"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:lib" + "--opt:size" + "--noMain" + "--header" + "--nimMainPrefix:liblogosdelivery" + ]; + }} echo "== Building liblogosdelivery (static) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.a \ - --app:staticlib \ - --threads:on \ - --opt:size \ - 
--noMain \ - --mm:refc \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.a"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:staticlib" + "--opt:size" + "--noMain" + "--nimMainPrefix:liblogosdelivery" + ]; + }} + ''} ''; - installPhase = '' + installPhase = if buildWakucanary then '' + runHook preInstall + mkdir -p $out/bin $out/lib + cp build/wakucanary $out/bin/ + runHook postInstall + '' else '' runHook preInstall mkdir -p $out/lib $out/include cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true @@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation { runHook postInstall ''; - # Bundle librln alongside liblogosdelivery so the output is self-contained. + # Bundle librln alongside the produced artifact so the output is self-contained. # Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection # for libstdc++ is preserved. postInstall = - pkgs.lib.optionalString pkgs.stdenv.isDarwin '' - cp ${zerokitRln}/lib/librln.dylib $out/lib/ - chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib - old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) - install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib - '' - + pkgs.lib.optionalString pkgs.stdenv.isLinux '' - cp ${zerokitRln}/lib/librln.so $out/lib/ - patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so - ''; + if buildWakucanary then + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/bin/wakucanary + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool 
-L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true) + if [ -n "$old" ]; then + install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary + fi + install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary + '' + else + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) + install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so + ''; + + meta = with pkgs.lib; { + description = + if buildWakucanary + then "Waku network canary tool" + else "logos-delivery shared/static library"; + homepage = "https://github.com/logos-messaging/logos-delivery"; + license = licenses.mit; + platforms = platforms.unix; + }; } From 8b53e64379f782acb9104a07539d6f7b2ee8a077 Mon Sep 17 00:00:00 2001 From: Tanya S <120410716+stubbsta@users.noreply.github.com> Date: Wed, 27 May 2026 10:40:54 +0200 Subject: [PATCH 5/7] Remove makefile target update (#3897) * Remove makefile target update * fix: set execute permission on install_nimble.sh * improve install_nim script * skip second nim install on Windows * fix path check in install-nim * Makefile workfile reordering --- BearSSL.mk | 15 +++------ Makefile | 68 ++++++++++++++++++------------------- Nat.mk | 19 +++-------- scripts/install_nim.sh | 44 
+++++++++++++++--------- scripts/install_nimble.sh | 70 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 140 insertions(+), 76 deletions(-) create mode 100755 scripts/install_nimble.sh diff --git a/BearSSL.mk b/BearSSL.mk index 355e46563..65e4f72a7 100644 --- a/BearSSL.mk +++ b/BearSSL.mk @@ -9,7 +9,7 @@ ## bearssl (nimbledeps) ## ########################### # Rebuilds libbearssl.a from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # BEARSSL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -29,18 +29,11 @@ else PORTABLE_BEARSSL_CFLAGS := -W -Wall -Os -fPIC endif -.PHONY: clean-bearssl-nimbledeps rebuild-bearssl-nimbledeps +.PHONY: rebuild-bearssl-nimbledeps -clean-bearssl-nimbledeps: +rebuild-bearssl-nimbledeps: ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(BEARSSL_CSOURCES_DIR)/build" ] && \ - "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" clean || true - -rebuild-bearssl-nimbledeps: | clean-bearssl-nimbledeps -ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding bearssl from $(BEARSSL_CSOURCES_DIR)" + "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" CFLAGS="$(PORTABLE_BEARSSL_CFLAGS)" lib \ No newline at end of file diff --git a/Makefile b/Makefile index f147c5e7e..0515ef3ec 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,8 @@ endif ########## ## Main ## ########## -.PHONY: all test update clean examples deps nimble install-nim install-nimble +# The Makefile automatically bootstraps dependency setup when needed for build and test targets. 
+.PHONY: all test clean examples deps nimble install-nim install-nimble # default target all: | wakunode2 libwaku liblogosdelivery @@ -69,18 +70,16 @@ endif waku.nims: ln -s waku.nimble $@ -$(NIMBLEDEPS_STAMP): nimble.lock | waku.nims - $(MAKE) install-nimble +$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims nimble setup --localdeps - $(MAKE) build-nph - $(MAKE) rebuild-bearssl-nimbledeps - $(MAKE) rebuild-nat-libs-nimbledeps touch $@ -update: - rm -f $(NIMBLEDEPS_STAMP) - $(MAKE) $(NIMBLEDEPS_STAMP) - nimble lock +# Must be phony so the recipe always runs and the sub-make re-evaluates +# BEARSSL_NIMBLEDEPS_DIR / NAT_TRAVERSAL_NIMBLEDEPS_DIR (parse-time variables) +# after nimble setup has populated nimbledeps/. +.PHONY: build-deps +build-deps: | $(NIMBLEDEPS_STAMP) + $(MAKE) rebuild-bearssl-nimbledeps rebuild-nat-libs-nimbledeps clean: rm -rf build 2> /dev/null || true @@ -93,15 +92,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku. 
REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') install-nim: +ifneq ($(detected_OS),Windows) scripts/install_nim.sh $(REQUIRED_NIM_VERSION) +endif install-nimble: install-nim - @nimble_ver=$$(nimble --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1); \ - if [ "$$nimble_ver" = "$(REQUIRED_NIMBLE_VERSION)" ]; then \ - echo "nimble $(REQUIRED_NIMBLE_VERSION) already installed, skipping."; \ - else \ - cd $$(mktemp -d) && nimble install "nimble@$(REQUIRED_NIMBLE_VERSION)" -y; \ - fi +ifneq ($(detected_OS),Windows) + scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION) +endif build: mkdir -p build @@ -203,7 +201,7 @@ clean: | clean-librln ################# .PHONY: testcommon -testcommon: | $(NIMBLEDEPS_STAMP) build +testcommon: | build-deps build echo -e $(BUILD_MSG) "build/$@" && \ nimble testcommon @@ -212,59 +210,59 @@ testcommon: | $(NIMBLEDEPS_STAMP) build ########## .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester -testwaku: | $(NIMBLEDEPS_STAMP) build rln-deps librln +testwaku: | build-deps build rln-deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble test -wakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +wakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakunode2 -benchmarks: | $(NIMBLEDEPS_STAMP) build deps librln +benchmarks: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble benchmarks -testwakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +testwakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble testwakunode2 -example2: | $(NIMBLEDEPS_STAMP) build deps librln +example2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble example2 -chat2: | $(NIMBLEDEPS_STAMP) build deps librln +chat2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2 -chat2mix: | 
$(NIMBLEDEPS_STAMP) build deps librln +chat2mix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2mix -rln-db-inspector: | $(NIMBLEDEPS_STAMP) build deps librln +rln-db-inspector: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble rln_db_inspector -chat2bridge: | $(NIMBLEDEPS_STAMP) build deps librln +chat2bridge: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2bridge -liteprotocoltester: | $(NIMBLEDEPS_STAMP) build deps librln +liteprotocoltester: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble liteprotocoltester -lightpushwithmix: | $(NIMBLEDEPS_STAMP) build deps librln +lightpushwithmix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble lightpushwithmix -api_example: | $(NIMBLEDEPS_STAMP) build deps librln +api_example: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims -build/%: | $(NIMBLEDEPS_STAMP) build deps librln +build/%: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$*" && \ nimble buildone $* -compile-test: | $(NIMBLEDEPS_STAMP) build deps librln +compile-test: | build-deps build deps librln echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \ nimble buildTest $(TEST_FILE) && \ nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\"" @@ -276,11 +274,11 @@ compile-test: | $(NIMBLEDEPS_STAMP) build deps librln tools: networkmonitor wakucanary -wakucanary: | $(NIMBLEDEPS_STAMP) build deps librln +wakucanary: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakucanary -networkmonitor: | $(NIMBLEDEPS_STAMP) build deps librln +networkmonitor: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble networkmonitor @@ -424,10 +422,10 @@ else ifeq ($(detected_OS),Linux) BUILD_COMMAND := $(BUILD_COMMAND)Linux endif -libwaku: | $(NIMBLEDEPS_STAMP) librln +libwaku: | build-deps librln nimble --verbose 
libwaku$(BUILD_COMMAND) waku.nimble -liblogosdelivery: | $(NIMBLEDEPS_STAMP) librln +liblogosdelivery: | build-deps librln nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble logosdelivery_example: | build liblogosdelivery diff --git a/Nat.mk b/Nat.mk index 90d0b2ead..1161121ba 100644 --- a/Nat.mk +++ b/Nat.mk @@ -9,7 +9,7 @@ ## nat-libs (nimbledeps) ## ########################### # Builds miniupnpc and libnatpmp from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # NAT_TRAVERSAL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -28,20 +28,11 @@ else PORTABLE_NAT_MARCH := endif -.PHONY: clean-cross-nimbledeps rebuild-nat-libs-nimbledeps +.PHONY: rebuild-nat-libs-nimbledeps -clean-cross-nimbledeps: +rebuild-nat-libs-nimbledeps: ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" CC=$(CC) clean $(HANDLE_OUTPUT) || true - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" CC=$(CC) clean $(HANDLE_OUTPUT) || true - -rebuild-nat-libs-nimbledeps: | clean-cross-nimbledeps -ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding nat-libs from $(NAT_TRAVERSAL_NIMBLEDEPS_DIR)" ifeq ($(OS), Windows_NT) @@ -58,4 +49,4 @@ else + "$(MAKE)" CFLAGS="-Wall -Wno-cpp -Os -fPIC $(PORTABLE_NAT_MARCH) -DENABLE_STRNATPMPERR 
-DNATPMP_MAX_RETRIES=4 $(CFLAGS)" \ -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" \ CC=$(CC) libnatpmp.a $(HANDLE_OUTPUT) -endif +endif \ No newline at end of file diff --git a/scripts/install_nim.sh b/scripts/install_nim.sh index c8d0f439d..42aa88ecd 100755 --- a/scripts/install_nim.sh +++ b/scripts/install_nim.sh @@ -17,26 +17,36 @@ if [ -z "${NIM_VERSION}" ]; then exit 1 fi -# Check if the right version is already installed +NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" + +# 1. A matching Nim is already on PATH (e.g. provided by CI's setup-nim-action, +# choosenim, or a previous run of this script). Use it as-is: installing over it +# would symlink a freshly downloaded Nim into ~/.nimble/bin (first on PATH) and +# shadow a known-good toolchain, which has caused C-backend build failures. nim_ver=$(nim --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) if [ "${nim_ver}" = "${NIM_VERSION}" ]; then - echo "Nim ${NIM_VERSION} already installed, skipping." + echo "Nim ${NIM_VERSION} already on PATH ($(command -v nim)), skipping install." + exit 0 +fi + +# 2. Already installed at our expected location from a previous run, but not on PATH. +# Re-link binaries into ~/.nimble/bin. +if [ -f "${NIM_DEST}/lib/system.nim" ]; then + echo "Nim ${NIM_VERSION} already installed at ${NIM_DEST}, re-linking binaries." + mkdir -p "${HOME}/.nimble/bin" + for bin_path in "${NIM_DEST}/bin/"*; do + ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" + done exit 0 fi if [ -n "${nim_ver}" ]; then - newer=$(printf '%s\n%s\n' "${NIM_VERSION}" "${nim_ver}" | sort -V | tail -1) - if [ "${newer}" = "${nim_ver}" ]; then - echo "WARNING: Nim ${nim_ver} is installed; this repo is validated against ${NIM_VERSION}." >&2 - echo "WARNING: The build will proceed but may behave differently." >&2 - exit 0 - fi + echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." 
>&2 fi OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/') ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/') -NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz" WORK_DIR=$(mktemp -d) trap 'rm -rf "${WORK_DIR}"' EXIT @@ -48,9 +58,7 @@ if [ "${HTTP_STATUS}" = "200" ]; then echo "Downloading pre-built binary from ${BINARY_URL}..." curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz" tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}" - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}" else echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..." SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz" @@ -58,15 +66,19 @@ else tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}" cd "${WORK_DIR}/Nim-${NIM_VERSION}" sh build_all.sh - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/Nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}" fi +# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker). +# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present. +rm -rf "${NIM_DEST}" 2>/dev/null || true +mkdir -p "${NIM_DEST}" +cp -r "${SRC_DIR}/." "${NIM_DEST}/" + mkdir -p "${HOME}/.nimble/bin" for bin_path in "${NIM_DEST}/bin/"*; do ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" done echo "Nim ${NIM_VERSION} installed to ${NIM_DEST}" -echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." +echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." 
\ No newline at end of file diff --git a/scripts/install_nimble.sh b/scripts/install_nimble.sh new file mode 100755 index 000000000..dba2d6612 --- /dev/null +++ b/scripts/install_nimble.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Installs a specific nimble version without using `nimble install nimble`. +# +# `nimble install nimble` is inherently fragile: +# - ETXTBSY: overwriting the running nimble binary in pkgs2/ +# - JSON parse failures with older nimble versions reading packages_official.json +# +# Strategy: +# 1. If the right version is already at ~/.nimble/bin/nimble → done. +# 2. If a previously-compiled binary exists in pkgs2/ → re-link it. +# 3. Otherwise: clone the nimble git repo, init submodules, build with nim, +# and atomically replace the target (mv avoids ETXTBSY on the old binary). + +set -e + +NIMBLE_VERSION="${1:-}" +if [ -z "${NIMBLE_VERSION}" ]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +NIMBLE_BIN="${HOME}/.nimble/bin/nimble" + +# 1. Already installed at the right version? +if [ -x "${NIMBLE_BIN}" ]; then + nimble_ver=$("${NIMBLE_BIN}" --version 2>/dev/null \ + | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) + if [ "${nimble_ver}" = "${NIMBLE_VERSION}" ]; then + echo "Nimble ${NIMBLE_VERSION} already installed, skipping." + exit 0 + fi +fi + +# 2. Already compiled into pkgs2/ from a previous (possibly partial) run? +PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \ + 2>/dev/null | head -1 || true) +if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then + echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}." + mkdir -p "${HOME}/.nimble/bin" + ln -sf "${PKGS2_NIMBLE}" "${NIMBLE_BIN}" + exit 0 +fi + +# 3. Build from source. +NIM_BIN="${HOME}/.nimble/bin/nim" +if [ ! -x "${NIM_BIN}" ]; then + NIM_BIN="$(command -v nim)" +fi + +WORK_DIR="$(mktemp -d)" +trap 'rm -rf "${WORK_DIR}"' EXIT + +echo "Cloning nimble v${NIMBLE_VERSION} with submodules..." 
+git clone --depth=1 --branch "v${NIMBLE_VERSION}" \ + --recurse-submodules --shallow-submodules \ + https://github.com/nim-lang/nimble.git \ + "${WORK_DIR}/nimble" + +echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)..." +cd "${WORK_DIR}/nimble" +# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths. +"${NIM_BIN}" c -d:release --path:src \ + -o:"${WORK_DIR}/nimble_new" src/nimble.nim + +mkdir -p "${HOME}/.nimble/bin" +# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running. +cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$" +mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}" + +echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}" \ No newline at end of file From 5e262badf74aa6bf80315887bfd11f4193859968 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Wed, 27 May 2026 23:58:30 +0530 Subject: [PATCH 6/7] chore: fixing daily ci (#3878) --- .github/workflows/ci-daily.yml | 1 + Makefile | 3 ++- apps/chat2bridge/chat2bridge.nim | 10 ++++++++-- apps/chat2bridge/config_chat2bridge.nim | 11 +++++------ apps/chat2mix/config_chat2mix.nim | 8 ++++++-- apps/liteprotocoltester/tester_config.nim | 2 +- apps/networkmonitor/networkmonitor_config.nim | 2 +- .../node/peer_manager/peer_store/test_migrations.nim | 4 ++-- .../peer_manager/peer_store/test_peer_storage.nim | 2 +- tests/node/test_wakunode_relay_rln.nim | 2 +- tests/test_utils_compat.nim | 2 +- tests/waku_enr/test_sharding.nim | 2 +- 12 files changed, 30 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index d52775ae2..009e6c523 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -3,6 +3,7 @@ name: Daily logos-delivery CI on: schedule: - cron: '30 6 * * *' + workflow_dispatch: env: NPROC: 2 diff --git a/Makefile b/Makefile index 0515ef3ec..ea1bf66f0 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ 
export PATH := $(HOME)/.nimble/bin:$(PATH) # NIM binary location NIM_BINARY := $(shell which nim 2>/dev/null) NPH := $(HOME)/.nimble/bin/nph +NIMBLE := $(HOME)/.nimble/bin/nimble NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup # Compilation parameters @@ -71,7 +72,7 @@ waku.nims: ln -s waku.nimble $@ $(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims - nimble setup --localdeps + $(NIMBLE) setup --localdeps touch $@ # Must be phony so the recipe always runs and the sub-make re-evaluates diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 53eb5d04b..eeeea328b 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[tables, times, strutils, hashes, sequtils, json], + std/[tables, times, strutils, hashes, sequtils, json, options], chronos, confutils, chronicles, @@ -267,10 +267,16 @@ when isMainModule: else: nodev2ExtPort + let nodev2Key = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + let bridge = Chat2Matterbridge.new( mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)), mbGateway = conf.mbGateway, - nodev2Key = conf.nodekey, + nodev2Key = nodev2Key, nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index abb5e329f..048fc4d87 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -1,4 +1,5 @@ import + std/options, confutils, confutils/defs, confutils/std/net, @@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object metricsServerAddress* {. 
desc: "Listening address of the metrics server", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object .}: seq[string] nodekey* {. - desc: "P2P node private key as hex", - defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(), - name: "nodekey" - .}: crypto.PrivateKey + desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey" + .}: Option[crypto.PrivateKey] store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" @@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object # Matterbridge options mbHostAddress* {. desc: "Listening address of the Matterbridge host", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "mb-host-address" .}: IpAddress diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..639e14986 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -162,7 +162,8 @@ type metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: + IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -194,7 +195,10 @@ type dnsDiscoveryNameServers* {. desc: "DNS name server IPs to query. 
Argument may be repeated.", - defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + defaultValue: @[ + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]), + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]), + ], name: "dns-discovery-name-server" .}: seq[IpAddress] diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index dee918b8c..1f4bedaa8 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object ## Tester REST service configuration restAddress* {. desc: "Listening address of the REST HTTP server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "rest-address" .}: IpAddress diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index f67fb09a8..b5bcfbd96 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -116,7 +116,7 @@ type NetworkMonitorConf* = object metricsServerAddress* {. 
desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress diff --git a/tests/node/peer_manager/peer_store/test_migrations.nim b/tests/node/peer_manager/peer_store/test_migrations.nim index a20d065ec..d6b86a15b 100644 --- a/tests/node/peer_manager/peer_store/test_migrations.nim +++ b/tests/node/peer_manager/peer_store/test_migrations.nim @@ -1,11 +1,11 @@ -import std/[options], stew/results, testutils/unittests +import std/[options], results, testutils/unittests import waku/node/peer_manager/peer_store/migrations, ../../waku_archive/archive_utils, ../../testlib/[simple_mock] -import std/[tables, strutils, os], stew/results, chronicles +import std/[tables, strutils, os], results, chronicles import waku/common/databases/db_sqlite, waku/common/databases/common diff --git a/tests/node/peer_manager/peer_store/test_peer_storage.nim b/tests/node/peer_manager/peer_store/test_peer_storage.nim index c8a479178..871df8644 100644 --- a/tests/node/peer_manager/peer_store/test_peer_storage.nim +++ b/tests/node/peer_manager/peer_store/test_peer_storage.nim @@ -1,4 +1,4 @@ -import stew/results, testutils/unittests +import results, testutils/unittests import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index c8ca9b43d..3a2a8a67c 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -2,7 +2,7 @@ import std/[tempfiles, strutils, options], - stew/results, + results, testutils/unittests, chronos, libp2p/switch, diff --git a/tests/test_utils_compat.nim b/tests/test_utils_compat.nim index 121efa4a5..1394982ef 100644 --- a/tests/test_utils_compat.nim +++ b/tests/test_utils_compat.nim @@ -1,7 +1,7 @@ {.used.} import testutils/unittests -import stew/results, 
waku/waku_core/message, waku/waku_core/time, ./testlib/common +import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common suite "Waku Payload": test "Encode/Decode waku message with timestamp": diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim index 344436d0e..789f8faec 100644 --- a/tests/waku_enr/test_sharding.nim +++ b/tests/waku_enr/test_sharding.nim @@ -1,7 +1,7 @@ {.used.} import - stew/results, + results, chronos, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, From 74057c66224f43b4aa27b42033d4ed52eed5c7a7 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 27 May 2026 23:05:20 +0200 Subject: [PATCH 7/7] start basic reliable channel folder (#3886) --- .github/workflows/ci.yml | 1 + channels/encryption/encryption.nim | 25 ++ channels/encryption/noop_encryption.nim | 18 ++ channels/events.nim | 23 ++ .../rate_limit_manager/rate_limit_manager.nim | 80 ++++++ channels/reliable_channel.nim | 264 ++++++++++++++++++ channels/reliable_channel_manager.nim | 138 +++++++++ .../scalable_data_sync/scalable_data_sync.nim | 62 ++++ .../scalable_data_sync/sds_persistence.nim | 25 ++ .../segmentation/segment_message_proto.nim | 34 +++ channels/segmentation/segmentation.nim | 70 +++++ .../segmentation/segmentation_persistence.nim | 20 ++ channels/types.nim | 15 + tests/all_tests_waku.nim | 3 + tests/channels/test_all.nim | 3 + .../test_reliable_channel_send_receive.nim | 149 ++++++++++ 16 files changed, 930 insertions(+) create mode 100644 channels/encryption/encryption.nim create mode 100644 channels/encryption/noop_encryption.nim create mode 100644 channels/events.nim create mode 100644 channels/rate_limit_manager/rate_limit_manager.nim create mode 100644 channels/reliable_channel.nim create mode 100644 channels/reliable_channel_manager.nim create mode 100644 channels/scalable_data_sync/scalable_data_sync.nim create mode 100644 
channels/scalable_data_sync/sds_persistence.nim create mode 100644 channels/segmentation/segment_message_proto.nim create mode 100644 channels/segmentation/segmentation.nim create mode 100644 channels/segmentation/segmentation_persistence.nim create mode 100644 channels/types.nim create mode 100644 tests/channels/test_all.nim create mode 100644 tests/channels/test_reliable_channel_send_receive.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52d20157a..c54d828ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,7 @@ jobs: - 'tools/**' - 'tests/all_tests_v2.nim' - 'tests/**' + - 'channels/**' docker: - 'docker/**' diff --git a/channels/encryption/encryption.nim b/channels/encryption/encryption.nim new file mode 100644 index 000000000..5cb53be2f --- /dev/null +++ b/channels/encryption/encryption.nim @@ -0,0 +1,25 @@ +## Optional encryption hooks for the Reliable Channel API. +## +## Modelled as `RequestBroker`s: the broker pattern lets the channel +## delegate work to a provider that may live in any module without +## introducing a direct dependency. If no provider is registered the +## broker returns an error, so installing the noop providers from +## `noop_encryption` is required when the application does not want +## actual encryption. +## +## Applied per-segment after SDS processing on outgoing, and before +## SDS processing on incoming. No specific scheme is mandated. 
+## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import brokers/request_broker + +export request_broker + +RequestBroker: + type Encrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} + +RequestBroker: + type Decrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} diff --git a/channels/encryption/noop_encryption.nim b/channels/encryption/noop_encryption.nim new file mode 100644 index 000000000..f09ed9cf4 --- /dev/null +++ b/channels/encryption/noop_encryption.nim @@ -0,0 +1,18 @@ +## No-op encryption providers. Install these when the application does +## not want actual encryption so the `Encrypt` / `Decrypt` brokers have +## something to dispatch to. + +import results +import chronos +import ./encryption + +proc setNoopEncryption*() = + discard Encrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} = + return ok(Encrypt(payload)) + ) + + discard Decrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} = + return ok(Decrypt(payload)) + ) diff --git a/channels/events.nim b/channels/events.nim new file mode 100644 index 000000000..5a17c99d2 --- /dev/null +++ b/channels/events.nim @@ -0,0 +1,23 @@ +## Reliable Channel event types emitted to API consumers. +## +## Lifecycle events for individual segments (sent / propagated / errored) +## are the same as the network-level ones the DeliveryService already +## emits — `requestId` is shared across layers — so we just re-export +## `waku/events/message_events` and avoid declaring duplicates. +## +## Only the channel-level `ChannelMessageReceivedEvent` carries data that has +## no analogue in the lower layer (reassembled application payload, +## senderId, channelId), so it lives here.
+ +import waku/events/message_events as waku_message_events +import brokers/event_broker + +import ./types as channel_types + +export waku_message_events, channel_types, event_broker + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim new file mode 100644 index 000000000..5ea6486a5 --- /dev/null +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -0,0 +1,80 @@ +## Rate Limit Manager for the Reliable Channel API. +## +## Tracks messages sent per RLN epoch and delays dispatch when the +## limit is approached, ensuring RLN compliance on enforcing relays. +## +## For the skeleton this is a pass-through: messages are immediately +## released as ready-to-send. Real epoch budgeting will be added later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/times +import message +import brokers/event_broker +import brokers/broker_context + +export event_broker, broker_context +export message.SdsChannelID + +const + DefaultEpochPeriodSec* = 600 + DefaultMessagesPerEpoch* = 1 + +EventBroker: + ## Emitted by `enqueueToSend` carrying the batch of opaque message + ## blobs that may now leave the rate limiter and continue down the + ## outgoing pipeline (encryption -> dispatch). Bytes only: the rate + ## limiter is intentionally agnostic of SDS, so anything serialisable + ## can flow through it. + ## + ## `channelId` lets listeners filter to their own channel, since all + ## reliable channels share the underlying Waku node's broker context. 
+ type ReadyToSendEvent* = object + channelId*: SdsChannelID + msgs*: seq[seq[byte]] + +type + RateLimitConfig* = object + enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active + epochPeriodSec*: int + messagesPerEpoch*: int + + RateLimitManager* = ref object + config*: RateLimitConfig + queue*: seq[seq[byte]] + currentEpochStart*: Time + sentInCurrentEpoch*: int + channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent` + brokerCtx: BrokerContext + +proc new*( + T: type RateLimitManager, + config: RateLimitConfig, + channelId: SdsChannelID, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + return T( + config: config, + queue: @[], + currentEpochStart: getTime(), + sentInCurrentEpoch: 0, + channelId: channelId, + brokerCtx: brokerCtx, + ) + +proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) = + ## Skeleton behaviour: bypass `self.queue` and immediately release `msg` + ## as a single ready batch. Real per-epoch budgeting will park messages + ## on `self.queue` and emit only when the budget allows. + ReadyToSendEvent.emit( + self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg]) + ) + +proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] = + ## Skeleton: not implemented yet, always returns an empty seq. Will return + ## the queued messages that may be dispatched within the rate limit. + discard + +proc resetEpoch*(self: RateLimitManager) = + self.currentEpochStart = getTime() + self.sentInCurrentEpoch = 0 diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim new file mode 100644 index 000000000..2a7d01d35 --- /dev/null +++ b/channels/reliable_channel.nim @@ -0,0 +1,264 @@ +## Reliable Channel type. +## +## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end +## reliability), optional encryption, and rate-limited dispatch on top +## of the Messaging API for a single channel.
+## +## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch +## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event +## +## Channels are owned by a `ReliableChannelManager`. Lifecycle and send +## operations are addressed by `ChannelId`, so callers only need to keep +## an opaque handle around. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/[options, tables] +import results +import chronos +import bearssl/rand +import stew/byteutils +import libp2p/crypto/crypto as libp2p_crypto + +import waku/node/delivery_service/delivery_service +import waku/node/delivery_service/send_service +import waku/waku_core/topics + +import ./events +import ./segmentation/segmentation +import ./scalable_data_sync/scalable_data_sync +import ./rate_limit_manager/rate_limit_manager +import ./encryption/encryption + +export + delivery_service, send_service, events, segmentation, scalable_data_sync, + rate_limit_manager, encryption + +const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" + ## Wire-format spec marker for the Reliable Channel layer, as defined + ## in the reliable-channel-api LIP (`Wire Format / Spec Marker`). + ## A `WakuMessage` whose `meta` field does not equal these bytes is + ## not addressed to this layer and is silently dropped on ingress. + ## The trailing `/N` is the wire-format version and is bumped only + ## on breaking on-the-wire changes; implementations pin one version. + +type ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. 
+ deliveryService: DeliveryService + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] + brokerCtx: BrokerContext + ## Captured here so the channel emits `ChannelMessageReceivedEvent` + ## on the same broker context the owning manager registered its + ## listeners on. Without this, an emit via `globalBrokerContext()` + ## would land on whatever context happens to be thread-local at + ## emit time, which is not necessarily the manager's. + +func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = + self.channelId + +func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = + self.contentTopic + +func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = + self.senderId + +proc onReadyToSend( + self: ReliableChannel, msgs: seq[seq[byte]] +) {.async: (raises: []).} = + ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` + ## listener once `rate_limit_manager` releases a batch of opaque + ## blobs (already-encoded SDS messages): + ## + ## ... -> rate_limit_manager -> [encryption] -> dispatch + for m in msgs: + ## Each `m` was preceded by exactly one push onto `pendingRequests` + ## in `send`, so this pop is always safe in the current skeleton. + let pending = self.pendingRequests[0] + self.pendingRequests.delete(0) + + ## TODO: revisit which fields of the SDS message must be encrypted. + ## Encrypting the whole encoded blob forces every receiver to attempt + ## decryption before it can route, which breaks selective dispatch. + ## Leave routing metadata (channelId, causal-history references) in + ## clear and encrypt only the application payload. 
+ let encRes = await Encrypt.request(m) + let encrypted = encRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: pending.parent, + messageHash: "", + error: "encryption failed: " & error, + ), + ) + continue + let wireBytes = seq[byte](encrypted) + + let envelope = MessageEnvelope( + contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + ) + + let deliveryReqId = RequestId.new(self.rng) + let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: + ## TODO: emit waku `MessageErrorEvent` for the parent request id. + continue + + ## Stamp the Reliable Channel wire-format spec marker so the ingress + ## side of any peer can route this WakuMessage to its Reliable + ## Channel layer. Done on the constructed WakuMessage rather than + ## via the envelope because `MessageEnvelope` does not expose a + ## `meta` field. + deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + + asyncSpawn self.deliveryService.sendService.send(deliveryTask) + self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + +proc send*( + self: ReliableChannel, payload: seq[byte], ephemeral: bool = false +): Result[RequestId, string] = + ## Single application-level send. The first three stages of the + ## outgoing pipeline are chained explicitly so the flow is visible + ## at a glance: + ## + ## segmentation -> sds -> rate_limit_manager + ## + ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with + ## the SDS messages cleared for transmission; the channel's listener + ## then runs the final stage (encryption -> dispatch). The `ephemeral` + ## flag is carried alongside each segment in `pendingRequests` and + ## stamped onto the eventual `MessageEnvelope`. + ## + ## The returned `RequestId` is the parent of one-or-more + ## delivery-service `RequestId`s; the mapping is recorded in + ## `self.requestIds`. 
+ if payload.len == 0: + return err("empty payload") + + let parentReqId = RequestId.new(self.rng) + self.requestIds[parentReqId] = @[] + + for segmentBytes in self.segmentation.performSegmentation(payload): + ## Segments arrive already encoded; the segmentation module owns + ## the wire format so SDS only ever sees opaque bytes. + let sdsBytes = self.sdsHandler.wrapOutgoing( + self.channelId, self.senderId, segmentBytes + ).valueOr: + return err("SDS wrap failed: " & error) + self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.rateLimit.enqueueToSend(sdsBytes) + + return ok(parentReqId) + +proc onMessageReceived( + self: ReliableChannel, messageHash: string, payload: seq[byte] +) {.async: (raises: []).} = + ## Ingress pipeline made visible: + ## + ## payload -> decrypt -> sds -> reassemble -> emit + ## + ## Invoked from this channel's `MessageReceivedEvent` listener, which + ## already filtered on the spec marker and on `contentTopic`. The + ## channel only sees the raw payload bytes for itself. + + ## Notice that the following "request" is implemented implicitly as a broker call to + ## the `Decrypt` request broker. + let decRes = await Decrypt.request(payload) + let plaintext = decRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "decryption failed: " & error, + ), + ) + return + let plaintextBytes = seq[byte](plaintext) + + let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) + if unwrapped.isErr(): + return + + let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. 
+ ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc new*( + T: type ReliableChannel, + deliveryService: DeliveryService, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + segConfig: SegmentationConfig, + sdsConfig: SdsConfig, + rateConfig: RateLimitConfig, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed + ## inside the channel rather than handed in by the caller — they are + ## implementation details of the channel, not knobs the API consumer + ## should be wiring up. Encryption is delegated to the `Encrypt`/ + ## `Decrypt` request brokers, so the channel keeps no per-instance + ## encryption state either. + let chn = T( + deliveryService: deliveryService, + channelId: channelId, + contentTopic: contentTopic, + senderId: senderId, + rng: libp2p_crypto.newRng(), + segmentation: SegmentationHandler.new(segConfig), + sdsHandler: SdsHandler.new(sdsConfig, senderId), + rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), + requestIds: initTable[RequestId, seq[RequestId]](), + pendingRequests: @[], + brokerCtx: brokerCtx, + ) + + ## Each channel owns its own egress + ingress listeners on + ## `chn.brokerCtx`, filtered to traffic addressed to this channel. + ## Keeping the listeners (and the procs they call) inside the + ## channel lets `onReadyToSend` and `onMessageReceived` stay private + ## — the manager doesn't need to know about them. 
+ discard ReadyToSendEvent.listen( + chn.brokerCtx, + proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = + if evt.channelId == chn.channelId: + await chn.onReadyToSend(evt.msgs) + , + ) + + discard MessageReceivedEvent.listen( + chn.brokerCtx, + proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} = + ## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic + ## for other channels before doing any decode work. + if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion: + return + if evt.message.contentTopic != chn.contentTopic: + return + await chn.onMessageReceived(evt.messageHash, evt.message.payload) + , + ) + + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim new file mode 100644 index 000000000..ddbdb37a6 --- /dev/null +++ b/channels/reliable_channel_manager.nim @@ -0,0 +1,138 @@ +## Reliable Channel API entry point. +## +## Owns the set of `ReliableChannel` instances and exposes lifecycle and +## send/receive operations addressed by `ChannelId`. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/tables +import results +import chronos +import stew/byteutils + +import waku/api/api +import waku/api/api_conf +import waku/events/message_events as waku_message_events +import waku/factory/waku as waku_factory +import waku/waku_core/topics + +import ./reliable_channel +import ./encryption/noop_encryption + +export reliable_channel + +type ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + deliveryService: DeliveryService + ## Owned by the manager. The ownership chain is + ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. + ## Hidden so callers can't substitute their own and bypass the + ## manager's pipeline. 
+ brokerCtx: BrokerContext + +proc new*( + T: type ReliableChannelManager, + conf: WakuNodeConf, + brokerCtx: BrokerContext = globalBrokerContext(), +): Future[Result[T, string]] {.async.} = + ## TODO !! The proper ownership chain is: + ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, + ## and this will be implemented in the future. For now, `createNode` + ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## This is a temporary workaround to get the API up and running. + + let waku = ?(await createNode(conf)) + + let manager = T( + channels: initTable[ChannelId, ReliableChannel](), + deliveryService: waku.deliveryService, + brokerCtx: brokerCtx, + ) + + return ok(manager) + +proc start*(self: ReliableChannelManager): Result[void, string] = + ## Bring the owned DeliveryService up. Separated from `new` so callers + ## can register encryption providers / create channels before traffic + ## starts flowing. + self.deliveryService.startDeliveryService() + +proc stop*(self: ReliableChannelManager) {.async.} = + if not self.deliveryService.isNil(): + await self.deliveryService.stopDeliveryService() + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, +): Result[ChannelId, string] = + ## Spec entry point. The `DeliveryService` and broker context the + ## channel needs come from the owning `ReliableChannelManager` rather + ## than per call; the channel creates its own `rng`. Encryption is wired + ## up through the `Encrypt`/`Decrypt` request brokers — the application + ## installs its own providers (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now.
+ if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: nil, + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let chn = ReliableChannel.new( + deliveryService = self.deliveryService, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Result[void, string] = + ## Skeleton: only removes the channel from the table. TODO: flush state, persist outstanding SDS buffers, release resources (incl. the channel's broker listeners). + if not self.channels.hasKey(channelId): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + return ok() + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Result[RequestId, string] = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) + +## Inbound messages are not handed to the manager by direct call. Each +## `ReliableChannel` installs its own `MessageReceivedEvent` listener +## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, +## and routes to its private `onMessageReceived`.
This keeps the lower +## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel +## and keeps the manager out of per-channel event dispatch. diff --git a/channels/scalable_data_sync/scalable_data_sync.nim b/channels/scalable_data_sync/scalable_data_sync.nim new file mode 100644 index 000000000..30ad0e02b --- /dev/null +++ b/channels/scalable_data_sync/scalable_data_sync.nim @@ -0,0 +1,62 @@ +## Scalable Data Sync (SDS) component for the Reliable Channel API. +## +## Provides end-to-end delivery guarantees via causal history tracking, +## acknowledgements, and retransmission of unacknowledged segments. +## +## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so +## the send/receive circuit can exercise the surrounding pipeline. +## Real SDS wrapping will plug in via `nim-sds` later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import results +import message as sds_message + +import ./sds_persistence + +export sds_message, sds_persistence + +const + DefaultAcknowledgementTimeoutMs* = 5_000 + DefaultMaxRetransmissions* = 5 + DefaultCausalHistorySize* = 2 + +type + SdsConfig* = object + acknowledgementTimeoutMs*: int + maxRetransmissions*: int + causalHistorySize*: int + persistence*: SdsPersistence + + SdsHandler* = ref object + config*: SdsConfig + participantId*: SdsParticipantID + +proc new*( + T: type SdsHandler, + config: SdsConfig, + participantId: SdsParticipantID = SdsParticipantID(""), +): T = + return T(config: config, participantId: participantId) + +proc wrapOutgoing*( + self: SdsHandler, + channelId: SdsChannelID, + senderId: SdsParticipantID, + payload: seq[byte], +): Result[seq[byte], string] = + ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). + ## Skeleton: pass the encoded segment through unchanged. Real causal + ## history / lamport / bloom-filter population will replace this. 
+ return ok(payload) + +proc handleIncoming*( + self: SdsHandler, msg: seq[byte] +): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = + ## Skeleton: pass the bytes through; channel id is left empty until + ## the real wire format provides it. + return ok((content: msg, channelId: SdsChannelID(""))) + +proc tickRetransmissions*(self: SdsHandler) = + ## Drives retransmissions of unacknowledged messages. + discard diff --git a/channels/scalable_data_sync/sds_persistence.nim b/channels/scalable_data_sync/sds_persistence.nim new file mode 100644 index 000000000..8089595ea --- /dev/null +++ b/channels/scalable_data_sync/sds_persistence.nim @@ -0,0 +1,25 @@ +## Persistence backend for SDS outgoing buffer and causal history. +## +## TODO (raised in PR review): this surface is duplicating concerns that +## should come from the SDS module itself. Once the SDS module exposes a +## complete persistence contract, drop this file and import that surface +## instead of re-declaring it here. + +import message + +type + SdsPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SdsPersistence* = ref object of RootObj + kind*: SdsPersistenceKind + +method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = + discard + +method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = + discard + +method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = + discard diff --git a/channels/segmentation/segment_message_proto.nim b/channels/segmentation/segment_message_proto.nim new file mode 100644 index 000000000..f19cdc27f --- /dev/null +++ b/channels/segmentation/segment_message_proto.nim @@ -0,0 +1,34 @@ +## Wire format for a single segment, per the Reliable Channel API spec. +## +## Skeleton: encode/decode treat the segment as just its payload bytes, +## since for now we only ever produce a single segment per send. 
+ +type SegmentMessageProto* = object + entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes + dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments + dataSegmentCount*: uint32 ## number of data segments (>= 1) + payload*: seq[byte] ## segment payload (data or parity shard) + paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments + paritySegmentCount*: uint32 ## number of parity segments + isParity*: bool ## true for parity segments, false (default) for data segments + +proc isParityMessage*(self: SegmentMessageProto): bool = + self.isParity + +proc isValid*(self: SegmentMessageProto): bool = + ## Validates hash length (32 bytes), segment indices and counts. + discard + +proc encode*(self: SegmentMessageProto): seq[byte] = + self.payload + +proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T = + T( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: buf, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) diff --git a/channels/segmentation/segmentation.nim b/channels/segmentation/segmentation.nim new file mode 100644 index 000000000..9fc7964c0 --- /dev/null +++ b/channels/segmentation/segmentation.nim @@ -0,0 +1,70 @@ +## Segmentation component for the Reliable Channel API. +## +## Splits large application payloads into transmittable segments and +## reassembles them on reception. Supports optional Reed-Solomon parity +## segments for loss recovery, as per the Reliable Channel API spec. +## +## For the skeleton everything fits in a single segment: real chunking +## and Reed-Solomon parity will be plugged in later. 
+## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/options +import ./segment_message_proto +import ./segmentation_persistence + +export segment_message_proto, segmentation_persistence + +const + DefaultSegmentSizeBytes* = 102_400 + SegmentsParityRate* = 0.125 + SegmentsReedSolomonMaxCount* = 256 + +type + SegmentationConfig* = object + segmentSizeBytes*: int + enableReedSolomon*: bool + persistence*: SegmentationPersistence + + SegmentationHandler* = ref object + config*: SegmentationConfig + + ReassemblyResult* = object + payload*: seq[byte] + entireMessageHash*: seq[byte] + +proc new*(T: type SegmentationHandler, config: SegmentationConfig): T = + return T(config: config) + +proc performSegmentation*( + self: SegmentationHandler, payload: seq[byte] +): seq[seq[byte]] = + ## Skeleton behaviour: emit exactly one segment carrying the whole + ## payload. Real chunking and Reed-Solomon parity will replace this. + let segment = SegmentMessageProto( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: payload, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) + return @[segment.encode()] + +proc handleIncomingSegment*( + self: SegmentationHandler, segmentBytes: seq[byte] +): Option[ReassemblyResult] = + ## Skeleton behaviour: every segment is already a complete message + ## (since `performSegmentation` always emits one), so just hand the + ## payload straight back. + let segment = SegmentMessageProto.decode(segmentBytes) + return some( + ReassemblyResult( + payload: segment.payload, entireMessageHash: segment.entireMessageHash + ) + ) + +proc cleanupSegments*(self: SegmentationHandler) = + ## Drop expired partial-reassembly state. 
+ discard diff --git a/channels/segmentation/segmentation_persistence.nim b/channels/segmentation/segmentation_persistence.nim new file mode 100644 index 000000000..cc34c36d2 --- /dev/null +++ b/channels/segmentation/segmentation_persistence.nim @@ -0,0 +1,20 @@ +## Persistence backend interface for segmentation reassembly state. +## +## Allows partial reassembly state to survive process restarts. + +type + SegmentationPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SegmentationPersistence* = ref object of RootObj + kind*: SegmentationPersistenceKind + +method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} = + discard + +method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} = + discard + +method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} = + discard diff --git a/channels/types.nim b/channels/types.nim new file mode 100644 index 000000000..4070ed620 --- /dev/null +++ b/channels/types.nim @@ -0,0 +1,15 @@ +## Core identifier types for the Reliable Channel API. + +import std/hashes +import waku/api/types as api_types + +import ./scalable_data_sync/scalable_data_sync + +export scalable_data_sync +export api_types + +type ChannelId* = SdsChannelID + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. 
+  hash(string(r))
diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim
index e64922f4c..963a948a3 100644
--- a/tests/all_tests_waku.nim
+++ b/tests/all_tests_waku.nim
@@ -88,3 +88,6 @@ import ./tools/test_all
 
 # Persistency library tests
 import ./persistency/test_all
+
+# Reliable Channel API tests
+import ./channels/test_all
diff --git a/tests/channels/test_all.nim b/tests/channels/test_all.nim
new file mode 100644
index 000000000..04b448707
--- /dev/null
+++ b/tests/channels/test_all.nim
@@ -0,0 +1,3 @@
+{.used.}
+
+import ./test_reliable_channel_send_receive
diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim
new file mode 100644
index 000000000..052cd35c9
--- /dev/null
+++ b/tests/channels/test_reliable_channel_send_receive.nim
@@ -0,0 +1,149 @@
+{.used.}
+
+import std/[net]
+import chronos, testutils/unittests, stew/byteutils
+import brokers/broker_context
+
+import ../testlib/[common, wakucore, wakunode, testasync]
+
+import waku
+import waku/[waku_node, waku_core]
+import waku/factory/waku_conf
+import waku/events/message_events as waku_message_events
+import tools/confutils/cli_args
+
+import channels/reliable_channel_manager
+import channels/encryption/noop_encryption
+
+const TestTimeout = chronos.seconds(15) # max wait for the expected channel event
+
+proc createApiNodeConf(): WakuNodeConf =
+  ## Node configuration both tests pass to `ReliableChannelManager.new`: the
+  ## result of `defaultWakuNodeConf` with the overrides below; asserts if that fails.
+  var conf = defaultWakuNodeConf().valueOr:
+    raiseAssert error
+  conf.mode = cli_args.WakuMode.Core
+  conf.listenAddress = parseIpAddress("0.0.0.0")
+  conf.tcpPort = Port(0) # NOTE(review): presumably "any free port" - confirm
+  conf.discv5UdpPort = Port(0)
+  conf.clusterId = 3'u16
+  conf.numShardsInNetwork = 1
+  conf.reliabilityEnabled = true
+  conf.rest = false
+  return conf
+
+suite "Reliable Channel - ingress":
+  asyncTest "manager dispatches marked WakuMessage to the right channel":
+    ## Unit test for the receive side of the API: instead of standing up two
+    ## libp2p nodes and a relay mesh, we drive the manager directly by emitting a
+    ## `MessageReceivedEvent` (the exact event the DeliveryService emits when a
+    ## `WakuMessage` arrives off the wire). The manager must:
+    ##   - drop traffic missing the Reliable Channel spec marker
+    ##   - dispatch the matching channel's `onMessageReceived`
+    ##   - emit `ChannelMessageReceivedEvent` with the payload
+    const
+      channelId = ChannelId("test-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/proto")
+    let appPayload = "hello reliable channel".toBytes()
+
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext: # NOTE(review): presumably a fresh context per test
+      brokerCtx = globalBrokerContext()
+      manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
+        "Failed to create manager"
+      )
+
+    ## Noop encryption providers so the Encrypt/Decrypt brokers have
+    ## something to dispatch to; without this the channel falls back to
+    ## plaintext anyway, but installing them is the documented setup.
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    let received = newFuture[seq[byte]]("channel-message-received")
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if not received.finished() and evt.channelId == channelId:
+            received.complete(evt.payload)
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    ## Build a `WakuMessage` that looks like one that came in off the wire from a
+    ## peer: the spec marker on `meta` plus the right content topic. The manager's
+    ## ingress listener should pick it up, decrypt (noop), unwrap SDS (pass-through),
+    ## reassemble (one segment), and finally emit `ChannelMessageReceivedEvent`.
+    let inboundMsg = WakuMessage(
+      payload: appPayload,
+      contentTopic: contentTopic,
+      version: 0,
+      meta: LipWireReliableChannelVersion.toBytes(),
+    )
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
+    )
+
+    let arrived = await received.withTimeout(TestTimeout) # false on timeout
+    check arrived
+    if arrived: # only read the future once it has completed
+      check received.read() == appPayload
+
+    await manager.stop()
+
+  asyncTest "manager drops unmarked WakuMessage":
+    ## Negative counterpart of the dispatch test: same content topic, but `meta` is
+    ## empty (i.e. foreign traffic). The channel-level event must NOT fire.
+    const
+      channelId = ChannelId("test-channel-2")
+      contentTopic = ContentTopic("/reliable-channel/test/proto")
+    let appPayload = "foreign payload".toBytes()
+
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
+        "Failed to create manager"
+      )
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var fired = false
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            fired = true
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    let inboundMsg = WakuMessage(
+      payload: appPayload,
+      contentTopic: contentTopic,
+      version: 0,
+      meta: @[], ## no Reliable Channel spec marker
+    )
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
+    )
+
+    ## Give the event broker a chance to fan out; fixed wait, as no event is expected.
+    await sleepAsync(100.milliseconds)
+    check not fired
+
+    await manager.stop()