From 79dda6375807624cd1ba04b562cf40e6aeaf6807 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Tue, 26 May 2026 16:22:10 +0200 Subject: [PATCH 1/8] Recover wakucanary in nix output (#3892) --- flake.nix | 9 ++- nix/default.nix | 149 ++++++++++++++++++++++++++++++++---------------- 2 files changed, 108 insertions(+), 50 deletions(-) diff --git a/flake.nix b/flake.nix index 077668b9a..6c283780d 100644 --- a/flake.nix +++ b/flake.nix @@ -89,8 +89,15 @@ inherit zerokitRln; gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}"; }; + + wakucanary = pkgs.callPackage ./nix/default.nix { + inherit pkgs; + src = ./.; + targets = ["wakucanary"]; + zerokitRln = zerokit.packages.${system}.rln; + }; in { - inherit liblogosdelivery; + inherit liblogosdelivery wakucanary; # Expose the cargoHash-corrected librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this # build links, instead of pulling zerokit's rln directly — whose diff --git a/nix/default.nix b/nix/default.nix index 7b7989e1a..ec9e0542c 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -1,6 +1,7 @@ { pkgs , src , zerokitRln +, targets ? [] , gitVersion ? "n/a" , enablePostgres ? true , enableNimDebugDlOpen ? true @@ -10,6 +11,8 @@ let deps = import ./deps.nix { inherit pkgs; }; + buildWakucanary = builtins.elem "wakucanary" targets; + nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " ( [ "--define:disable_libbacktrace" "--define:git_version=${gitVersion}" ] @@ -34,9 +37,29 @@ let if pkgs.stdenv.hostPlatform.isWindows then "dll" else if pkgs.stdenv.hostPlatform.isDarwin then "dylib" else "so"; + + # Shared `nim c` invocation. Callers vary the output, the source file and a + # few mode-specific flags (e.g. --app:lib, --noMain, --header); everything + # else (paths, defines, threading, gc, nimcache, rln linkage) is constant. + # $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase. + nimCompile = { outFile, sourceFile, extraArgs ? [] }: '' + nim c \ + --noNimblePath \ + ${pathArgs} \ + --path:$NAT_TRAV \ + --path:$NAT_TRAV/src \ + --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ + ${nimDefineArgs} \ + --threads:on \ + --mm:refc \ + --nimcache:$NIMCACHE \ + --out:${outFile} \ + ${pkgs.lib.concatStringsSep " \\\n " extraArgs} \ + ${sourceFile} + ''; in pkgs.stdenv.mkDerivation { - pname = "liblogosdelivery"; + pname = if buildWakucanary then "wakucanary" else "liblogosdelivery"; version = "dev"; inherit src; @@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation { make -C $NAT_TRAV/vendor/libnatpmp-upstream \ CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a + ${if buildWakucanary then '' + echo "== Building wakucanary ==" + ${nimCompile { + outFile = "build/wakucanary"; + sourceFile = "apps/wakucanary/wakucanary.nim"; + extraArgs = [ "--path:." ]; + }} + '' else '' echo "== Building liblogosdelivery (dynamic) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.${libExt} \ - --app:lib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --header \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.${libExt}"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:lib" + "--opt:size" + "--noMain" + "--header" + "--nimMainPrefix:liblogosdelivery" + ]; + }} echo "== Building liblogosdelivery (static) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.a \ - --app:staticlib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.a"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:staticlib" + "--opt:size" + "--noMain" + "--nimMainPrefix:liblogosdelivery" + ]; + }} + ''} ''; - installPhase = '' + installPhase = if buildWakucanary then '' + runHook preInstall + mkdir -p $out/bin $out/lib + cp build/wakucanary $out/bin/ + runHook postInstall + '' else '' runHook preInstall mkdir -p $out/lib $out/include cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true @@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation { runHook postInstall ''; - # Bundle librln alongside liblogosdelivery so the output is self-contained. + # Bundle librln alongside the produced artifact so the output is self-contained. # Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection # for libstdc++ is preserved. postInstall = - pkgs.lib.optionalString pkgs.stdenv.isDarwin '' - cp ${zerokitRln}/lib/librln.dylib $out/lib/ - chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib - old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) - install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib - '' - + pkgs.lib.optionalString pkgs.stdenv.isLinux '' - cp ${zerokitRln}/lib/librln.so $out/lib/ - patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so - ''; + if buildWakucanary then + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/bin/wakucanary + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true) + if [ -n "$old" ]; then + install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary + fi + install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary + '' + else + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) + install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so + ''; + + meta = with pkgs.lib; { + description = + if buildWakucanary + then "Waku network canary tool" + else "logos-delivery shared/static library"; + homepage = "https://github.com/logos-messaging/logos-delivery"; + license = licenses.mit; + platforms = platforms.unix; + }; } From 8b53e64379f782acb9104a07539d6f7b2ee8a077 Mon Sep 17 00:00:00 2001 From: Tanya S <120410716+stubbsta@users.noreply.github.com> Date: Wed, 27 May 2026 10:40:54 +0200 Subject: [PATCH 2/8] Remove makefile target update (#3897) * Remove makefile target update * fix: set execute permission on install_nimble.sh * improve install_nim script * skip second nim install on Windows * fix path check in install-nim * Makefile workfile reordering --- BearSSL.mk | 15 +++------ Makefile | 68 ++++++++++++++++++------------------- Nat.mk | 19 +++-------- scripts/install_nim.sh | 44 +++++++++++++++--------- scripts/install_nimble.sh | 70 +++++++++++++++++++++++++++++++++++++++ 5 files changed, 140 insertions(+), 76 deletions(-) create mode 100755 scripts/install_nimble.sh diff --git a/BearSSL.mk b/BearSSL.mk index 355e46563..65e4f72a7 100644 --- a/BearSSL.mk +++ b/BearSSL.mk @@ -9,7 +9,7 @@ ## bearssl (nimbledeps) ## ########################### # Rebuilds libbearssl.a from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # BEARSSL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -29,18 +29,11 @@ else PORTABLE_BEARSSL_CFLAGS := -W -Wall -Os -fPIC endif -.PHONY: clean-bearssl-nimbledeps rebuild-bearssl-nimbledeps +.PHONY: rebuild-bearssl-nimbledeps -clean-bearssl-nimbledeps: +rebuild-bearssl-nimbledeps: ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(BEARSSL_CSOURCES_DIR)/build" ] && \ - "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" clean || true - -rebuild-bearssl-nimbledeps: | clean-bearssl-nimbledeps -ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding bearssl from $(BEARSSL_CSOURCES_DIR)" + "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" CFLAGS="$(PORTABLE_BEARSSL_CFLAGS)" lib \ No newline at end of file diff --git a/Makefile b/Makefile index f147c5e7e..0515ef3ec 100644 --- a/Makefile +++ b/Makefile @@ -42,7 +42,8 @@ endif ########## ## Main ## ########## -.PHONY: all test update clean examples deps nimble install-nim install-nimble +# The Makefile automatically bootstraps dependency setup when needed for build and test targets. +.PHONY: all test clean examples deps nimble install-nim install-nimble # default target all: | wakunode2 libwaku liblogosdelivery @@ -69,18 +70,16 @@ endif waku.nims: ln -s waku.nimble $@ -$(NIMBLEDEPS_STAMP): nimble.lock | waku.nims - $(MAKE) install-nimble +$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims nimble setup --localdeps - $(MAKE) build-nph - $(MAKE) rebuild-bearssl-nimbledeps - $(MAKE) rebuild-nat-libs-nimbledeps touch $@ -update: - rm -f $(NIMBLEDEPS_STAMP) - $(MAKE) $(NIMBLEDEPS_STAMP) - nimble lock +# Must be phony so the recipe always runs and the sub-make re-evaluates +# BEARSSL_NIMBLEDEPS_DIR / NAT_TRAVERSAL_NIMBLEDEPS_DIR (parse-time variables) +# after nimble setup has populated nimbledeps/. +.PHONY: build-deps +build-deps: | $(NIMBLEDEPS_STAMP) + $(MAKE) rebuild-bearssl-nimbledeps rebuild-nat-libs-nimbledeps clean: rm -rf build 2> /dev/null || true @@ -93,15 +92,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku. REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') install-nim: +ifneq ($(detected_OS),Windows) scripts/install_nim.sh $(REQUIRED_NIM_VERSION) +endif install-nimble: install-nim - @nimble_ver=$$(nimble --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1); \ - if [ "$$nimble_ver" = "$(REQUIRED_NIMBLE_VERSION)" ]; then \ - echo "nimble $(REQUIRED_NIMBLE_VERSION) already installed, skipping."; \ - else \ - cd $$(mktemp -d) && nimble install "nimble@$(REQUIRED_NIMBLE_VERSION)" -y; \ - fi +ifneq ($(detected_OS),Windows) + scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION) +endif build: mkdir -p build @@ -203,7 +201,7 @@ clean: | clean-librln ################# .PHONY: testcommon -testcommon: | $(NIMBLEDEPS_STAMP) build +testcommon: | build-deps build echo -e $(BUILD_MSG) "build/$@" && \ nimble testcommon @@ -212,59 +210,59 @@ testcommon: | $(NIMBLEDEPS_STAMP) build ########## .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester -testwaku: | $(NIMBLEDEPS_STAMP) build rln-deps librln +testwaku: | build-deps build rln-deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble test -wakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +wakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakunode2 -benchmarks: | $(NIMBLEDEPS_STAMP) build deps librln +benchmarks: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble benchmarks -testwakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +testwakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble testwakunode2 -example2: | $(NIMBLEDEPS_STAMP) build deps librln +example2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble example2 -chat2: | $(NIMBLEDEPS_STAMP) build deps librln +chat2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2 -chat2mix: | $(NIMBLEDEPS_STAMP) build deps librln +chat2mix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2mix -rln-db-inspector: | $(NIMBLEDEPS_STAMP) build deps librln +rln-db-inspector: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble rln_db_inspector -chat2bridge: | $(NIMBLEDEPS_STAMP) build deps librln +chat2bridge: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2bridge -liteprotocoltester: | $(NIMBLEDEPS_STAMP) build deps librln +liteprotocoltester: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble liteprotocoltester -lightpushwithmix: | $(NIMBLEDEPS_STAMP) build deps librln +lightpushwithmix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble lightpushwithmix -api_example: | $(NIMBLEDEPS_STAMP) build deps librln +api_example: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims -build/%: | $(NIMBLEDEPS_STAMP) build deps librln +build/%: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$*" && \ nimble buildone $* -compile-test: | $(NIMBLEDEPS_STAMP) build deps librln +compile-test: | build-deps build deps librln echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \ nimble buildTest $(TEST_FILE) && \ nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\"" @@ -276,11 +274,11 @@ compile-test: | $(NIMBLEDEPS_STAMP) build deps librln tools: networkmonitor wakucanary -wakucanary: | $(NIMBLEDEPS_STAMP) build deps librln +wakucanary: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakucanary -networkmonitor: | $(NIMBLEDEPS_STAMP) build deps librln +networkmonitor: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble networkmonitor @@ -424,10 +422,10 @@ else ifeq ($(detected_OS),Linux) BUILD_COMMAND := $(BUILD_COMMAND)Linux endif -libwaku: | $(NIMBLEDEPS_STAMP) librln +libwaku: | build-deps librln nimble --verbose libwaku$(BUILD_COMMAND) waku.nimble -liblogosdelivery: | $(NIMBLEDEPS_STAMP) librln +liblogosdelivery: | build-deps librln nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble logosdelivery_example: | build liblogosdelivery diff --git a/Nat.mk b/Nat.mk index 90d0b2ead..1161121ba 100644 --- a/Nat.mk +++ b/Nat.mk @@ -9,7 +9,7 @@ ## nat-libs (nimbledeps) ## ########################### # Builds miniupnpc and libnatpmp from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # NAT_TRAVERSAL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -28,20 +28,11 @@ else PORTABLE_NAT_MARCH := endif -.PHONY: clean-cross-nimbledeps rebuild-nat-libs-nimbledeps +.PHONY: rebuild-nat-libs-nimbledeps -clean-cross-nimbledeps: +rebuild-nat-libs-nimbledeps: ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" CC=$(CC) clean $(HANDLE_OUTPUT) || true - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" CC=$(CC) clean $(HANDLE_OUTPUT) || true - -rebuild-nat-libs-nimbledeps: | clean-cross-nimbledeps -ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding nat-libs from $(NAT_TRAVERSAL_NIMBLEDEPS_DIR)" ifeq ($(OS), Windows_NT) @@ -58,4 +49,4 @@ else + "$(MAKE)" CFLAGS="-Wall -Wno-cpp -Os -fPIC $(PORTABLE_NAT_MARCH) -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4 $(CFLAGS)" \ -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" \ CC=$(CC) libnatpmp.a $(HANDLE_OUTPUT) -endif +endif \ No newline at end of file diff --git a/scripts/install_nim.sh b/scripts/install_nim.sh index c8d0f439d..42aa88ecd 100755 --- a/scripts/install_nim.sh +++ b/scripts/install_nim.sh @@ -17,26 +17,36 @@ if [ -z "${NIM_VERSION}" ]; then exit 1 fi -# Check if the right version is already installed +NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" + +# 1. A matching Nim is already on PATH (e.g. provided by CI's setup-nim-action, +# choosenim, or a previous run of this script). Use it as-is: installing over it +# would symlink a freshly downloaded Nim into ~/.nimble/bin (first on PATH) and +# shadow a known-good toolchain, which has caused C-backend build failures. nim_ver=$(nim --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) if [ "${nim_ver}" = "${NIM_VERSION}" ]; then - echo "Nim ${NIM_VERSION} already installed, skipping." + echo "Nim ${NIM_VERSION} already on PATH ($(command -v nim)), skipping install." + exit 0 +fi + +# 2. Already installed at our expected location from a previous run, but not on PATH. +# Re-link binaries into ~/.nimble/bin. +if [ -f "${NIM_DEST}/lib/system.nim" ]; then + echo "Nim ${NIM_VERSION} already installed at ${NIM_DEST}, re-linking binaries." + mkdir -p "${HOME}/.nimble/bin" + for bin_path in "${NIM_DEST}/bin/"*; do + ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" + done exit 0 fi if [ -n "${nim_ver}" ]; then - newer=$(printf '%s\n%s\n' "${NIM_VERSION}" "${nim_ver}" | sort -V | tail -1) - if [ "${newer}" = "${nim_ver}" ]; then - echo "WARNING: Nim ${nim_ver} is installed; this repo is validated against ${NIM_VERSION}." >&2 - echo "WARNING: The build will proceed but may behave differently." >&2 - exit 0 - fi + echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2 fi OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/') ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/') -NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz" WORK_DIR=$(mktemp -d) trap 'rm -rf "${WORK_DIR}"' EXIT @@ -48,9 +58,7 @@ if [ "${HTTP_STATUS}" = "200" ]; then echo "Downloading pre-built binary from ${BINARY_URL}..." curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz" tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}" - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}" else echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..." SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz" @@ -58,15 +66,19 @@ else tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}" cd "${WORK_DIR}/Nim-${NIM_VERSION}" sh build_all.sh - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/Nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}" fi +# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker). +# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present. +rm -rf "${NIM_DEST}" 2>/dev/null || true +mkdir -p "${NIM_DEST}" +cp -r "${SRC_DIR}/." "${NIM_DEST}/" + mkdir -p "${HOME}/.nimble/bin" for bin_path in "${NIM_DEST}/bin/"*; do ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" done echo "Nim ${NIM_VERSION} installed to ${NIM_DEST}" -echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." +echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." \ No newline at end of file diff --git a/scripts/install_nimble.sh b/scripts/install_nimble.sh new file mode 100755 index 000000000..dba2d6612 --- /dev/null +++ b/scripts/install_nimble.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Installs a specific nimble version without using `nimble install nimble`. +# +# `nimble install nimble` is inherently fragile: +# - ETXTBSY: overwriting the running nimble binary in pkgs2/ +# - JSON parse failures with older nimble versions reading packages_official.json +# +# Strategy: +# 1. If the right version is already at ~/.nimble/bin/nimble → done. +# 2. If a previously-compiled binary exists in pkgs2/ → re-link it. +# 3. Otherwise: clone the nimble git repo, init submodules, build with nim, +# and atomically replace the target (mv avoids ETXTBSY on the old binary). + +set -e + +NIMBLE_VERSION="${1:-}" +if [ -z "${NIMBLE_VERSION}" ]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +NIMBLE_BIN="${HOME}/.nimble/bin/nimble" + +# 1. Already installed at the right version? +if [ -x "${NIMBLE_BIN}" ]; then + nimble_ver=$("${NIMBLE_BIN}" --version 2>/dev/null \ + | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) + if [ "${nimble_ver}" = "${NIMBLE_VERSION}" ]; then + echo "Nimble ${NIMBLE_VERSION} already installed, skipping." + exit 0 + fi +fi + +# 2. Already compiled into pkgs2/ from a previous (possibly partial) run? +PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \ + 2>/dev/null | head -1 || true) +if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then + echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}." + mkdir -p "${HOME}/.nimble/bin" + ln -sf "${PKGS2_NIMBLE}" "${NIMBLE_BIN}" + exit 0 +fi + +# 3. Build from source. +NIM_BIN="${HOME}/.nimble/bin/nim" +if [ ! -x "${NIM_BIN}" ]; then + NIM_BIN="$(command -v nim)" +fi + +WORK_DIR="$(mktemp -d)" +trap 'rm -rf "${WORK_DIR}"' EXIT + +echo "Cloning nimble v${NIMBLE_VERSION} with submodules..." +git clone --depth=1 --branch "v${NIMBLE_VERSION}" \ + --recurse-submodules --shallow-submodules \ + https://github.com/nim-lang/nimble.git \ + "${WORK_DIR}/nimble" + +echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)..." +cd "${WORK_DIR}/nimble" +# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths. +"${NIM_BIN}" c -d:release --path:src \ + -o:"${WORK_DIR}/nimble_new" src/nimble.nim + +mkdir -p "${HOME}/.nimble/bin" +# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running. +cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$" +mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}" + +echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}" \ No newline at end of file From 5e262badf74aa6bf80315887bfd11f4193859968 Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Wed, 27 May 2026 23:58:30 +0530 Subject: [PATCH 3/8] chore: fixing daily ci (#3878) --- .github/workflows/ci-daily.yml | 1 + Makefile | 3 ++- apps/chat2bridge/chat2bridge.nim | 10 ++++++++-- apps/chat2bridge/config_chat2bridge.nim | 11 +++++------ apps/chat2mix/config_chat2mix.nim | 8 ++++++-- apps/liteprotocoltester/tester_config.nim | 2 +- apps/networkmonitor/networkmonitor_config.nim | 2 +- .../node/peer_manager/peer_store/test_migrations.nim | 4 ++-- .../peer_manager/peer_store/test_peer_storage.nim | 2 +- tests/node/test_wakunode_relay_rln.nim | 2 +- tests/test_utils_compat.nim | 2 +- tests/waku_enr/test_sharding.nim | 2 +- 12 files changed, 30 insertions(+), 19 deletions(-) diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index d52775ae2..009e6c523 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -3,6 +3,7 @@ name: Daily logos-delivery CI on: schedule: - cron: '30 6 * * *' + workflow_dispatch: env: NPROC: 2 diff --git a/Makefile b/Makefile index 0515ef3ec..ea1bf66f0 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH) # NIM binary location NIM_BINARY := $(shell which nim 2>/dev/null) NPH := $(HOME)/.nimble/bin/nph +NIMBLE := $(HOME)/.nimble/bin/nimble NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup # Compilation parameters @@ -71,7 +72,7 @@ waku.nims: ln -s waku.nimble $@ $(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims - nimble setup --localdeps + $(NIMBLE) setup --localdeps touch $@ # Must be phony so the recipe always runs and the sub-make re-evaluates diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 53eb5d04b..eeeea328b 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[tables, times, strutils, hashes, sequtils, json], + std/[tables, times, strutils, hashes, sequtils, json, options], chronos, confutils, chronicles, @@ -267,10 +267,16 @@ when isMainModule: else: nodev2ExtPort + let nodev2Key = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + let bridge = Chat2Matterbridge.new( mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)), mbGateway = conf.mbGateway, - nodev2Key = conf.nodekey, + nodev2Key = nodev2Key, nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index abb5e329f..048fc4d87 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -1,4 +1,5 @@ import + std/options, confutils, confutils/defs, confutils/std/net, @@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object .}: seq[string] nodekey* {. - desc: "P2P node private key as hex", - defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(), - name: "nodekey" - .}: crypto.PrivateKey + desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey" + .}: Option[crypto.PrivateKey] store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" @@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object # Matterbridge options mbHostAddress* {. desc: "Listening address of the Matterbridge host", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "mb-host-address" .}: IpAddress diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..639e14986 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -162,7 +162,8 @@ type metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: + IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -194,7 +195,10 @@ type dnsDiscoveryNameServers* {. desc: "DNS name server IPs to query. Argument may be repeated.", - defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + defaultValue: @[ + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]), + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]), + ], name: "dns-discovery-name-server" .}: seq[IpAddress] diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index dee918b8c..1f4bedaa8 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object ## Tester REST service configuration restAddress* {. desc: "Listening address of the REST HTTP server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "rest-address" .}: IpAddress diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index f67fb09a8..b5bcfbd96 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -116,7 +116,7 @@ type NetworkMonitorConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress diff --git a/tests/node/peer_manager/peer_store/test_migrations.nim b/tests/node/peer_manager/peer_store/test_migrations.nim index a20d065ec..d6b86a15b 100644 --- a/tests/node/peer_manager/peer_store/test_migrations.nim +++ b/tests/node/peer_manager/peer_store/test_migrations.nim @@ -1,11 +1,11 @@ -import std/[options], stew/results, testutils/unittests +import std/[options], results, testutils/unittests import waku/node/peer_manager/peer_store/migrations, ../../waku_archive/archive_utils, ../../testlib/[simple_mock] -import std/[tables, strutils, os], stew/results, chronicles +import std/[tables, strutils, os], results, chronicles import waku/common/databases/db_sqlite, waku/common/databases/common diff --git a/tests/node/peer_manager/peer_store/test_peer_storage.nim b/tests/node/peer_manager/peer_store/test_peer_storage.nim index c8a479178..871df8644 100644 --- a/tests/node/peer_manager/peer_store/test_peer_storage.nim +++ b/tests/node/peer_manager/peer_store/test_peer_storage.nim @@ -1,4 +1,4 @@ -import stew/results, testutils/unittests +import results, testutils/unittests import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index c8ca9b43d..3a2a8a67c 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -2,7 +2,7 @@ import std/[tempfiles, strutils, options], - stew/results, + results, testutils/unittests, chronos, libp2p/switch, diff --git a/tests/test_utils_compat.nim b/tests/test_utils_compat.nim index 121efa4a5..1394982ef 100644 --- a/tests/test_utils_compat.nim +++ b/tests/test_utils_compat.nim @@ -1,7 +1,7 @@ {.used.} import testutils/unittests -import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common +import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common suite "Waku Payload": test "Encode/Decode waku message with timestamp": diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim index 344436d0e..789f8faec 100644 --- a/tests/waku_enr/test_sharding.nim +++ b/tests/waku_enr/test_sharding.nim @@ -1,7 +1,7 @@ {.used.} import - stew/results, + results, chronos, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, From 74057c66224f43b4aa27b42033d4ed52eed5c7a7 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 27 May 2026 23:05:20 +0200 Subject: [PATCH 4/8] start basic reliable channel folder (#3886) --- .github/workflows/ci.yml | 1 + channels/encryption/encryption.nim | 25 ++ channels/encryption/noop_encryption.nim | 18 ++ channels/events.nim | 23 ++ .../rate_limit_manager/rate_limit_manager.nim | 80 ++++++ channels/reliable_channel.nim | 264 ++++++++++++++++++ channels/reliable_channel_manager.nim | 138 +++++++++ .../scalable_data_sync/scalable_data_sync.nim | 62 ++++ .../scalable_data_sync/sds_persistence.nim | 25 ++ .../segmentation/segment_message_proto.nim | 34 +++ channels/segmentation/segmentation.nim | 70 +++++ .../segmentation/segmentation_persistence.nim | 20 ++ channels/types.nim | 15 + tests/all_tests_waku.nim | 3 + tests/channels/test_all.nim | 3 + .../test_reliable_channel_send_receive.nim | 149 ++++++++++ 16 files changed, 930 insertions(+) create mode 100644 channels/encryption/encryption.nim create mode 100644 channels/encryption/noop_encryption.nim create mode 100644 channels/events.nim create mode 100644 channels/rate_limit_manager/rate_limit_manager.nim create mode 100644 channels/reliable_channel.nim create mode 100644 channels/reliable_channel_manager.nim create mode 100644 channels/scalable_data_sync/scalable_data_sync.nim create mode 100644 channels/scalable_data_sync/sds_persistence.nim create mode 100644 channels/segmentation/segment_message_proto.nim create mode 100644 channels/segmentation/segmentation.nim create mode 100644 channels/segmentation/segmentation_persistence.nim create mode 100644 channels/types.nim create mode 100644 tests/channels/test_all.nim create mode 100644 tests/channels/test_reliable_channel_send_receive.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52d20157a..c54d828ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,7 @@ jobs: - 'tools/**' - 'tests/all_tests_v2.nim' - 'tests/**' + - 'channels/**' docker: - 'docker/**' diff --git a/channels/encryption/encryption.nim b/channels/encryption/encryption.nim new file mode 100644 index 000000000..5cb53be2f --- /dev/null +++ b/channels/encryption/encryption.nim @@ -0,0 +1,25 @@ +## Optional encryption hooks for the Reliable Channel API. +## +## Modelled as `RequestBroker`s: the broker pattern lets the channel +## delegate work to a provider that may live in any module without +## introducing a direct dependency. If no provider is registered the +## broker returns an error, so installing the noop providers from +## `noop_encryption` is required when the application does not want +## actual encryption. +## +## Applied per-segment after SDS processing on outgoing, and before +## SDS processing on incoming. No specific scheme is mandated. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import brokers/request_broker + +export request_broker + +RequestBroker: + type Encrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} + +RequestBroker: + type Decrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} diff --git a/channels/encryption/noop_encryption.nim b/channels/encryption/noop_encryption.nim new file mode 100644 index 000000000..f09ed9cf4 --- /dev/null +++ b/channels/encryption/noop_encryption.nim @@ -0,0 +1,18 @@ +## No-op encryption providers. Install these when the application does +## not want actual encryption so the `Encrypt` / `Decrypt` brokers have +## something to dispatch to. + +import results +import chronos +import ./encryption + +proc setNoopEncryption*() = + discard Encrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} = + return ok(Encrypt(payload)) + ) + + discard Decrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} = + return ok(Decrypt(payload)) + ) diff --git a/channels/events.nim b/channels/events.nim new file mode 100644 index 000000000..5a17c99d2 --- /dev/null +++ b/channels/events.nim @@ -0,0 +1,23 @@ +## Reliable Channel event types emitted to API consumers. +## +## Lifecycle events for individual segments (sent / propagated / errored) +## are the same as the network-level ones the DeliveryService already +## emits — `requestId` is shared across layers — so we just re-export +## `waku/events/message_events` and avoid declaring duplicates. +## +## Only the channel-level `MessageReceivedEvent` carries data that has +## no analogue in the lower layer (reassembled application payload, +## senderId, channelId), so it lives here. + +import waku/events/message_events as waku_message_events +import brokers/event_broker + +import ./types as channel_types + +export waku_message_events, channel_types, event_broker + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim new file mode 100644 index 000000000..5ea6486a5 --- /dev/null +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -0,0 +1,80 @@ +## Rate Limit Manager for the Reliable Channel API. +## +## Tracks messages sent per RLN epoch and delays dispatch when the +## limit is approached, ensuring RLN compliance on enforcing relays. +## +## For the skeleton this is a pass-through: messages are immediately +## released as ready-to-send. Real epoch budgeting will be added later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/times +import message +import brokers/event_broker +import brokers/broker_context + +export event_broker, broker_context +export message.SdsChannelID + +const + DefaultEpochPeriodSec* = 600 + DefaultMessagesPerEpoch* = 1 + +EventBroker: + ## Emitted by `enqueueToSend` carrying the batch of opaque message + ## blobs that may now leave the rate limiter and continue down the + ## outgoing pipeline (encryption -> dispatch). Bytes only: the rate + ## limiter is intentionally agnostic of SDS, so anything serialisable + ## can flow through it. + ## + ## `channelId` lets listeners filter to their own channel, since all + ## reliable channels share the underlying Waku node's broker context. + type ReadyToSendEvent* = object + channelId*: SdsChannelID + msgs*: seq[seq[byte]] + +type + RateLimitConfig* = object + enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active + epochPeriodSec*: int + messagesPerEpoch*: int + + RateLimitManager* = ref object + config*: RateLimitConfig + queue*: seq[seq[byte]] + currentEpochStart*: Time + sentInCurrentEpoch*: int + channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent` + brokerCtx: BrokerContext + +proc new*( + T: type RateLimitManager, + config: RateLimitConfig, + channelId: SdsChannelID, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + return T( + config: config, + queue: @[], + currentEpochStart: getTime(), + sentInCurrentEpoch: 0, + channelId: channelId, + brokerCtx: brokerCtx, + ) + +proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) = + ## Skeleton behaviour: enqueue and immediately release as a single + ## ready batch. Real per-epoch budgeting will park messages on + ## `self.queue` and emit only when the budget allows. + ReadyToSendEvent.emit( + self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg]) + ) + +proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] = + ## Returns the set of queued messages that may be dispatched now + ## without exceeding the configured rate limit. + discard + +proc resetEpoch*(self: RateLimitManager) = + self.currentEpochStart = getTime() + self.sentInCurrentEpoch = 0 diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim new file mode 100644 index 000000000..2a7d01d35 --- /dev/null +++ b/channels/reliable_channel.nim @@ -0,0 +1,264 @@ +## Reliable Channel type. +## +## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end +## reliability), optional encryption, and rate-limited dispatch on top +## of the Messaging API for a single channel. +## +## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch +## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event +## +## Channels are owned by a `ReliableChannelManager`. Lifecycle and send +## operations are addressed by `ChannelId`, so callers only need to keep +## an opaque handle around. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/[options, tables] +import results +import chronos +import bearssl/rand +import stew/byteutils +import libp2p/crypto/crypto as libp2p_crypto + +import waku/node/delivery_service/delivery_service +import waku/node/delivery_service/send_service +import waku/waku_core/topics + +import ./events +import ./segmentation/segmentation +import ./scalable_data_sync/scalable_data_sync +import ./rate_limit_manager/rate_limit_manager +import ./encryption/encryption + +export + delivery_service, send_service, events, segmentation, scalable_data_sync, + rate_limit_manager, encryption + +const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" + ## Wire-format spec marker for the Reliable Channel layer, as defined + ## in the reliable-channel-api LIP (`Wire Format / Spec Marker`). + ## A `WakuMessage` whose `meta` field does not equal these bytes is + ## not addressed to this layer and is silently dropped on ingress. + ## The trailing `/N` is the wire-format version and is bumped only + ## on breaking on-the-wire changes; implementations pin one version. + +type ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + deliveryService: DeliveryService + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] + brokerCtx: BrokerContext + ## Captured here so the channel emits `ChannelMessageReceivedEvent` + ## on the same broker context the owning manager registered its + ## listeners on. Without this, an emit via `globalBrokerContext()` + ## would land on whatever context happens to be thread-local at + ## emit time, which is not necessarily the manager's. + +func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = + self.channelId + +func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = + self.contentTopic + +func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = + self.senderId + +proc onReadyToSend( + self: ReliableChannel, msgs: seq[seq[byte]] +) {.async: (raises: []).} = + ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` + ## listener once `rate_limit_manager` releases a batch of opaque + ## blobs (already-encoded SDS messages): + ## + ## ... -> rate_limit_manager -> [encryption] -> dispatch + for m in msgs: + ## Each `m` was preceded by exactly one push onto `pendingRequests` + ## in `send`, so this pop is always safe in the current skeleton. + let pending = self.pendingRequests[0] + self.pendingRequests.delete(0) + + ## TODO: revisit which fields of the SDS message must be encrypted. + ## Encrypting the whole encoded blob forces every receiver to attempt + ## decryption before it can route, which breaks selective dispatch. + ## Leave routing metadata (channelId, causal-history references) in + ## clear and encrypt only the application payload. + let encRes = await Encrypt.request(m) + let encrypted = encRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: pending.parent, + messageHash: "", + error: "encryption failed: " & error, + ), + ) + continue + let wireBytes = seq[byte](encrypted) + + let envelope = MessageEnvelope( + contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + ) + + let deliveryReqId = RequestId.new(self.rng) + let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: + ## TODO: emit waku `MessageErrorEvent` for the parent request id. + continue + + ## Stamp the Reliable Channel wire-format spec marker so the ingress + ## side of any peer can route this WakuMessage to its Reliable + ## Channel layer. Done on the constructed WakuMessage rather than + ## via the envelope because `MessageEnvelope` does not expose a + ## `meta` field. + deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + + asyncSpawn self.deliveryService.sendService.send(deliveryTask) + self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + +proc send*( + self: ReliableChannel, payload: seq[byte], ephemeral: bool = false +): Result[RequestId, string] = + ## Single application-level send. The first three stages of the + ## outgoing pipeline are chained explicitly so the flow is visible + ## at a glance: + ## + ## segmentation -> sds -> rate_limit_manager + ## + ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with + ## the SDS messages cleared for transmission; the channel's listener + ## then runs the final stage (encryption -> dispatch). The `ephemeral` + ## flag is carried alongside each segment in `pendingRequests` and + ## stamped onto the eventual `MessageEnvelope`. + ## + ## The returned `RequestId` is the parent of one-or-more + ## delivery-service `RequestId`s; the mapping is recorded in + ## `self.requestIds`. + if payload.len == 0: + return err("empty payload") + + let parentReqId = RequestId.new(self.rng) + self.requestIds[parentReqId] = @[] + + for segmentBytes in self.segmentation.performSegmentation(payload): + ## Segments arrive already encoded; the segmentation module owns + ## the wire format so SDS only ever sees opaque bytes. + let sdsBytes = self.sdsHandler.wrapOutgoing( + self.channelId, self.senderId, segmentBytes + ).valueOr: + return err("SDS wrap failed: " & error) + self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.rateLimit.enqueueToSend(sdsBytes) + + return ok(parentReqId) + +proc onMessageReceived( + self: ReliableChannel, messageHash: string, payload: seq[byte] +) {.async: (raises: []).} = + ## Ingress pipeline made visible: + ## + ## payload -> decrypt -> sds -> reassemble -> emit + ## + ## Invoked from this channel's `MessageReceivedEvent` listener, which + ## already filtered on the spec marker and on `contentTopic`. The + ## channel only sees the raw payload bytes for itself. + + ## Notice that the following "request" is implemented implicitly as a broker call to + ## the `Decrypt` request broker. + let decRes = await Decrypt.request(payload) + let plaintext = decRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "decryption failed: " & error, + ), + ) + return + let plaintextBytes = seq[byte](plaintext) + + let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) + if unwrapped.isErr(): + return + + let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. + ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc new*( + T: type ReliableChannel, + deliveryService: DeliveryService, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + segConfig: SegmentationConfig, + sdsConfig: SdsConfig, + rateConfig: RateLimitConfig, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed + ## inside the channel rather than handed in by the caller — they are + ## implementation details of the channel, not knobs the API consumer + ## should be wiring up. Encryption is delegated to the `Encrypt`/ + ## `Decrypt` request brokers, so the channel keeps no per-instance + ## encryption state either. + let chn = T( + deliveryService: deliveryService, + channelId: channelId, + contentTopic: contentTopic, + senderId: senderId, + rng: libp2p_crypto.newRng(), + segmentation: SegmentationHandler.new(segConfig), + sdsHandler: SdsHandler.new(sdsConfig, senderId), + rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), + requestIds: initTable[RequestId, seq[RequestId]](), + pendingRequests: @[], + brokerCtx: brokerCtx, + ) + + ## Each channel owns its own egress + ingress listeners on + ## `chn.brokerCtx`, filtered to traffic addressed to this channel. + ## Keeping the listeners (and the procs they call) inside the + ## channel lets `onReadyToSend` and `onMessageReceived` stay private + ## — the manager doesn't need to know about them. + discard ReadyToSendEvent.listen( + chn.brokerCtx, + proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = + if evt.channelId == chn.channelId: + await chn.onReadyToSend(evt.msgs) + , + ) + + discard MessageReceivedEvent.listen( + chn.brokerCtx, + proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} = + ## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic + ## for other channels before doing any decode work. + if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion: + return + if evt.message.contentTopic != chn.contentTopic: + return + await chn.onMessageReceived(evt.messageHash, evt.message.payload) + , + ) + + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim new file mode 100644 index 000000000..ddbdb37a6 --- /dev/null +++ b/channels/reliable_channel_manager.nim @@ -0,0 +1,138 @@ +## Reliable Channel API entry point. +## +## Owns the set of `ReliableChannel` instances and exposes lifecycle and +## send/receive operations addressed by `ChannelId`. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/tables +import results +import chronos +import stew/byteutils + +import waku/api/api +import waku/api/api_conf +import waku/events/message_events as waku_message_events +import waku/factory/waku as waku_factory +import waku/waku_core/topics + +import ./reliable_channel +import ./encryption/noop_encryption + +export reliable_channel + +type ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + deliveryService: DeliveryService + ## Owned by the manager. The ownership chain is + ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. + ## Hidden so callers can't substitute their own and bypass the + ## manager's pipeline. + brokerCtx: BrokerContext + +proc new*( + T: type ReliableChannelManager, + conf: WakuNodeConf, + brokerCtx: BrokerContext = globalBrokerContext(), +): Future[Result[T, string]] {.async.} = + ## TODO !! The proper ownership chain is: + ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, + ## and this will be implemented in the future. For now, `createNode` + ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## This is a temporary workaround to get the API + + let waku = ?(await createNode(conf)) + + let manager = T( + channels: initTable[ChannelId, ReliableChannel](), + deliveryService: waku.deliveryService, + brokerCtx: brokerCtx, + ) + + return ok(manager) + +proc start*(self: ReliableChannelManager): Result[void, string] = + ## Bring the owned DeliveryService up. Separated from `new` so callers + ## can register encryption providers / create channels before traffic + ## starts flowing. + self.deliveryService.startDeliveryService() + +proc stop*(self: ReliableChannelManager) {.async.} = + if not self.deliveryService.isNil(): + await self.deliveryService.stopDeliveryService() + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, +): Result[ChannelId, string] = + ## Spec entry point. The `DeliveryService` and `rng` the channel needs + ## are sourced from the owning `ReliableChannelManager` rather than + ## passed per call. Encryption is wired up through the `Encrypt`/ + ## `Decrypt` request brokers — the application installs its own + ## providers (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: nil, + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let chn = ReliableChannel.new( + deliveryService = self.deliveryService, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Result[void, string] = + ## Flush state, persist outstanding SDS buffers, release resources. + if not self.channels.hasKey(channelId): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + return ok() + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Result[RequestId, string] = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) + +## Inbound messages are not handed to the manager by direct call. Each +## `ReliableChannel` installs its own `MessageReceivedEvent` listener +## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, +## and routes to its private `onMessageReceived`. This keeps the lower +## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel +## and keeps the manager out of per-channel event dispatch. diff --git a/channels/scalable_data_sync/scalable_data_sync.nim b/channels/scalable_data_sync/scalable_data_sync.nim new file mode 100644 index 000000000..30ad0e02b --- /dev/null +++ b/channels/scalable_data_sync/scalable_data_sync.nim @@ -0,0 +1,62 @@ +## Scalable Data Sync (SDS) component for the Reliable Channel API. +## +## Provides end-to-end delivery guarantees via causal history tracking, +## acknowledgements, and retransmission of unacknowledged segments. +## +## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so +## the send/receive circuit can exercise the surrounding pipeline. +## Real SDS wrapping will plug in via `nim-sds` later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import results +import message as sds_message + +import ./sds_persistence + +export sds_message, sds_persistence + +const + DefaultAcknowledgementTimeoutMs* = 5_000 + DefaultMaxRetransmissions* = 5 + DefaultCausalHistorySize* = 2 + +type + SdsConfig* = object + acknowledgementTimeoutMs*: int + maxRetransmissions*: int + causalHistorySize*: int + persistence*: SdsPersistence + + SdsHandler* = ref object + config*: SdsConfig + participantId*: SdsParticipantID + +proc new*( + T: type SdsHandler, + config: SdsConfig, + participantId: SdsParticipantID = SdsParticipantID(""), +): T = + return T(config: config, participantId: participantId) + +proc wrapOutgoing*( + self: SdsHandler, + channelId: SdsChannelID, + senderId: SdsParticipantID, + payload: seq[byte], +): Result[seq[byte], string] = + ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). + ## Skeleton: pass the encoded segment through unchanged. Real causal + ## history / lamport / bloom-filter population will replace this. + return ok(payload) + +proc handleIncoming*( + self: SdsHandler, msg: seq[byte] +): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = + ## Skeleton: pass the bytes through; channel id is left empty until + ## the real wire format provides it. + return ok((content: msg, channelId: SdsChannelID(""))) + +proc tickRetransmissions*(self: SdsHandler) = + ## Drives retransmissions of unacknowledged messages. + discard diff --git a/channels/scalable_data_sync/sds_persistence.nim b/channels/scalable_data_sync/sds_persistence.nim new file mode 100644 index 000000000..8089595ea --- /dev/null +++ b/channels/scalable_data_sync/sds_persistence.nim @@ -0,0 +1,25 @@ +## Persistence backend for SDS outgoing buffer and causal history. +## +## TODO (raised in PR review): this surface is duplicating concerns that +## should come from the SDS module itself. Once the SDS module exposes a +## complete persistence contract, drop this file and import that surface +## instead of re-declaring it here. + +import message + +type + SdsPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SdsPersistence* = ref object of RootObj + kind*: SdsPersistenceKind + +method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = + discard + +method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = + discard + +method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = + discard diff --git a/channels/segmentation/segment_message_proto.nim b/channels/segmentation/segment_message_proto.nim new file mode 100644 index 000000000..f19cdc27f --- /dev/null +++ b/channels/segmentation/segment_message_proto.nim @@ -0,0 +1,34 @@ +## Wire format for a single segment, per the Reliable Channel API spec. +## +## Skeleton: encode/decode treat the segment as just its payload bytes, +## since for now we only ever produce a single segment per send. + +type SegmentMessageProto* = object + entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes + dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments + dataSegmentCount*: uint32 ## number of data segments (>= 1) + payload*: seq[byte] ## segment payload (data or parity shard) + paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments + paritySegmentCount*: uint32 ## number of parity segments + isParity*: bool ## true for parity segments, false (default) for data segments + +proc isParityMessage*(self: SegmentMessageProto): bool = + self.isParity + +proc isValid*(self: SegmentMessageProto): bool = + ## Validates hash length (32 bytes), segment indices and counts. + discard + +proc encode*(self: SegmentMessageProto): seq[byte] = + self.payload + +proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T = + T( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: buf, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) diff --git a/channels/segmentation/segmentation.nim b/channels/segmentation/segmentation.nim new file mode 100644 index 000000000..9fc7964c0 --- /dev/null +++ b/channels/segmentation/segmentation.nim @@ -0,0 +1,70 @@ +## Segmentation component for the Reliable Channel API. +## +## Splits large application payloads into transmittable segments and +## reassembles them on reception. Supports optional Reed-Solomon parity +## segments for loss recovery, as per the Reliable Channel API spec. +## +## For the skeleton everything fits in a single segment: real chunking +## and Reed-Solomon parity will be plugged in later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/options +import ./segment_message_proto +import ./segmentation_persistence + +export segment_message_proto, segmentation_persistence + +const + DefaultSegmentSizeBytes* = 102_400 + SegmentsParityRate* = 0.125 + SegmentsReedSolomonMaxCount* = 256 + +type + SegmentationConfig* = object + segmentSizeBytes*: int + enableReedSolomon*: bool + persistence*: SegmentationPersistence + + SegmentationHandler* = ref object + config*: SegmentationConfig + + ReassemblyResult* = object + payload*: seq[byte] + entireMessageHash*: seq[byte] + +proc new*(T: type SegmentationHandler, config: SegmentationConfig): T = + return T(config: config) + +proc performSegmentation*( + self: SegmentationHandler, payload: seq[byte] +): seq[seq[byte]] = + ## Skeleton behaviour: emit exactly one segment carrying the whole + ## payload. Real chunking and Reed-Solomon parity will replace this. + let segment = SegmentMessageProto( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: payload, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) + return @[segment.encode()] + +proc handleIncomingSegment*( + self: SegmentationHandler, segmentBytes: seq[byte] +): Option[ReassemblyResult] = + ## Skeleton behaviour: every segment is already a complete message + ## (since `performSegmentation` always emits one), so just hand the + ## payload straight back. + let segment = SegmentMessageProto.decode(segmentBytes) + return some( + ReassemblyResult( + payload: segment.payload, entireMessageHash: segment.entireMessageHash + ) + ) + +proc cleanupSegments*(self: SegmentationHandler) = + ## Drop expired partial-reassembly state. + discard diff --git a/channels/segmentation/segmentation_persistence.nim b/channels/segmentation/segmentation_persistence.nim new file mode 100644 index 000000000..cc34c36d2 --- /dev/null +++ b/channels/segmentation/segmentation_persistence.nim @@ -0,0 +1,20 @@ +## Persistence backend interface for segmentation reassembly state. +## +## Allows partial reassembly state to survive process restarts. + +type + SegmentationPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SegmentationPersistence* = ref object of RootObj + kind*: SegmentationPersistenceKind + +method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} = + discard + +method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} = + discard + +method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} = + discard diff --git a/channels/types.nim b/channels/types.nim new file mode 100644 index 000000000..4070ed620 --- /dev/null +++ b/channels/types.nim @@ -0,0 +1,15 @@ +## Core identifier types for the Reliable Channel API. + +import std/hashes +import waku/api/types as api_types + +import ./scalable_data_sync/scalable_data_sync + +export scalable_data_sync +export api_types + +type ChannelId* = SdsChannelID + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index e64922f4c..963a948a3 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -88,3 +88,6 @@ import ./tools/test_all # Persistency library tests import ./persistency/test_all + +# Reliable Channel API tests +import ./channels/test_all diff --git a/tests/channels/test_all.nim b/tests/channels/test_all.nim new file mode 100644 index 000000000..04b448707 --- /dev/null +++ b/tests/channels/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_reliable_channel_send_receive diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim new file mode 100644 index 000000000..052cd35c9 --- /dev/null +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -0,0 +1,149 @@ +{.used.} + +import std/[net] +import chronos, testutils/unittests, stew/byteutils +import brokers/broker_context + +import ../testlib/[common, wakucore, wakunode, testasync] + +import waku +import waku/[waku_node, waku_core] +import waku/factory/waku_conf +import waku/events/message_events as waku_message_events +import tools/confutils/cli_args + +import channels/reliable_channel_manager +import channels/encryption/noop_encryption + +const TestTimeout = chronos.seconds(15) + +proc createApiNodeConf(): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = cli_args.WakuMode.Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.reliabilityEnabled = true + conf.rest = false + return conf + +suite "Reliable Channel - ingress": + asyncTest "manager dispatches marked WakuMessage to the right channel": + ## Unit test for the receive side of the API: instead of standing + ## up two libp2p nodes and a relay mesh, we drive the manager + ## directly by emitting a `MessageReceivedEvent` (the exact event + ## the DeliveryService emits when a `WakuMessage` arrives off the + ## wire). The manager must: + ## - drop traffic missing the Reliable Channel spec marker + ## - dispatch the matching channel's `onMessageReceived` + ## - emit `ChannelMessageReceivedEvent` with the payload + const + channelId = ChannelId("test-channel") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "hello reliable channel".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + ## Noop encryption providers so the Encrypt/Decrypt brokers have + ## something to dispatch to; without this the channel falls back to + ## plaintext anyway, but installing them is the documented setup. + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + let received = newFuture[seq[byte]]("channel-message-received") + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if not received.finished() and evt.channelId == channelId: + received.complete(evt.payload) + , + ) + .expect("listen ChannelMessageReceivedEvent") + + ## Build a `WakuMessage` that looks like one that came in off the + ## wire from a peer: the spec marker on `meta` plus the right content + ## topic. The manager's ingress listener should pick it up, + ## decrypt (noop), unwrap SDS (pass-through), reassemble (one + ## segment), and finally emit `ChannelMessageReceivedEvent`. + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + let arrived = await received.withTimeout(TestTimeout) + check arrived + if arrived: + check received.read() == appPayload + + await manager.stop() + + asyncTest "manager drops unmarked WakuMessage": + ## Mirror of the above: same content topic, but `meta` is empty + ## (i.e. foreign traffic). The channel-level event must NOT fire. + const + channelId = ChannelId("test-channel-2") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "foreign payload".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + var fired = false + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if evt.channelId == channelId: + fired = true + , + ) + .expect("listen ChannelMessageReceivedEvent") + + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: @[], ## no Reliable Channel spec marker + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + ## Give the event broker a chance to fan out. + await sleepAsync(100.milliseconds) + check not fired + + await manager.stop() From 2447ce9e739f51e62b3cfb7aea02787eff9419ee Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 29 May 2026 08:11:41 +0200 Subject: [PATCH 5/8] disable js-waku from ci (#3917) --- .github/workflows/ci.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index c54d828ae..f924d0f8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -177,20 +177,6 @@ jobs: secrets: inherit - js-waku-node: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node - - js-waku-node-optional: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node-optional - lint: name: "Lint" runs-on: ubuntu-22.04 From bb23ee64af84afab929d7b5dc9a16135a08a075f Mon Sep 17 00:00:00 2001 From: Darshan <35736874+darshankabariya@users.noreply.github.com> Date: Fri, 29 May 2026 23:53:38 +0530 Subject: [PATCH 6/8] feat: fetch prebuilt zerokit rln, fall back to source build (#3915) --- .github/workflows/ci.yml | 3 ++ flake.lock | 44 +------------------ flake.nix | 94 ++++++++++++++++++++++++++++++---------- scripts/build_rln.sh | 34 ++++++++++++--- 4 files changed, 104 insertions(+), 71 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f924d0f8b..84a7f0b8d 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -35,6 +35,9 @@ jobs: - 'nimble.lock' - 'waku.nimble' - 'Makefile' + - 'scripts/**' + - 'flake.nix' + - 'flake.lock' - 'library/**' - 'liblogosdelivery/**' v2: diff --git a/flake.lock b/flake.lock index 8d0db9269..411bf2430 100644 --- a/flake.lock +++ b/flake.lock @@ -19,8 +19,7 @@ "root": { "inputs": { "nixpkgs": "nixpkgs", - "rust-overlay": "rust-overlay", - "zerokit": "zerokit" + "rust-overlay": "rust-overlay" } }, "rust-overlay": { @@ -42,47 +41,6 @@ "repo": "rust-overlay", "type": "github" } - }, - "rust-overlay_2": { - "inputs": { - "nixpkgs": [ - "zerokit", - "nixpkgs" - ] - }, - "locked": { - "lastModified": 1771211437, - "narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=", - "owner": "oxalica", - "repo": "rust-overlay", - "rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f", - "type": "github" - }, - "original": { - "owner": "oxalica", - "repo": "rust-overlay", - "type": "github" - } - }, - "zerokit": { - "inputs": { - "nixpkgs": [ - "nixpkgs" - ], - "rust-overlay": "rust-overlay_2" - }, - "locked": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - }, - "original": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 6c283780d..b32a53455 100644 --- a/flake.nix +++ b/flake.nix @@ -17,19 +17,9 @@ url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; }; - - # External flake input: Zerokit pinned to a specific commit. - # Update the rev here when a new zerokit version is needed. - zerokit = { - # Pinned to v2.0.2 (5e64cb8822bee65eed6cf459f95ae72b80c6ba63) to match - # the vendor/zerokit submodule. Keep these two in sync: the nix build - # links librln from this input, the Makefile build from the submodule. - url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63"; - inputs.nixpkgs.follows = "nixpkgs"; - }; }; - outputs = { self, nixpkgs, rust-overlay, zerokit }: + outputs = { self, nixpkgs, rust-overlay }: let systems = [ "x86_64-linux" "aarch64-linux" @@ -69,19 +59,78 @@ inherit system; overlays = [ (import rust-overlay) nimbleOverlay ]; }; + + # Prebuilt zerokit librln, fetched from the upstream GitHub release + # rather than compiled from source. Compiling zerokit makes Nix download + # its many crate dependencies from crates.io in one parallel burst, which + # crates.io intermittently rejects with HTTP 403 (rate limiting from the + # self-hosted runners' shared IP), breaking the nix build. The release + # ships the exact `stateless` library this project links (see + # scripts/build_rln.sh), so we use it directly — no Rust toolchain and + # no crates.io access needed. + # + # Keep `rlnVersion` aligned with `LIBRLN_VERSION` in the Makefile and the + # vendor/zerokit submodule. Each hash is the sha256 of the release tarball + # for that platform; refresh all four when bumping the version. + rlnVersion = "v2.0.2"; + rlnAssets = { + "x86_64-linux" = { triple = "x86_64-unknown-linux-gnu"; hash = "sha256-qbrUdaetYKFhjzxUP/QcwD3JHWJ8qk/tCMK3yXceIAk="; }; + "aarch64-linux" = { triple = "aarch64-unknown-linux-gnu"; hash = "sha256-s4bWrmCcNTWHNyJwV73ilWNp58ZdAVG+TAgtWN1cTQs="; }; + "x86_64-darwin" = { triple = "x86_64-apple-darwin"; hash = "sha256-ZaHP5CApN66FYY7jxwOmGcF9kJR78Fng3k1qE2W08Mk="; }; + "aarch64-darwin" = { triple = "aarch64-apple-darwin"; hash = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU="; }; + }; + + mkZerokitRln = system: pkgs: + let + asset = rlnAssets.${system} or + (throw "zerokit ${rlnVersion} has no prebuilt rln asset for system '${system}'"); + in pkgs.stdenv.mkDerivation { + pname = "librln"; + version = lib.removePrefix "v" rlnVersion; + + src = pkgs.fetchurl { + url = "https://github.com/vacp2p/zerokit/releases/download/" + + "${rlnVersion}/${asset.triple}-stateless-rln.tar.gz"; + hash = asset.hash; + }; + + # The tarball lays its files out under release/. + sourceRoot = "release"; + dontConfigure = true; + dontBuild = true; + + # The release .so was linked outside Nix, so it references system + # libraries (libgcc_s, libstdc++, glibc) by bare name. autoPatchelfHook + # points those at the Nix versions so the library loads correctly when + # used by the Nix build. It does nothing for the static .a, and the + # step is skipped on macOS (dylib paths are fixed in nix/default.nix). + nativeBuildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.autoPatchelfHook ]; + buildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.stdenv.cc.cc.lib ]; + + installPhase = '' + runHook preInstall + mkdir -p $out/lib + cp librln.a $out/lib/ 2>/dev/null || true + cp librln.so $out/lib/ 2>/dev/null || true + cp librln.dylib $out/lib/ 2>/dev/null || true + runHook postInstall + ''; + + meta = with pkgs.lib; { + description = "Prebuilt zerokit RLN library (stateless flavor)"; + homepage = "https://github.com/vacp2p/zerokit"; + license = with licenses; [ mit asl20 ]; + platforms = builtins.attrNames rlnAssets; + }; + }; in { packages = forAllSystems (system: let pkgs = pkgsFor system; - # HACK: Fix for stale cargoHash in 2.0.2 release. - zerokitRln = zerokit.packages.${system}.rln.overrideAttrs (old: { - cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: { - vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: { - outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU="; - }); - }); - }); + zerokitRln = mkZerokitRln system pkgs; liblogosdelivery = pkgs.callPackage ./nix/default.nix { inherit pkgs; @@ -94,14 +143,13 @@ inherit pkgs; src = ./.; targets = ["wakucanary"]; - zerokitRln = zerokit.packages.${system}.rln; + inherit zerokitRln; }; in { inherit liblogosdelivery wakucanary; - # Expose the cargoHash-corrected librln so downstream consumers + # Expose the prebuilt librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this - # build links, instead of pulling zerokit's rln directly — whose - # committed cargoHash is stale for v2.0.2 (see zerokitRln above). + # build links against. rln = zerokitRln; default = liblogosdelivery; } diff --git a/scripts/build_rln.sh b/scripts/build_rln.sh index 35b5b8953..b028885e2 100755 --- a/scripts/build_rln.sh +++ b/scripts/build_rln.sh @@ -1,8 +1,15 @@ #!/usr/bin/env bash -# This script is used to build the rln library for the current platform. -# Previously downloaded prebuilt binaries, but due to compatibility issues -# we now always build from source. +# Provides the rln static library for the current platform. +# +# If zerokit publishes a prebuilt `stateless` release asset for this platform, +# download and use it: that is faster than compiling and avoids fetching +# zerokit's many crate dependencies from crates.io. The asset is selected by +# the Rust host target triple (the platform identifier reported by rustc, +# e.g. x86_64-unknown-linux-gnu or aarch64-apple-darwin). +# +# When no matching asset exists (e.g. Windows), build from the vendored +# zerokit submodule instead. set -e @@ -15,8 +22,26 @@ output_filename=$3 [[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; } [[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; } -echo "Building RLN library from source (version ${rln_version})..." +# --- Prefer the prebuilt release asset -------------------------------------- +# Host target triple, e.g. x86_64-unknown-linux-gnu / aarch64-apple-darwin. +host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}') +tarball="${host_triplet}-stateless-rln.tar.gz" +url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}" +echo "Looking for prebuilt RLN: ${url}" +if curl --silent --fail-with-body -L "${url}" -o "${tarball}"; then + echo "Downloaded prebuilt ${tarball}" + tar -xzf "${tarball}" + mv "release/librln.a" "${output_filename}" + rm -rf "${tarball}" release + echo "Using prebuilt ${output_filename}" + exit 0 +fi +# curl --fail-with-body writes the error body to the file on HTTP failure. +rm -f "${tarball}" +echo "No prebuilt asset for ${host_triplet} at ${rln_version}; building from source." + +# --- Fall back to building from the vendored submodule ---------------------- # Check if submodule version = version in Makefile cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" @@ -33,7 +58,6 @@ if [[ "v${submodule_version}" != "${rln_version}" ]]; then exit 1 fi -# Build rln from source. # `stateless` feature: logos-delivery does not maintain a local Merkle tree # (post-PR #3312); the contract is the source of truth and the path is fetched # via getMerkleProof(index). The stateless build compiles out tree code. From c5b24e21da73ec6f9464868e924f6c0bd763791b Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 29 May 2026 22:24:46 +0200 Subject: [PATCH 7/8] better pending segments management (#3914) Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> --- .github/workflows/version-check.yml | 7 +- channels/events.nim | 16 + .../rate_limit_manager/rate_limit_manager.nim | 2 +- channels/reliable_channel.nim | 315 ++++++++++++++---- channels/reliable_channel_manager.nim | 29 +- .../test_reliable_channel_send_receive.nim | 168 ++++++++++ waku/api/types.nim | 15 +- .../send_service/send_service.nim | 2 +- 8 files changed, 472 insertions(+), 82 deletions(-) diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml index e3ad958ba..ee01a9f1a 100644 --- a/.github/workflows/version-check.yml +++ b/.github/workflows/version-check.yml @@ -28,8 +28,11 @@ jobs: set -euo pipefail NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/') # Nearest tag reachable from HEAD; --abbrev=0 drops the --g - # suffix so we get the bare tag (e.g. v0.38.0). - BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") + # suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips + # the moving `nightly` tag (auto-updated by the daily CI to point at + # master HEAD), which would otherwise be picked as the nearest tag + # and break the version-sort comparison below. + BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "") BASE_TAG=${BASE_TAG#v} # Compare on the base version, ignoring any -rc.N prerelease suffix. BASE_TAG=${BASE_TAG%%-*} diff --git a/channels/events.nim b/channels/events.nim index 5a17c99d2..904a34dc6 100644 --- a/channels/events.nim +++ b/channels/events.nim @@ -21,3 +21,19 @@ EventBroker: channelId*: ChannelId senderId*: SdsParticipantID payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim index 5ea6486a5..ab5a9f67b 100644 --- a/channels/rate_limit_manager/rate_limit_manager.nim +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -29,7 +29,7 @@ EventBroker: ## ## `channelId` lets listeners filter to their own channel, since all ## reliable channels share the underlying Waku node's broker context. - type ReadyToSendEvent* = object + type ReadyToSendEvent* = ref object channelId*: SdsChannelID msgs*: seq[seq[byte]] diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 2a7d01d35..c3fbe5d77 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -13,14 +13,15 @@ ## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import std/[options, tables] +import std/[options, sets, tables] import results import chronos import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import waku/node/delivery_service/delivery_service +import waku/api/api +import waku/factory/waku as waku_factory import waku/node/delivery_service/send_service import waku/waku_core/topics @@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - delivery_service, send_service, events, segmentation, scalable_data_sync, - rate_limit_manager, encryption + api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -42,27 +43,64 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## The trailing `/N` is the wire-format version and is bumped only ## on breaking on-the-wire changes; implementations pin one version. -type ReliableChannel* = ref object - ## Spec-defined public type. Fields are private so callers cannot - ## mutate internals and break invariants. Getters are added below - ## for the few values consumers may need. - deliveryService: DeliveryService - channelId: ChannelId - contentTopic: ContentTopic - senderId: SdsParticipantID - rng: ref HmacDrbgContext - segmentation: SegmentationHandler - sdsHandler: SdsHandler - rateLimit: RateLimitManager +type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Defaults to `waku.send`; tests inject a + ## fake that records calls and returns canned `RequestId`s so the + ## send state machine can be exercised end-to-end without a network. - requestIds: Table[RequestId, seq[RequestId]] - pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] - brokerCtx: BrokerContext - ## Captured here so the channel emits `ChannelMessageReceivedEvent` - ## on the same broker context the owning manager registered its - ## listeners on. Without this, an emit via `globalBrokerContext()` - ## would land on whatever context happens to be thread-local at - ## emit time, which is not necessarily the manager's. + MessagePersistence {.pure.} = enum + Persistent + Ephemeral + + SegmentSendState {.pure.} = enum + ## Lifecycle of a single segment as tracked by the channel. The + ## messaging layer has its own richer `DeliveryState` (retries, + ## propagated-vs-validated); here we only model what's needed to + ## decide when a `channelReqId` is fully accounted for. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. + InFlight + ## Released by rate_limit_manager and handed to delivery_service; + ## `messagingReqId` is now set. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. + Failed + ## `MessageErrorEvent` arrived for `messagingReqId`, or the local + ## delivery-task construction failed before any id was reachable. + + PendingMessagingRequest = object + ## One entry per segment (i.e. per messaging-layer request). The + ## relative order of `AwaitingRateLimit` entries must match the + ## order in which `rate_limit_manager` re-emits messages, which is + ## FIFO with `send()`. + channelReqId*: RequestId + ## The channel-layer parent id returned to the caller of `send()` in channel layer. + ## One channel request maps to N pending messaging requests. + messagingReqId*: Option[RequestId] + ## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it. + persistenceReqType: MessagePersistence + segmentSendState*: SegmentSendState + + ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + sendHandler: SendHandler + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingMessagingRequests: seq[PendingMessagingRequest] + ## Entries are kept until the matching segment reaches a final + ## state (`Confirmed` or `Failed`); a whole channel request is + ## then pruned in one pass once all its segments are final. + brokerCtx: BrokerContext func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = self.channelId @@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId +func isFinal(state: SegmentSendState): bool {.inline.} = + return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} + +proc pruneCompletedChannelReqs(self: ReliableChannel) = + ## Drop every `pendingMessagingRequests` entry whose `channelReqId` + ## has all of its segments in a final state. A single failing + ## segment doesn't trigger a drop on its own — we wait until siblings + ## are also accounted for, so the channel-level outcome is decided + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. + var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if not entry.segmentSendState.isFinal(): + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) + +proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = + ## Invoked from this channel's `MessageSentEvent` listener. Flips + ## the matching `InFlight` segment to `Confirmed` and prunes. The + ## listener routes every event through here; entries that don't + ## belong to this channel simply don't match and are no-ops. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Confirmed + self.pruneCompletedChannelReqs() + +proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = + ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Failed + self.pruneCompletedChannelReqs() + proc onReadyToSend( - self: ReliableChannel, msgs: seq[seq[byte]] + self: ReliableChannel, readyToSendEvent: ReadyToSendEvent ) {.async: (raises: []).} = ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` ## listener once `rate_limit_manager` releases a batch of opaque ## blobs (already-encoded SDS messages): ## ## ... -> rate_limit_manager -> [encryption] -> dispatch - for m in msgs: - ## Each `m` was preceded by exactly one push onto `pendingRequests` - ## in `send`, so this pop is always safe in the current skeleton. - let pending = self.pendingRequests[0] - self.pendingRequests.delete(0) + var idx = 0 + for m in readyToSendEvent.msgs: + ## The first `AwaitingRateLimit` entry in push order is the one + ## this `m` belongs to: `send()` adds one entry per segment, and + ## `rate_limit_manager` re-emits them in the same FIFO order, so + ## the two sequences advance in lockstep. Earlier entries may + ## already be `InFlight` / `Confirmed` / `Failed` because they + ## live on until every sibling of their `channelReqId` is final, + ## so we walk past those to find the next one that was awaiting for this batch. + while idx < self.pendingMessagingRequests.len and + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : + idx.inc() + if idx >= self.pendingMessagingRequests.len: + ## rate_limit_manager emitted more messages than we have pending — + ## should not happen given `send` pushes one entry per enqueued + ## SDS payload. Drop silently rather than corrupt state. + break + + let channelReqId = self.pendingMessagingRequests[idx].channelReqId + let isEphemeral = + self.pendingMessagingRequests[idx].persistenceReqType == + MessagePersistence.Ephemeral ## TODO: revisit which fields of the SDS message must be encrypted. ## Encrypting the whole encoded blob forces every receiver to attempt @@ -97,32 +219,58 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: pending.parent, - messageHash: "", - error: "encryption failed: " & error, + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error ), ) + ## Encryption failed *before* we could hand the segment to the + ## delivery layer — no `messagingReqId` was minted and no + ## `DeliveryTask` was queued on `sendService`. The delivery + ## layer will therefore never emit a `MessageSentEvent` / + ## `MessageErrorEvent` for this segment, so `onMessageError` + ## won't fire either. Advance the state machine inline so the + ## parent `channelReqId` can still be pruned once its siblings + ## are also final. + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue let wireBytes = seq[byte](encrypted) + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. let envelope = MessageEnvelope( - contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), ) - let deliveryReqId = RequestId.new(self.rng) - let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: - ## TODO: emit waku `MessageErrorEvent` for the parent request id. + ## `waku.send` is not annotated `(raises: [])`, but this listener is. + ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("waku send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "waku send failed: " & error + ), + ) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue - ## Stamp the Reliable Channel wire-format spec marker so the ingress - ## side of any peer can route this WakuMessage to its Reliable - ## Channel layer. Done on the constructed WakuMessage rather than - ## via the envelope because `MessageEnvelope` does not expose a - ## `meta` field. - deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight + self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) + idx.inc() - asyncSpawn self.deliveryService.sendService.send(deliveryTask) - self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + self.pruneCompletedChannelReqs() proc send*( self: ReliableChannel, payload: seq[byte], ephemeral: bool = false @@ -135,18 +283,22 @@ proc send*( ## ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with ## the SDS messages cleared for transmission; the channel's listener - ## then runs the final stage (encryption -> dispatch). The `ephemeral` - ## flag is carried alongside each segment in `pendingRequests` and - ## stamped onto the eventual `MessageEnvelope`. + ## then runs the final stage (encryption -> dispatch). The + ## `persistenceReqType` is carried alongside each segment in + ## `pendingMessagingRequests` and stamped onto the eventual + ## `MessageEnvelope`. ## - ## The returned `RequestId` is the parent of one-or-more - ## delivery-service `RequestId`s; the mapping is recorded in + ## The returned `RequestId` is the channel-level parent of one-or-more + ## messaging-layer `RequestId`s; the mapping is recorded in ## `self.requestIds`. if payload.len == 0: return err("empty payload") - let parentReqId = RequestId.new(self.rng) - self.requestIds[parentReqId] = @[] + let channelReqId = RequestId.new(self.rng) + self.requestIds[channelReqId] = @[] + + let persistenceReqType = + if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent for segmentBytes in self.segmentation.performSegmentation(payload): ## Segments arrive already encoded; the segmentation module owns @@ -155,10 +307,17 @@ proc send*( self.channelId, self.senderId, segmentBytes ).valueOr: return err("SDS wrap failed: " & error) - self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.pendingMessagingRequests.add( + PendingMessagingRequest( + channelReqId: channelReqId, + messagingReqId: none(RequestId), + persistenceReqType: persistenceReqType, + segmentSendState: SegmentSendState.AwaitingRateLimit, + ) + ) self.rateLimit.enqueueToSend(sdsBytes) - return ok(parentReqId) + return ok(channelReqId) proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] @@ -206,7 +365,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - deliveryService: DeliveryService, + waku: Waku, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -214,6 +373,7 @@ proc new*( sdsConfig: SdsConfig, rateConfig: RateLimitConfig, brokerCtx: BrokerContext = globalBrokerContext(), + sendHandler: SendHandler = nil, ): T = ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed ## inside the channel rather than handed in by the caller — they are @@ -221,8 +381,20 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. + ## + ## `sendHandler` defaults to `waku.send`; tests pass a fake to drive + ## the send state machine without touching the network. + let resolvedSendHandler = + if sendHandler.isNil(): + proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await waku.send(envelope) + else: + sendHandler + let chn = T( - deliveryService: deliveryService, + sendHandler: resolvedSendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, @@ -231,20 +403,21 @@ proc new*( sdsHandler: SdsHandler.new(sdsConfig, senderId), rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), requestIds: initTable[RequestId, seq[RequestId]](), - pendingRequests: @[], + pendingMessagingRequests: @[], brokerCtx: brokerCtx, ) - ## Each channel owns its own egress + ingress listeners on - ## `chn.brokerCtx`, filtered to traffic addressed to this channel. - ## Keeping the listeners (and the procs they call) inside the - ## channel lets `onReadyToSend` and `onMessageReceived` stay private - ## — the manager doesn't need to know about them. + ## Each channel owns its own egress + ingress + send-completion + ## listeners on `chn.brokerCtx`, filtered to traffic addressed to + ## this channel. Keeping the listeners (and the handler procs they + ## call) inside the channel lets `onReadyToSend` / + ## `onMessageReceived` / `onMessageSent` / `onMessageError` stay + ## private — the manager doesn't need to know about them. discard ReadyToSendEvent.listen( chn.brokerCtx, proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = if evt.channelId == chn.channelId: - await chn.onReadyToSend(evt.msgs) + await chn.onReadyToSend(evt) , ) @@ -261,4 +434,20 @@ proc new*( , ) + ## Send-completion events are tagged with the per-segment messaging + ## `requestId` — globally unique, so we don't need any channel filter + ## up front. The handler scans this channel's pending entries for a + ## match and is a no-op when the id belongs to a different channel. + discard MessageSentEvent.listen( + chn.brokerCtx, + proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = + chn.onMessageSent(evt.requestId), + ) + + discard MessageErrorEvent.listen( + chn.brokerCtx, + proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + chn.onMessageError(evt.requestId), + ) + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index ddbdb37a6..747f755b4 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -14,6 +14,7 @@ import waku/api/api import waku/api/api_conf import waku/events/message_events as waku_message_events import waku/factory/waku as waku_factory +import waku/node/delivery_service/delivery_service import waku/waku_core/topics import ./reliable_channel @@ -23,11 +24,10 @@ export reliable_channel type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] - deliveryService: DeliveryService - ## Owned by the manager. The ownership chain is - ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. - ## Hidden so callers can't substitute their own and bypass the - ## manager's pipeline. + waku: Waku + ## Owned by the manager. The channel layer reaches the messaging + ## API through `waku.send(envelope)`; constructing DeliveryTasks + ## directly would breach the layer boundary. brokerCtx: BrokerContext proc new*( @@ -38,15 +38,13 @@ proc new*( ## TODO !! The proper ownership chain is: ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, ## and this will be implemented in the future. For now, `createNode` - ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## is called here to get a Waku instance, and the WakuNode is immediately discarded. ## This is a temporary workaround to get the API let waku = ?(await createNode(conf)) let manager = T( - channels: initTable[ChannelId, ReliableChannel](), - deliveryService: waku.deliveryService, - brokerCtx: brokerCtx, + channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx ) return ok(manager) @@ -55,17 +53,18 @@ proc start*(self: ReliableChannelManager): Result[void, string] = ## Bring the owned DeliveryService up. Separated from `new` so callers ## can register encryption providers / create channels before traffic ## starts flowing. - self.deliveryService.startDeliveryService() + self.waku.deliveryService.startDeliveryService() proc stop*(self: ReliableChannelManager) {.async.} = - if not self.deliveryService.isNil(): - await self.deliveryService.stopDeliveryService() + if not self.waku.isNil(): + await self.waku.deliveryService.stopDeliveryService() proc createReliableChannel*( self: ReliableChannelManager, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, + sendHandler: SendHandler = nil, ): Result[ChannelId, string] = ## Spec entry point. The `DeliveryService` and `rng` the channel needs ## are sourced from the owning `ReliableChannelManager` rather than @@ -75,6 +74,9 @@ proc createReliableChannel*( ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. + ## + ## `sendHandler` is left `nil` in production so the channel uses the + ## owned `waku.send`; tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -94,7 +96,7 @@ proc createReliableChannel*( ) let chn = ReliableChannel.new( - deliveryService = self.deliveryService, + waku = self.waku, channelId = channelId, contentTopic = contentTopic, senderId = senderId, @@ -102,6 +104,7 @@ proc createReliableChannel*( sdsConfig = sdsConfig, rateConfig = rateConfig, brokerCtx = self.brokerCtx, + sendHandler = sendHandler, ) self.channels[channelId] = chn diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 052cd35c9..2f49182a2 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -147,3 +147,171 @@ suite "Reliable Channel - ingress": check not fired await manager.stop() + +suite "Reliable Channel - send state machine": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. + const + channelId = ChannelId("sm-success-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + sendCalls.inc + return ok(fakeMsgReqId) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + ) + + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId + + await manager.stop() + + asyncTest "two independent channelReqIds are finalised independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. + const + channelId = ChannelId("sm-multi-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-multi") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len < 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + ) + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() + + waku_message_events.MessageErrorEvent.emit( + brokerCtx, + waku_message_events.MessageErrorEvent( + requestId: msgReqIds[1], messageHash: "", error: "synthetic" + ), + ) + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 + + await manager.stop() + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip() diff --git a/waku/api/types.nim b/waku/api/types.nim index 9eae503c8..2b7edd616 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -11,6 +11,10 @@ type contentTopic*: ContentTopic payload*: seq[byte] ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. RequestId* = distinct string @@ -34,12 +38,18 @@ proc init*( contentTopic: ContentTopic, payload: seq[byte] | string, ephemeral: bool = false, + meta: seq[byte] = @[], ): MessageEnvelope = when payload is seq[byte]: - MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) else: MessageEnvelope( - contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, ) proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = @@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, + meta: envelope.meta, timestamp: getNowInNanosecondTime(), ) diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 902f3aa1c..88ec802cf 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -26,7 +26,7 @@ logScope: # This useful util is missing from sequtils, this extends applyIt with predicate... template applyItIf*(varSeq, pred, op: untyped) = for i in low(varSeq) .. high(varSeq): - let it {.inject.} = varSeq[i] + var it {.inject.} = varSeq[i] if pred: op varSeq[i] = it From 5bc1ad63a76ed2b41eeeced50d2ea778138d09e6 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Fri, 29 May 2026 22:28:15 +0200 Subject: [PATCH 8/8] ci: pass -d:disableMarchNative to avoid secp256k1 build failures (#3916) --- .github/workflows/ci-daily.yml | 3 ++- .github/workflows/ci.yml | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index 009e6c523..dea30ab7e 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -8,7 +8,8 @@ on: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" jobs: build: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84a7f0b8d..9ddf904ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,8 @@ concurrency: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" NIM_VERSION: '2.2.4' NIMBLE_VERSION: '0.22.3' @@ -160,7 +161,7 @@ jobs: fi export MAKEFLAGS="-j1" - export NIMFLAGS="--colors:off -d:chronicles_colors:none" + export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative" export USE_LIBBACKTRACE=0 make V=1 POSTGRES=$postgres_enabled test