diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index d52775ae2..009e6c523 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -3,6 +3,7 @@ name: Daily logos-delivery CI on: schedule: - cron: '30 6 * * *' + workflow_dispatch: env: NPROC: 2 diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52d20157a..f924d0f8b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,7 @@ jobs: - 'tools/**' - 'tests/all_tests_v2.nim' - 'tests/**' + - 'channels/**' docker: - 'docker/**' @@ -176,20 +177,6 @@ jobs: secrets: inherit - js-waku-node: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node - - js-waku-node-optional: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node-optional - lint: name: "Lint" runs-on: ubuntu-22.04 diff --git a/AGENTS.md b/AGENTS.md index 28e455f47..ff7d29a39 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -16,7 +16,7 @@ Key architectural decisions: Resource-restricted first: Protocols differentiate between full nodes (relay) and light clients (filter, lightpush, store). Light clients can participate without maintaining full message history or relay capabilities. This explains the client/server split in protocol implementations. -Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic sharding across 8 shards. Code prioritizes metadata privacy alongside content encryption. +Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic content-topic-based sharding (shard count is configurable; generation-zero defaults to 8 shards on cluster 0). Code prioritizes metadata privacy alongside content encryption. Scalability via sharding: The network uses automatic content-topic-based sharding to distribute traffic. This is why you'll see sharding logic throughout the codebase and why pubsub topic selection is protocol-level, not application-level. @@ -36,7 +36,10 @@ See [documentation](https://docs.waku.org/learn/) for architectural details. ### Key Terminology - ENR (Ethereum Node Record): Node identity and capability advertisement - Multiaddr: libp2p addressing format (e.g., `/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2...`) -- PubsubTopic: Gossipsub topic for message routing (e.g., `/waku/2/default-waku/proto`) +- PubsubTopic: Gossipsub topic for message routing (shard-based, e.g., `/waku/2/rs//`; the default is `/waku/2/rs/0/0`) + - cluster-id: network id + - shard-id: shard differentiator inside the network - drivers mesh forming. + - autosharding: network supports n (configured) shards [0..n-1], shard derived from ContentTopic - ContentTopic: Application-level message categorization (e.g., `/my-app/1/chat/proto`) - Sharding: Partitioning network traffic across topics (static or auto-sharding) - RLN (Rate Limiting Nullifier): Zero-knowledge proof system for spam prevention @@ -77,29 +80,29 @@ type WakuFilter* = ref object of LPProtocol ### Build Requirements - Nim 2.x (check `waku.nimble` for minimum version) - Rust toolchain (required for RLN dependencies) -- Build system: Make with nimbus-build-system +- Build system: Make driven by Nimble (dependencies pinned in `nimble.lock`) ### Build System -The project uses Makefile with nimbus-build-system (Status's Nim build framework): +The project uses a Makefile that drives Nimble. Dependencies are resolved from +`nimble.lock` into a local `nimbledeps/` directory (tracked by the +`NIMBLEDEPS_STAMP` target). ```bash -# Initial build (updates submodules) +# Initial build (resolves Nimble deps automatically) make wakunode2 -# After git pull, update submodules -make update - # Build with custom flags make wakunode2 NIMFLAGS="-d:chronicles_log_level=DEBUG" ``` -Note: The build system uses `--mm:refc` memory management (automatically enforced). Only relevant if compiling outside the standard build system. +Note: The build uses `--mm:refc` memory management (passed automatically by the Nimble tasks in `waku.nimble`). Only relevant if compiling outside the standard build system. ### Common Make Targets ```bash make wakunode2 # Build main node binary make test # Run all tests make testcommon # Run common tests only -make libwakuStatic # Build static C library +make libwaku # Build the legacy C library (libwaku) +make liblogosdelivery. # Build actual C FFI library make chat2 # Build chat example make install-nph # Install git hook for auto-formatting ``` @@ -127,7 +130,7 @@ suite "Waku ENR - Capabilities": test "check capabilities support": ## Given let bitfield: CapabilitiesBitfield = 0b0000_1101u8 - + ## Then check: bitfield.supportsCapability(Capabilities.Relay) @@ -135,7 +138,7 @@ suite "Waku ENR - Capabilities": ``` ### Code Formatting -Mandatory: All code must be formatted with `nph` (vendored in `vendor/nph`) +Mandatory: All code must be formatted with `nph` (installed via `make build-nph`, which fetches a pinned `nph` version with Nimble) ```bash # Format specific file make nph/waku/waku_core.nim @@ -162,7 +165,6 @@ Compile with log level: nim c -d:chronicles_log_level=TRACE myfile.nim ``` - ## Code Conventions Common pitfalls: @@ -181,8 +183,13 @@ Common pitfalls: - Exceptions: `XxxError` for CatchableError, `XxxDefect` for Defect - ref object types: `XxxRef` suffix +### Calls and Member Access +- Prefer dot call syntax for predicates: `x.isNil()` instead of `isNil(x)` +- Use parentheses for "verbs" (operations/actions): `isSome()`, `handleRequest()` +- Omit parentheses for "nouns" (properties/values): `.len`, `.high` + ### Imports Organization -Group imports: stdlib, external libs, internal modules: +Stdlib + external in one `import` block, internal modules in a separate block: ```nim import std/[options, sequtils], # stdlib @@ -214,11 +221,11 @@ proc subscribe( ): Future[FilterSubscribeResult] {.async.} = if contentTopics.len > MaxContentTopicsPerRequest: return err(FilterSubscribeError.badRequest("exceeds maximum")) - + # Handle Result with isOkOr (await wf.subscriptions.addSubscription(peerId, criteria)).isOkOr: return err(FilterSubscribeError.serviceUnavailable(error)) - + ok() ``` @@ -460,8 +467,7 @@ nim c -r \ ### Vendor Directory - Never edit files directly in vendor - it is auto-generated from git submodules -- Always run `make update` after pulling changes -- Managed by `nimbus-build-system` +- Nimble dependencies are resolved from `nimble.lock` into `nimbledeps/` ### Chronicles Performance - Log levels are configured at compile time for performance @@ -475,7 +481,7 @@ nim c -r \ ### RLN Dependencies - RLN code requires a Rust toolchain, which explains Rust imports in some modules -- Pre-built `librln` libraries are checked into the repository +- `librln` is built from the vendored `zerokit` submodule via the `librln`/`rln-deps` Make targets ## Quick Reference @@ -483,18 +489,19 @@ Language: Nim 2.x | License: MIT or Apache 2.0 ### Important Files - `Makefile` - Primary build interface -- `waku.nimble` - Package definition and build tasks (called via nimbus-build-system) -- `vendor/nimbus-build-system/` - Status's build framework +- `waku.nimble` - Package definition and build tasks (invoked by the Makefile via Nimble) +- `nimble.lock` - Pinned dependency versions resolved into `nimbledeps/` - `waku/node/waku_node.nim` - Core node implementation - `apps/wakunode2/wakunode2.nim` - Main CLI application - `waku/factory/waku_conf.nim` - Configuration types -- `library/libwaku.nim` - C bindings entry point +- `liblogosdelivery/liblogosdelivery.nim` - C bindings entry point ### Testing Entry Points - `tests/all_tests_waku.nim` - All Waku protocol tests - `tests/all_tests_wakunode2.nim` - Node application tests - `tests/all_tests_common.nim` - Common utilities tests - +#### in-flight testing +- any test can be run separately by issuing `make test tests//.nim` ### Key Dependencies - `chronos` - Async framework - `nim-results` - Result type for error handling diff --git a/BearSSL.mk b/BearSSL.mk index 355e46563..65e4f72a7 100644 --- a/BearSSL.mk +++ b/BearSSL.mk @@ -9,7 +9,7 @@ ## bearssl (nimbledeps) ## ########################### # Rebuilds libbearssl.a from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # BEARSSL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -29,18 +29,11 @@ else PORTABLE_BEARSSL_CFLAGS := -W -Wall -Os -fPIC endif -.PHONY: clean-bearssl-nimbledeps rebuild-bearssl-nimbledeps +.PHONY: rebuild-bearssl-nimbledeps -clean-bearssl-nimbledeps: +rebuild-bearssl-nimbledeps: ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(BEARSSL_CSOURCES_DIR)/build" ] && \ - "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" clean || true - -rebuild-bearssl-nimbledeps: | clean-bearssl-nimbledeps -ifeq ($(BEARSSL_NIMBLEDEPS_DIR),) - $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No bearssl package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding bearssl from $(BEARSSL_CSOURCES_DIR)" + "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" CFLAGS="$(PORTABLE_BEARSSL_CFLAGS)" lib \ No newline at end of file diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 000000000..43c994c2d --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1 @@ +@AGENTS.md diff --git a/Makefile b/Makefile index f147c5e7e..ea1bf66f0 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH) # NIM binary location NIM_BINARY := $(shell which nim 2>/dev/null) NPH := $(HOME)/.nimble/bin/nph +NIMBLE := $(HOME)/.nimble/bin/nimble NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup # Compilation parameters @@ -42,7 +43,8 @@ endif ########## ## Main ## ########## -.PHONY: all test update clean examples deps nimble install-nim install-nimble +# The Makefile automatically bootstraps dependency setup when needed for build and test targets. +.PHONY: all test clean examples deps nimble install-nim install-nimble # default target all: | wakunode2 libwaku liblogosdelivery @@ -69,18 +71,16 @@ endif waku.nims: ln -s waku.nimble $@ -$(NIMBLEDEPS_STAMP): nimble.lock | waku.nims - $(MAKE) install-nimble - nimble setup --localdeps - $(MAKE) build-nph - $(MAKE) rebuild-bearssl-nimbledeps - $(MAKE) rebuild-nat-libs-nimbledeps +$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims + $(NIMBLE) setup --localdeps touch $@ -update: - rm -f $(NIMBLEDEPS_STAMP) - $(MAKE) $(NIMBLEDEPS_STAMP) - nimble lock +# Must be phony so the recipe always runs and the sub-make re-evaluates +# BEARSSL_NIMBLEDEPS_DIR / NAT_TRAVERSAL_NIMBLEDEPS_DIR (parse-time variables) +# after nimble setup has populated nimbledeps/. +.PHONY: build-deps +build-deps: | $(NIMBLEDEPS_STAMP) + $(MAKE) rebuild-bearssl-nimbledeps rebuild-nat-libs-nimbledeps clean: rm -rf build 2> /dev/null || true @@ -93,15 +93,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku. REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') install-nim: +ifneq ($(detected_OS),Windows) scripts/install_nim.sh $(REQUIRED_NIM_VERSION) +endif install-nimble: install-nim - @nimble_ver=$$(nimble --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1); \ - if [ "$$nimble_ver" = "$(REQUIRED_NIMBLE_VERSION)" ]; then \ - echo "nimble $(REQUIRED_NIMBLE_VERSION) already installed, skipping."; \ - else \ - cd $$(mktemp -d) && nimble install "nimble@$(REQUIRED_NIMBLE_VERSION)" -y; \ - fi +ifneq ($(detected_OS),Windows) + scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION) +endif build: mkdir -p build @@ -203,7 +202,7 @@ clean: | clean-librln ################# .PHONY: testcommon -testcommon: | $(NIMBLEDEPS_STAMP) build +testcommon: | build-deps build echo -e $(BUILD_MSG) "build/$@" && \ nimble testcommon @@ -212,59 +211,59 @@ testcommon: | $(NIMBLEDEPS_STAMP) build ########## .PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester -testwaku: | $(NIMBLEDEPS_STAMP) build rln-deps librln +testwaku: | build-deps build rln-deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble test -wakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +wakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakunode2 -benchmarks: | $(NIMBLEDEPS_STAMP) build deps librln +benchmarks: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble benchmarks -testwakunode2: | $(NIMBLEDEPS_STAMP) build deps librln +testwakunode2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble testwakunode2 -example2: | $(NIMBLEDEPS_STAMP) build deps librln +example2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble example2 -chat2: | $(NIMBLEDEPS_STAMP) build deps librln +chat2: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2 -chat2mix: | $(NIMBLEDEPS_STAMP) build deps librln +chat2mix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2mix -rln-db-inspector: | $(NIMBLEDEPS_STAMP) build deps librln +rln-db-inspector: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble rln_db_inspector -chat2bridge: | $(NIMBLEDEPS_STAMP) build deps librln +chat2bridge: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble chat2bridge -liteprotocoltester: | $(NIMBLEDEPS_STAMP) build deps librln +liteprotocoltester: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble liteprotocoltester -lightpushwithmix: | $(NIMBLEDEPS_STAMP) build deps librln +lightpushwithmix: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble lightpushwithmix -api_example: | $(NIMBLEDEPS_STAMP) build deps librln +api_example: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ $(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims -build/%: | $(NIMBLEDEPS_STAMP) build deps librln +build/%: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$*" && \ nimble buildone $* -compile-test: | $(NIMBLEDEPS_STAMP) build deps librln +compile-test: | build-deps build deps librln echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \ nimble buildTest $(TEST_FILE) && \ nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\"" @@ -276,11 +275,11 @@ compile-test: | $(NIMBLEDEPS_STAMP) build deps librln tools: networkmonitor wakucanary -wakucanary: | $(NIMBLEDEPS_STAMP) build deps librln +wakucanary: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble wakucanary -networkmonitor: | $(NIMBLEDEPS_STAMP) build deps librln +networkmonitor: | build-deps build deps librln echo -e $(BUILD_MSG) "build/$@" && \ nimble networkmonitor @@ -424,10 +423,10 @@ else ifeq ($(detected_OS),Linux) BUILD_COMMAND := $(BUILD_COMMAND)Linux endif -libwaku: | $(NIMBLEDEPS_STAMP) librln +libwaku: | build-deps librln nimble --verbose libwaku$(BUILD_COMMAND) waku.nimble -liblogosdelivery: | $(NIMBLEDEPS_STAMP) librln +liblogosdelivery: | build-deps librln nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble logosdelivery_example: | build liblogosdelivery diff --git a/Nat.mk b/Nat.mk index 90d0b2ead..1161121ba 100644 --- a/Nat.mk +++ b/Nat.mk @@ -9,7 +9,7 @@ ## nat-libs (nimbledeps) ## ########################### # Builds miniupnpc and libnatpmp from the package installed by nimble under -# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP). +# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps. # # NAT_TRAVERSAL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that # depend on it must be invoked via a recursive $(MAKE) call so the sub-make @@ -28,20 +28,11 @@ else PORTABLE_NAT_MARCH := endif -.PHONY: clean-cross-nimbledeps rebuild-nat-libs-nimbledeps +.PHONY: rebuild-nat-libs-nimbledeps -clean-cross-nimbledeps: +rebuild-nat-libs-nimbledeps: ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) -endif - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" CC=$(CC) clean $(HANDLE_OUTPUT) || true - + [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" ] && \ - "$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" CC=$(CC) clean $(HANDLE_OUTPUT) || true - -rebuild-nat-libs-nimbledeps: | clean-cross-nimbledeps -ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),) - $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first) + $(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make build-deps' first) endif @echo "Rebuilding nat-libs from $(NAT_TRAVERSAL_NIMBLEDEPS_DIR)" ifeq ($(OS), Windows_NT) @@ -58,4 +49,4 @@ else + "$(MAKE)" CFLAGS="-Wall -Wno-cpp -Os -fPIC $(PORTABLE_NAT_MARCH) -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4 $(CFLAGS)" \ -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" \ CC=$(CC) libnatpmp.a $(HANDLE_OUTPUT) -endif +endif \ No newline at end of file diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 53eb5d04b..eeeea328b 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[tables, times, strutils, hashes, sequtils, json], + std/[tables, times, strutils, hashes, sequtils, json, options], chronos, confutils, chronicles, @@ -267,10 +267,16 @@ when isMainModule: else: nodev2ExtPort + let nodev2Key = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + let bridge = Chat2Matterbridge.new( mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)), mbGateway = conf.mbGateway, - nodev2Key = conf.nodekey, + nodev2Key = nodev2Key, nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index abb5e329f..048fc4d87 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -1,4 +1,5 @@ import + std/options, confutils, confutils/defs, confutils/std/net, @@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object .}: seq[string] nodekey* {. - desc: "P2P node private key as hex", - defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(), - name: "nodekey" - .}: crypto.PrivateKey + desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey" + .}: Option[crypto.PrivateKey] store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" @@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object # Matterbridge options mbHostAddress* {. desc: "Listening address of the Matterbridge host", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "mb-host-address" .}: IpAddress diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..639e14986 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -162,7 +162,8 @@ type metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: + IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -194,7 +195,10 @@ type dnsDiscoveryNameServers* {. desc: "DNS name server IPs to query. Argument may be repeated.", - defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + defaultValue: @[ + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]), + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]), + ], name: "dns-discovery-name-server" .}: seq[IpAddress] diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index dee918b8c..1f4bedaa8 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object ## Tester REST service configuration restAddress* {. desc: "Listening address of the REST HTTP server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "rest-address" .}: IpAddress diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index f67fb09a8..b5bcfbd96 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -116,7 +116,7 @@ type NetworkMonitorConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress diff --git a/channels/encryption/encryption.nim b/channels/encryption/encryption.nim new file mode 100644 index 000000000..5cb53be2f --- /dev/null +++ b/channels/encryption/encryption.nim @@ -0,0 +1,25 @@ +## Optional encryption hooks for the Reliable Channel API. +## +## Modelled as `RequestBroker`s: the broker pattern lets the channel +## delegate work to a provider that may live in any module without +## introducing a direct dependency. If no provider is registered the +## broker returns an error, so installing the noop providers from +## `noop_encryption` is required when the application does not want +## actual encryption. +## +## Applied per-segment after SDS processing on outgoing, and before +## SDS processing on incoming. No specific scheme is mandated. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import brokers/request_broker + +export request_broker + +RequestBroker: + type Encrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} + +RequestBroker: + type Decrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} diff --git a/channels/encryption/noop_encryption.nim b/channels/encryption/noop_encryption.nim new file mode 100644 index 000000000..f09ed9cf4 --- /dev/null +++ b/channels/encryption/noop_encryption.nim @@ -0,0 +1,18 @@ +## No-op encryption providers. Install these when the application does +## not want actual encryption so the `Encrypt` / `Decrypt` brokers have +## something to dispatch to. + +import results +import chronos +import ./encryption + +proc setNoopEncryption*() = + discard Encrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} = + return ok(Encrypt(payload)) + ) + + discard Decrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} = + return ok(Decrypt(payload)) + ) diff --git a/channels/events.nim b/channels/events.nim new file mode 100644 index 000000000..2ee20a26d --- /dev/null +++ b/channels/events.nim @@ -0,0 +1,23 @@ +## Reliable Channel event types emitted to API consumers. +## +## Lifecycle events for individual segments (sent / propagated / errored) +## are the same as the network-level ones the MessagingClient already +## emits — `requestId` is shared across layers — so we just re-export +## `waku/events/message_events` and avoid declaring duplicates. +## +## Only the channel-level `MessageReceivedEvent` carries data that has +## no analogue in the lower layer (reassembled application payload, +## senderId, channelId), so it lives here. + +import waku/events/message_events as waku_message_events +import brokers/event_broker + +import ./types as channel_types + +export waku_message_events, channel_types, event_broker + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim new file mode 100644 index 000000000..5ea6486a5 --- /dev/null +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -0,0 +1,80 @@ +## Rate Limit Manager for the Reliable Channel API. +## +## Tracks messages sent per RLN epoch and delays dispatch when the +## limit is approached, ensuring RLN compliance on enforcing relays. +## +## For the skeleton this is a pass-through: messages are immediately +## released as ready-to-send. Real epoch budgeting will be added later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/times +import message +import brokers/event_broker +import brokers/broker_context + +export event_broker, broker_context +export message.SdsChannelID + +const + DefaultEpochPeriodSec* = 600 + DefaultMessagesPerEpoch* = 1 + +EventBroker: + ## Emitted by `enqueueToSend` carrying the batch of opaque message + ## blobs that may now leave the rate limiter and continue down the + ## outgoing pipeline (encryption -> dispatch). Bytes only: the rate + ## limiter is intentionally agnostic of SDS, so anything serialisable + ## can flow through it. + ## + ## `channelId` lets listeners filter to their own channel, since all + ## reliable channels share the underlying Waku node's broker context. + type ReadyToSendEvent* = object + channelId*: SdsChannelID + msgs*: seq[seq[byte]] + +type + RateLimitConfig* = object + enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active + epochPeriodSec*: int + messagesPerEpoch*: int + + RateLimitManager* = ref object + config*: RateLimitConfig + queue*: seq[seq[byte]] + currentEpochStart*: Time + sentInCurrentEpoch*: int + channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent` + brokerCtx: BrokerContext + +proc new*( + T: type RateLimitManager, + config: RateLimitConfig, + channelId: SdsChannelID, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + return T( + config: config, + queue: @[], + currentEpochStart: getTime(), + sentInCurrentEpoch: 0, + channelId: channelId, + brokerCtx: brokerCtx, + ) + +proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) = + ## Skeleton behaviour: enqueue and immediately release as a single + ## ready batch. Real per-epoch budgeting will park messages on + ## `self.queue` and emit only when the budget allows. + ReadyToSendEvent.emit( + self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg]) + ) + +proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] = + ## Returns the set of queued messages that may be dispatched now + ## without exceeding the configured rate limit. + discard + +proc resetEpoch*(self: RateLimitManager) = + self.currentEpochStart = getTime() + self.sentInCurrentEpoch = 0 diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim new file mode 100644 index 000000000..f6a1f2ec4 --- /dev/null +++ b/channels/reliable_channel.nim @@ -0,0 +1,264 @@ +## Reliable Channel type. +## +## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end +## reliability), optional encryption, and rate-limited dispatch on top +## of the Messaging API for a single channel. +## +## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch +## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event +## +## Channels are owned by a `ReliableChannelManager`. Lifecycle and send +## operations are addressed by `ChannelId`, so callers only need to keep +## an opaque handle around. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/[options, tables] +import results +import chronos +import bearssl/rand +import stew/byteutils +import libp2p/crypto/crypto as libp2p_crypto + +import waku/messaging_client +import waku/node/delivery_service/send_service +import waku/waku_core/topics + +import ./events +import ./segmentation/segmentation +import ./scalable_data_sync/scalable_data_sync +import ./rate_limit_manager/rate_limit_manager +import ./encryption/encryption + +export + messaging_client, send_service, events, segmentation, scalable_data_sync, + rate_limit_manager, encryption + +const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" + ## Wire-format spec marker for the Reliable Channel layer, as defined + ## in the reliable-channel-api LIP (`Wire Format / Spec Marker`). + ## A `WakuMessage` whose `meta` field does not equal these bytes is + ## not addressed to this layer and is silently dropped on ingress. + ## The trailing `/N` is the wire-format version and is bumped only + ## on breaking on-the-wire changes; implementations pin one version. + +type ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + messagingClient: MessagingClient + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] + brokerCtx: BrokerContext + ## Captured here so the channel emits `ChannelMessageReceivedEvent` + ## on the same broker context the owning manager registered its + ## listeners on. Without this, an emit via `globalBrokerContext()` + ## would land on whatever context happens to be thread-local at + ## emit time, which is not necessarily the manager's. + +func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = + self.channelId + +func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = + self.contentTopic + +func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = + self.senderId + +proc onReadyToSend( + self: ReliableChannel, msgs: seq[seq[byte]] +) {.async: (raises: []).} = + ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` + ## listener once `rate_limit_manager` releases a batch of opaque + ## blobs (already-encoded SDS messages): + ## + ## ... -> rate_limit_manager -> [encryption] -> dispatch + for m in msgs: + ## Each `m` was preceded by exactly one push onto `pendingRequests` + ## in `send`, so this pop is always safe in the current skeleton. + let pending = self.pendingRequests[0] + self.pendingRequests.delete(0) + + ## TODO: revisit which fields of the SDS message must be encrypted. + ## Encrypting the whole encoded blob forces every receiver to attempt + ## decryption before it can route, which breaks selective dispatch. + ## Leave routing metadata (channelId, causal-history references) in + ## clear and encrypt only the application payload. + let encRes = await Encrypt.request(m) + let encrypted = encRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: pending.parent, + messageHash: "", + error: "encryption failed: " & error, + ), + ) + continue + let wireBytes = seq[byte](encrypted) + + let envelope = MessageEnvelope( + contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + ) + + let deliveryReqId = RequestId.new(self.rng) + let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: + ## TODO: emit waku `MessageErrorEvent` for the parent request id. + continue + + ## Stamp the Reliable Channel wire-format spec marker so the ingress + ## side of any peer can route this WakuMessage to its Reliable + ## Channel layer. Done on the constructed WakuMessage rather than + ## via the envelope because `MessageEnvelope` does not expose a + ## `meta` field. + deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + + asyncSpawn self.messagingClient.sendService.send(deliveryTask) + self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + +proc send*( + self: ReliableChannel, payload: seq[byte], ephemeral: bool = false +): Result[RequestId, string] = + ## Single application-level send. The first three stages of the + ## outgoing pipeline are chained explicitly so the flow is visible + ## at a glance: + ## + ## segmentation -> sds -> rate_limit_manager + ## + ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with + ## the SDS messages cleared for transmission; the channel's listener + ## then runs the final stage (encryption -> dispatch). The `ephemeral` + ## flag is carried alongside each segment in `pendingRequests` and + ## stamped onto the eventual `MessageEnvelope`. + ## + ## The returned `RequestId` is the parent of one-or-more + ## delivery-service `RequestId`s; the mapping is recorded in + ## `self.requestIds`. + if payload.len == 0: + return err("empty payload") + + let parentReqId = RequestId.new(self.rng) + self.requestIds[parentReqId] = @[] + + for segmentBytes in self.segmentation.performSegmentation(payload): + ## Segments arrive already encoded; the segmentation module owns + ## the wire format so SDS only ever sees opaque bytes. + let sdsBytes = self.sdsHandler.wrapOutgoing( + self.channelId, self.senderId, segmentBytes + ).valueOr: + return err("SDS wrap failed: " & error) + self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.rateLimit.enqueueToSend(sdsBytes) + + return ok(parentReqId) + +proc onMessageReceived( + self: ReliableChannel, messageHash: string, payload: seq[byte] +) {.async: (raises: []).} = + ## Ingress pipeline made visible: + ## + ## payload -> decrypt -> sds -> reassemble -> emit + ## + ## Invoked from this channel's `MessageReceivedEvent` listener, which + ## already filtered on the spec marker and on `contentTopic`. The + ## channel only sees the raw payload bytes for itself. + + ## Notice that the following "request" is implemented implicitly as a broker call to + ## the `Decrypt` request broker. + let decRes = await Decrypt.request(payload) + let plaintext = decRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "decryption failed: " & error, + ), + ) + return + let plaintextBytes = seq[byte](plaintext) + + let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) + if unwrapped.isErr(): + return + + let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. + ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc new*( + T: type ReliableChannel, + messagingClient: MessagingClient, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + segConfig: SegmentationConfig, + sdsConfig: SdsConfig, + rateConfig: RateLimitConfig, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed + ## inside the channel rather than handed in by the caller — they are + ## implementation details of the channel, not knobs the API consumer + ## should be wiring up. Encryption is delegated to the `Encrypt`/ + ## `Decrypt` request brokers, so the channel keeps no per-instance + ## encryption state either. + let chn = T( + messagingClient: messagingClient, + channelId: channelId, + contentTopic: contentTopic, + senderId: senderId, + rng: libp2p_crypto.newRng(), + segmentation: SegmentationHandler.new(segConfig), + sdsHandler: SdsHandler.new(sdsConfig, senderId), + rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), + requestIds: initTable[RequestId, seq[RequestId]](), + pendingRequests: @[], + brokerCtx: brokerCtx, + ) + + ## Each channel owns its own egress + ingress listeners on + ## `chn.brokerCtx`, filtered to traffic addressed to this channel. + ## Keeping the listeners (and the procs they call) inside the + ## channel lets `onReadyToSend` and `onMessageReceived` stay private + ## — the manager doesn't need to know about them. + discard ReadyToSendEvent.listen( + chn.brokerCtx, + proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = + if evt.channelId == chn.channelId: + await chn.onReadyToSend(evt.msgs) + , + ) + + discard MessageReceivedEvent.listen( + chn.brokerCtx, + proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} = + ## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic + ## for other channels before doing any decode work. + if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion: + return + if evt.message.contentTopic != chn.contentTopic: + return + await chn.onMessageReceived(evt.messageHash, evt.message.payload) + , + ) + + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim new file mode 100644 index 000000000..c94baa836 --- /dev/null +++ b/channels/reliable_channel_manager.nim @@ -0,0 +1,133 @@ +## Reliable Channel API entry point. +## +## Owns the set of `ReliableChannel` instances and exposes lifecycle and +## send/receive operations addressed by `ChannelId`. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/tables +import results +import chronos +import stew/byteutils + +import brokers/broker_context + +import waku/events/message_events as waku_message_events +import waku/messaging_client +import waku/waku_core/topics + +import ./reliable_channel +import ./encryption/noop_encryption + +export reliable_channel + +type ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + messagingClient: MessagingClient + ## Borrowed from the owning `Waku`. The ownership chain is + ## Waku -> ReliableChannelManager -> MessagingClient (also Waku-owned). + ## Hidden so callers can't substitute their own and bypass the + ## manager's pipeline. + brokerCtx: BrokerContext + +proc new*( + T: type ReliableChannelManager, + messagingClient: MessagingClient, + brokerCtx: BrokerContext = globalBrokerContext(), +): Result[T, string] = + if messagingClient.isNil(): + return err("messaging client is required") + ok( + T( + channels: initTable[ChannelId, ReliableChannel](), + messagingClient: messagingClient, + brokerCtx: brokerCtx, + ) + ) + +proc start*(self: ReliableChannelManager): Result[void, string] = + ## Placeholder: per-channel listeners are installed in `ReliableChannel.new`, + ## so the manager has nothing to start at this layer. Kept for symmetry + ## with the `Waku` mount/start lifecycle and as a hook for future state. + discard + ok() + +proc stop*(self: ReliableChannelManager) {.async.} = + ## Placeholder mirror of `start`. + discard + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, +): Result[ChannelId, string] = + ## Spec entry point. The `MessagingClient` and `rng` the channel needs + ## are sourced from the owning `ReliableChannelManager` rather than + ## passed per call. Encryption is wired up through the `Encrypt`/ + ## `Decrypt` request brokers — the application installs its own + ## providers (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: nil, + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let chn = ReliableChannel.new( + messagingClient = self.messagingClient, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Result[void, string] = + ## Flush state, persist outstanding SDS buffers, release resources. + if not self.channels.hasKey(channelId): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + return ok() + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Result[RequestId, string] = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) + +## Inbound messages are not handed to the manager by direct call. Each +## `ReliableChannel` installs its own `MessageReceivedEvent` listener +## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, +## and routes to its private `onMessageReceived`. This keeps the lower +## layer (MessagingClient/Waku) unaware of the existence of ReliableChannel +## and keeps the manager out of per-channel event dispatch. diff --git a/channels/scalable_data_sync/scalable_data_sync.nim b/channels/scalable_data_sync/scalable_data_sync.nim new file mode 100644 index 000000000..30ad0e02b --- /dev/null +++ b/channels/scalable_data_sync/scalable_data_sync.nim @@ -0,0 +1,62 @@ +## Scalable Data Sync (SDS) component for the Reliable Channel API. +## +## Provides end-to-end delivery guarantees via causal history tracking, +## acknowledgements, and retransmission of unacknowledged segments. +## +## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so +## the send/receive circuit can exercise the surrounding pipeline. +## Real SDS wrapping will plug in via `nim-sds` later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import results +import message as sds_message + +import ./sds_persistence + +export sds_message, sds_persistence + +const + DefaultAcknowledgementTimeoutMs* = 5_000 + DefaultMaxRetransmissions* = 5 + DefaultCausalHistorySize* = 2 + +type + SdsConfig* = object + acknowledgementTimeoutMs*: int + maxRetransmissions*: int + causalHistorySize*: int + persistence*: SdsPersistence + + SdsHandler* = ref object + config*: SdsConfig + participantId*: SdsParticipantID + +proc new*( + T: type SdsHandler, + config: SdsConfig, + participantId: SdsParticipantID = SdsParticipantID(""), +): T = + return T(config: config, participantId: participantId) + +proc wrapOutgoing*( + self: SdsHandler, + channelId: SdsChannelID, + senderId: SdsParticipantID, + payload: seq[byte], +): Result[seq[byte], string] = + ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). + ## Skeleton: pass the encoded segment through unchanged. Real causal + ## history / lamport / bloom-filter population will replace this. + return ok(payload) + +proc handleIncoming*( + self: SdsHandler, msg: seq[byte] +): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = + ## Skeleton: pass the bytes through; channel id is left empty until + ## the real wire format provides it. + return ok((content: msg, channelId: SdsChannelID(""))) + +proc tickRetransmissions*(self: SdsHandler) = + ## Drives retransmissions of unacknowledged messages. + discard diff --git a/channels/scalable_data_sync/sds_persistence.nim b/channels/scalable_data_sync/sds_persistence.nim new file mode 100644 index 000000000..8089595ea --- /dev/null +++ b/channels/scalable_data_sync/sds_persistence.nim @@ -0,0 +1,25 @@ +## Persistence backend for SDS outgoing buffer and causal history. +## +## TODO (raised in PR review): this surface is duplicating concerns that +## should come from the SDS module itself. Once the SDS module exposes a +## complete persistence contract, drop this file and import that surface +## instead of re-declaring it here. + +import message + +type + SdsPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SdsPersistence* = ref object of RootObj + kind*: SdsPersistenceKind + +method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = + discard + +method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = + discard + +method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = + discard diff --git a/channels/segmentation/segment_message_proto.nim b/channels/segmentation/segment_message_proto.nim new file mode 100644 index 000000000..f19cdc27f --- /dev/null +++ b/channels/segmentation/segment_message_proto.nim @@ -0,0 +1,34 @@ +## Wire format for a single segment, per the Reliable Channel API spec. +## +## Skeleton: encode/decode treat the segment as just its payload bytes, +## since for now we only ever produce a single segment per send. + +type SegmentMessageProto* = object + entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes + dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments + dataSegmentCount*: uint32 ## number of data segments (>= 1) + payload*: seq[byte] ## segment payload (data or parity shard) + paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments + paritySegmentCount*: uint32 ## number of parity segments + isParity*: bool ## true for parity segments, false (default) for data segments + +proc isParityMessage*(self: SegmentMessageProto): bool = + self.isParity + +proc isValid*(self: SegmentMessageProto): bool = + ## Validates hash length (32 bytes), segment indices and counts. + discard + +proc encode*(self: SegmentMessageProto): seq[byte] = + self.payload + +proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T = + T( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: buf, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) diff --git a/channels/segmentation/segmentation.nim b/channels/segmentation/segmentation.nim new file mode 100644 index 000000000..9fc7964c0 --- /dev/null +++ b/channels/segmentation/segmentation.nim @@ -0,0 +1,70 @@ +## Segmentation component for the Reliable Channel API. +## +## Splits large application payloads into transmittable segments and +## reassembles them on reception. Supports optional Reed-Solomon parity +## segments for loss recovery, as per the Reliable Channel API spec. +## +## For the skeleton everything fits in a single segment: real chunking +## and Reed-Solomon parity will be plugged in later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/options +import ./segment_message_proto +import ./segmentation_persistence + +export segment_message_proto, segmentation_persistence + +const + DefaultSegmentSizeBytes* = 102_400 + SegmentsParityRate* = 0.125 + SegmentsReedSolomonMaxCount* = 256 + +type + SegmentationConfig* = object + segmentSizeBytes*: int + enableReedSolomon*: bool + persistence*: SegmentationPersistence + + SegmentationHandler* = ref object + config*: SegmentationConfig + + ReassemblyResult* = object + payload*: seq[byte] + entireMessageHash*: seq[byte] + +proc new*(T: type SegmentationHandler, config: SegmentationConfig): T = + return T(config: config) + +proc performSegmentation*( + self: SegmentationHandler, payload: seq[byte] +): seq[seq[byte]] = + ## Skeleton behaviour: emit exactly one segment carrying the whole + ## payload. Real chunking and Reed-Solomon parity will replace this. + let segment = SegmentMessageProto( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: payload, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) + return @[segment.encode()] + +proc handleIncomingSegment*( + self: SegmentationHandler, segmentBytes: seq[byte] +): Option[ReassemblyResult] = + ## Skeleton behaviour: every segment is already a complete message + ## (since `performSegmentation` always emits one), so just hand the + ## payload straight back. + let segment = SegmentMessageProto.decode(segmentBytes) + return some( + ReassemblyResult( + payload: segment.payload, entireMessageHash: segment.entireMessageHash + ) + ) + +proc cleanupSegments*(self: SegmentationHandler) = + ## Drop expired partial-reassembly state. + discard diff --git a/channels/segmentation/segmentation_persistence.nim b/channels/segmentation/segmentation_persistence.nim new file mode 100644 index 000000000..cc34c36d2 --- /dev/null +++ b/channels/segmentation/segmentation_persistence.nim @@ -0,0 +1,20 @@ +## Persistence backend interface for segmentation reassembly state. +## +## Allows partial reassembly state to survive process restarts. + +type + SegmentationPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SegmentationPersistence* = ref object of RootObj + kind*: SegmentationPersistenceKind + +method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} = + discard + +method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} = + discard + +method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} = + discard diff --git a/channels/types.nim b/channels/types.nim new file mode 100644 index 000000000..4070ed620 --- /dev/null +++ b/channels/types.nim @@ -0,0 +1,15 @@ +## Core identifier types for the Reliable Channel API. + +import std/hashes +import waku/api/types as api_types + +import ./scalable_data_sync/scalable_data_sync + +export scalable_data_sync +export api_types + +type ChannelId* = SdsChannelID + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) diff --git a/flake.nix b/flake.nix index 077668b9a..6c283780d 100644 --- a/flake.nix +++ b/flake.nix @@ -89,8 +89,15 @@ inherit zerokitRln; gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}"; }; + + wakucanary = pkgs.callPackage ./nix/default.nix { + inherit pkgs; + src = ./.; + targets = ["wakucanary"]; + zerokitRln = zerokit.packages.${system}.rln; + }; in { - inherit liblogosdelivery; + inherit liblogosdelivery wakucanary; # Expose the cargoHash-corrected librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this # build links, instead of pulling zerokit's rln directly — whose diff --git a/liblogosdelivery/logos_delivery_api/node_api.nim b/liblogosdelivery/logos_delivery_api/node_api.nim index 3d9442726..9818626cc 100644 --- a/liblogosdelivery/logos_delivery_api/node_api.nim +++ b/liblogosdelivery/logos_delivery_api/node_api.nim @@ -177,6 +177,11 @@ proc logosdelivery_start_node( chronicles.error "mountMessagingClient failed", err = errMsg return err("failed to mount messaging: " & errMsg) + ctx.myLib[].mountReliableChannelManager().isOkOr: + let errMsg = $error + chronicles.error "mountReliableChannelManager failed", err = errMsg + return err("failed to mount reliable channel manager: " & errMsg) + (await ctx.myLib[].start()).isOkOr: let errMsg = $error chronicles.error "START_NODE failed", err = errMsg diff --git a/nix/default.nix b/nix/default.nix index 7b7989e1a..ec9e0542c 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -1,6 +1,7 @@ { pkgs , src , zerokitRln +, targets ? [] , gitVersion ? "n/a" , enablePostgres ? true , enableNimDebugDlOpen ? true @@ -10,6 +11,8 @@ let deps = import ./deps.nix { inherit pkgs; }; + buildWakucanary = builtins.elem "wakucanary" targets; + nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " ( [ "--define:disable_libbacktrace" "--define:git_version=${gitVersion}" ] @@ -34,9 +37,29 @@ let if pkgs.stdenv.hostPlatform.isWindows then "dll" else if pkgs.stdenv.hostPlatform.isDarwin then "dylib" else "so"; + + # Shared `nim c` invocation. Callers vary the output, the source file and a + # few mode-specific flags (e.g. --app:lib, --noMain, --header); everything + # else (paths, defines, threading, gc, nimcache, rln linkage) is constant. + # $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase. + nimCompile = { outFile, sourceFile, extraArgs ? [] }: '' + nim c \ + --noNimblePath \ + ${pathArgs} \ + --path:$NAT_TRAV \ + --path:$NAT_TRAV/src \ + --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ + ${nimDefineArgs} \ + --threads:on \ + --mm:refc \ + --nimcache:$NIMCACHE \ + --out:${outFile} \ + ${pkgs.lib.concatStringsSep " \\\n " extraArgs} \ + ${sourceFile} + ''; in pkgs.stdenv.mkDerivation { - pname = "liblogosdelivery"; + pname = if buildWakucanary then "wakucanary" else "liblogosdelivery"; version = "dev"; inherit src; @@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation { make -C $NAT_TRAV/vendor/libnatpmp-upstream \ CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a + ${if buildWakucanary then '' + echo "== Building wakucanary ==" + ${nimCompile { + outFile = "build/wakucanary"; + sourceFile = "apps/wakucanary/wakucanary.nim"; + extraArgs = [ "--path:." ]; + }} + '' else '' echo "== Building liblogosdelivery (dynamic) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.${libExt} \ - --app:lib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --header \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.${libExt}"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:lib" + "--opt:size" + "--noMain" + "--header" + "--nimMainPrefix:liblogosdelivery" + ]; + }} echo "== Building liblogosdelivery (static) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.a \ - --app:staticlib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.a"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:staticlib" + "--opt:size" + "--noMain" + "--nimMainPrefix:liblogosdelivery" + ]; + }} + ''} ''; - installPhase = '' + installPhase = if buildWakucanary then '' + runHook preInstall + mkdir -p $out/bin $out/lib + cp build/wakucanary $out/bin/ + runHook postInstall + '' else '' runHook preInstall mkdir -p $out/lib $out/include cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true @@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation { runHook postInstall ''; - # Bundle librln alongside liblogosdelivery so the output is self-contained. + # Bundle librln alongside the produced artifact so the output is self-contained. # Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection # for libstdc++ is preserved. postInstall = - pkgs.lib.optionalString pkgs.stdenv.isDarwin '' - cp ${zerokitRln}/lib/librln.dylib $out/lib/ - chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib - old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) - install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib - '' - + pkgs.lib.optionalString pkgs.stdenv.isLinux '' - cp ${zerokitRln}/lib/librln.so $out/lib/ - patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so - ''; + if buildWakucanary then + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/bin/wakucanary + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true) + if [ -n "$old" ]; then + install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary + fi + install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary + '' + else + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) + install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so + ''; + + meta = with pkgs.lib; { + description = + if buildWakucanary + then "Waku network canary tool" + else "logos-delivery shared/static library"; + homepage = "https://github.com/logos-messaging/logos-delivery"; + license = licenses.mit; + platforms = platforms.unix; + }; } diff --git a/scripts/install_nim.sh b/scripts/install_nim.sh index c8d0f439d..42aa88ecd 100755 --- a/scripts/install_nim.sh +++ b/scripts/install_nim.sh @@ -17,26 +17,36 @@ if [ -z "${NIM_VERSION}" ]; then exit 1 fi -# Check if the right version is already installed +NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" + +# 1. A matching Nim is already on PATH (e.g. provided by CI's setup-nim-action, +# choosenim, or a previous run of this script). Use it as-is: installing over it +# would symlink a freshly downloaded Nim into ~/.nimble/bin (first on PATH) and +# shadow a known-good toolchain, which has caused C-backend build failures. nim_ver=$(nim --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) if [ "${nim_ver}" = "${NIM_VERSION}" ]; then - echo "Nim ${NIM_VERSION} already installed, skipping." + echo "Nim ${NIM_VERSION} already on PATH ($(command -v nim)), skipping install." + exit 0 +fi + +# 2. Already installed at our expected location from a previous run, but not on PATH. +# Re-link binaries into ~/.nimble/bin. +if [ -f "${NIM_DEST}/lib/system.nim" ]; then + echo "Nim ${NIM_VERSION} already installed at ${NIM_DEST}, re-linking binaries." + mkdir -p "${HOME}/.nimble/bin" + for bin_path in "${NIM_DEST}/bin/"*; do + ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" + done exit 0 fi if [ -n "${nim_ver}" ]; then - newer=$(printf '%s\n%s\n' "${NIM_VERSION}" "${nim_ver}" | sort -V | tail -1) - if [ "${newer}" = "${nim_ver}" ]; then - echo "WARNING: Nim ${nim_ver} is installed; this repo is validated against ${NIM_VERSION}." >&2 - echo "WARNING: The build will proceed but may behave differently." >&2 - exit 0 - fi + echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2 fi OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/') ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/') -NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}" BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz" WORK_DIR=$(mktemp -d) trap 'rm -rf "${WORK_DIR}"' EXIT @@ -48,9 +58,7 @@ if [ "${HTTP_STATUS}" = "200" ]; then echo "Downloading pre-built binary from ${BINARY_URL}..." curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz" tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}" - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}" else echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..." SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz" @@ -58,15 +66,19 @@ else tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}" cd "${WORK_DIR}/Nim-${NIM_VERSION}" sh build_all.sh - rm -rf "${NIM_DEST}" - mkdir -p "${HOME}/.nim" - cp -r "${WORK_DIR}/Nim-${NIM_VERSION}" "${NIM_DEST}" + SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}" fi +# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker). +# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present. +rm -rf "${NIM_DEST}" 2>/dev/null || true +mkdir -p "${NIM_DEST}" +cp -r "${SRC_DIR}/." "${NIM_DEST}/" + mkdir -p "${HOME}/.nimble/bin" for bin_path in "${NIM_DEST}/bin/"*; do ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")" done echo "Nim ${NIM_VERSION} installed to ${NIM_DEST}" -echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." +echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH." \ No newline at end of file diff --git a/scripts/install_nimble.sh b/scripts/install_nimble.sh new file mode 100755 index 000000000..dba2d6612 --- /dev/null +++ b/scripts/install_nimble.sh @@ -0,0 +1,70 @@ +#!/usr/bin/env bash +# Installs a specific nimble version without using `nimble install nimble`. +# +# `nimble install nimble` is inherently fragile: +# - ETXTBSY: overwriting the running nimble binary in pkgs2/ +# - JSON parse failures with older nimble versions reading packages_official.json +# +# Strategy: +# 1. If the right version is already at ~/.nimble/bin/nimble → done. +# 2. If a previously-compiled binary exists in pkgs2/ → re-link it. +# 3. Otherwise: clone the nimble git repo, init submodules, build with nim, +# and atomically replace the target (mv avoids ETXTBSY on the old binary). + +set -e + +NIMBLE_VERSION="${1:-}" +if [ -z "${NIMBLE_VERSION}" ]; then + echo "Usage: $0 " >&2 + exit 1 +fi + +NIMBLE_BIN="${HOME}/.nimble/bin/nimble" + +# 1. Already installed at the right version? +if [ -x "${NIMBLE_BIN}" ]; then + nimble_ver=$("${NIMBLE_BIN}" --version 2>/dev/null \ + | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true) + if [ "${nimble_ver}" = "${NIMBLE_VERSION}" ]; then + echo "Nimble ${NIMBLE_VERSION} already installed, skipping." + exit 0 + fi +fi + +# 2. Already compiled into pkgs2/ from a previous (possibly partial) run? +PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \ + 2>/dev/null | head -1 || true) +if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then + echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}." + mkdir -p "${HOME}/.nimble/bin" + ln -sf "${PKGS2_NIMBLE}" "${NIMBLE_BIN}" + exit 0 +fi + +# 3. Build from source. +NIM_BIN="${HOME}/.nimble/bin/nim" +if [ ! -x "${NIM_BIN}" ]; then + NIM_BIN="$(command -v nim)" +fi + +WORK_DIR="$(mktemp -d)" +trap 'rm -rf "${WORK_DIR}"' EXIT + +echo "Cloning nimble v${NIMBLE_VERSION} with submodules..." +git clone --depth=1 --branch "v${NIMBLE_VERSION}" \ + --recurse-submodules --shallow-submodules \ + https://github.com/nim-lang/nimble.git \ + "${WORK_DIR}/nimble" + +echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)..." +cd "${WORK_DIR}/nimble" +# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths. +"${NIM_BIN}" c -d:release --path:src \ + -o:"${WORK_DIR}/nimble_new" src/nimble.nim + +mkdir -p "${HOME}/.nimble/bin" +# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running. +cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$" +mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}" + +echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}" \ No newline at end of file diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index e64922f4c..963a948a3 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -88,3 +88,6 @@ import ./tools/test_all # Persistency library tests import ./persistency/test_all + +# Reliable Channel API tests +import ./channels/test_all diff --git a/tests/channels/test_all.nim b/tests/channels/test_all.nim new file mode 100644 index 000000000..04b448707 --- /dev/null +++ b/tests/channels/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_reliable_channel_send_receive diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim new file mode 100644 index 000000000..6e189d1be --- /dev/null +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -0,0 +1,153 @@ +{.used.} + +import std/[net] +import chronos, testutils/unittests, stew/byteutils +import brokers/broker_context + +import ../testlib/[common, wakucore, wakunode, testasync] + +import waku +import waku/[waku_node, waku_core] +import waku/factory/waku_conf +import waku/events/message_events as waku_message_events +import tools/confutils/cli_args + +import channels/reliable_channel_manager +import channels/encryption/noop_encryption + +const TestTimeout = chronos.seconds(15) + +proc createApiNodeConf(): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = cli_args.WakuMode.Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.reliabilityEnabled = true + conf.rest = false + return conf + +suite "Reliable Channel - ingress": + asyncTest "manager dispatches marked WakuMessage to the right channel": + ## Unit test for the receive side of the API: instead of standing + ## up two libp2p nodes and a relay mesh, we drive the manager + ## directly by emitting a `MessageReceivedEvent` (the exact event + ## the MessagingClient emits when a `WakuMessage` arrives off the + ## wire). The manager must: + ## - drop traffic missing the Reliable Channel spec marker + ## - dispatch the matching channel's `onMessageReceived` + ## - emit `ChannelMessageReceivedEvent` with the payload + const + channelId = ChannelId("test-channel") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "hello reliable channel".toBytes() + + var waku: Waku + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + ## Noop encryption providers so the Encrypt/Decrypt brokers have + ## something to dispatch to; without this the channel falls back to + ## plaintext anyway, but installing them is the documented setup. + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + let received = newFuture[seq[byte]]("channel-message-received") + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if not received.finished() and evt.channelId == channelId: + received.complete(evt.payload) + , + ) + .expect("listen ChannelMessageReceivedEvent") + + ## Build a `WakuMessage` that looks like one that came in off the + ## wire from a peer: the spec marker on `meta` plus the right content + ## topic. The manager's ingress listener should pick it up, + ## decrypt (noop), unwrap SDS (pass-through), reassemble (one + ## segment), and finally emit `ChannelMessageReceivedEvent`. + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + let arrived = await received.withTimeout(TestTimeout) + check arrived + if arrived: + check received.read() == appPayload + + discard await waku.stop() + + asyncTest "manager drops unmarked WakuMessage": + ## Mirror of the above: same content topic, but `meta` is empty + ## (i.e. foreign traffic). The channel-level event must NOT fire. + const + channelId = ChannelId("test-channel-2") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "foreign payload".toBytes() + + var waku: Waku + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + var fired = false + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if evt.channelId == channelId: + fired = true + , + ) + .expect("listen ChannelMessageReceivedEvent") + + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: @[], ## no Reliable Channel spec marker + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + ## Give the event broker a chance to fan out. + await sleepAsync(100.milliseconds) + check not fired + + discard await waku.stop() diff --git a/tests/node/peer_manager/peer_store/test_migrations.nim b/tests/node/peer_manager/peer_store/test_migrations.nim index a20d065ec..d6b86a15b 100644 --- a/tests/node/peer_manager/peer_store/test_migrations.nim +++ b/tests/node/peer_manager/peer_store/test_migrations.nim @@ -1,11 +1,11 @@ -import std/[options], stew/results, testutils/unittests +import std/[options], results, testutils/unittests import waku/node/peer_manager/peer_store/migrations, ../../waku_archive/archive_utils, ../../testlib/[simple_mock] -import std/[tables, strutils, os], stew/results, chronicles +import std/[tables, strutils, os], results, chronicles import waku/common/databases/db_sqlite, waku/common/databases/common diff --git a/tests/node/peer_manager/peer_store/test_peer_storage.nim b/tests/node/peer_manager/peer_store/test_peer_storage.nim index c8a479178..871df8644 100644 --- a/tests/node/peer_manager/peer_store/test_peer_storage.nim +++ b/tests/node/peer_manager/peer_store/test_peer_storage.nim @@ -1,4 +1,4 @@ -import stew/results, testutils/unittests +import results, testutils/unittests import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers diff --git a/tests/node/test_wakunode_peer_exchange.nim b/tests/node/test_wakunode_peer_exchange.nim index e6649c455..82ca25868 100644 --- a/tests/node/test_wakunode_peer_exchange.nim +++ b/tests/node/test_wakunode_peer_exchange.nim @@ -9,7 +9,8 @@ import libp2p/peerId, libp2p/crypto/crypto, eth/keys, - eth/p2p/discoveryv5/enr + eth/p2p/discoveryv5/enr, + brokers/broker_context import waku/[ @@ -184,114 +185,115 @@ suite "Waku Peer Exchange": suite "Waku Peer Exchange with discv5": asyncTest "Node successfully exchanges px peers with real discv5": - ## Given (copied from test_waku_discv5.nim) - let - # todo: px flag - flags = CapabilitiesBitfield.init( - lightpush = false, filter = false, store = false, relay = true - ) - bindIp = parseIpAddress("0.0.0.0") - extIp = parseIpAddress("127.0.0.1") + lockNewGlobalBrokerContext: + ## Given (copied from test_waku_discv5.nim) + let + # todo: px flag + flags = CapabilitiesBitfield.init( + lightpush = false, filter = false, store = false, relay = true + ) + bindIp = parseIpAddress("0.0.0.0") + extIp = parseIpAddress("127.0.0.1") - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(64010) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode( - nodeKey1, - bindIp, - nodeTcpPort1, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort1), + nodeKey1 = generateSecp256k1Key() + nodeTcpPort1 = Port(64010) + nodeUdpPort1 = Port(9000) + node1 = newTestWakuNode( + nodeKey1, + bindIp, + nodeTcpPort1, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort1), + ) + + nodeKey2 = generateSecp256k1Key() + nodeTcpPort2 = Port(64012) + nodeUdpPort2 = Port(9002) + node2 = newTestWakuNode( + nodeKey2, + bindIp, + nodeTcpPort2, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort2), + ) + + nodeKey3 = generateSecp256k1Key() + nodeTcpPort3 = Port(64014) + nodeUdpPort3 = Port(9004) + node3 = newTestWakuNode( + nodeKey3, + bindIp, + nodeTcpPort3, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort3), + ) + + # discv5 + let conf1 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort1, + privateKey: keys.PrivateKey(nodeKey1.skkey), + bootstrapRecords: @[], + autoupdateRecord: true, ) - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(64012) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode( - nodeKey2, - bindIp, - nodeTcpPort2, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort2), + let disc1 = + WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) + + let conf2 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort2, + privateKey: keys.PrivateKey(nodeKey2.skkey), + bootstrapRecords: @[disc1.protocol.getRecord()], + autoupdateRecord: true, ) - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(64014) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode( - nodeKey3, - bindIp, - nodeTcpPort3, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort3), + let disc2 = + WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) + + await allFutures(node1.start(), node2.start(), node3.start()) + let resultDisc1StartRes = await disc1.start() + assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error + let resultDisc2StartRes = await disc2.start() + assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error + + ## When + var attempts = 10 + while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and + attempts > 0: + await sleepAsync(1.seconds) + attempts -= 1 + + # node2 can be connected, so will be returned by peer exchange + require ( + await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo()) ) - # discv5 - let conf1 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort1, - privateKey: keys.PrivateKey(nodeKey1.skkey), - bootstrapRecords: @[], - autoupdateRecord: true, - ) + # Mount peer exchange + await node1.mountPeerExchange() + await node3.mountPeerExchange() + await node3.mountPeerExchangeClient() - let disc1 = - WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) + let dialResponse = + await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) - let conf2 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort2, - privateKey: keys.PrivateKey(nodeKey2.skkey), - bootstrapRecords: @[disc1.protocol.getRecord()], - autoupdateRecord: true, - ) + check dialResponse.isOk - let disc2 = - WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) + let + requestPeers = 1 + currentPeers = node3.peerManager.switch.peerStore.peers.len + let res = await node3.fetchPeerExchangePeers(1) + check res.tryGet() == 1 - await allFutures(node1.start(), node2.start(), node3.start()) - let resultDisc1StartRes = await disc1.start() - assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error - let resultDisc2StartRes = await disc2.start() - assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error + # Then node3 has received 1 peer from node1 + check: + node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers - ## When - var attempts = 10 - while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and - attempts > 0: - await sleepAsync(1.seconds) - attempts -= 1 - - # node2 can be connected, so will be returned by peer exchange - require ( - await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo()) - ) - - # Mount peer exchange - await node1.mountPeerExchange() - await node3.mountPeerExchange() - await node3.mountPeerExchangeClient() - - let dialResponse = - await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) - - check dialResponse.isOk - - let - requestPeers = 1 - currentPeers = node3.peerManager.switch.peerStore.peers.len - let res = await node3.fetchPeerExchangePeers(1) - check res.tryGet() == 1 - - # Then node3 has received 1 peer from node1 - check: - node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers - - await allFutures( - [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] - ) + await allFutures( + [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] + ) diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index c8ca9b43d..3a2a8a67c 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -2,7 +2,7 @@ import std/[tempfiles, strutils, options], - stew/results, + results, testutils/unittests, chronos, libp2p/switch, diff --git a/tests/test_utils_compat.nim b/tests/test_utils_compat.nim index 121efa4a5..1394982ef 100644 --- a/tests/test_utils_compat.nim +++ b/tests/test_utils_compat.nim @@ -1,7 +1,7 @@ {.used.} import testutils/unittests -import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common +import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common suite "Waku Payload": test "Encode/Decode waku message with timestamp": diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim index 344436d0e..789f8faec 100644 --- a/tests/waku_enr/test_sharding.nim +++ b/tests/waku_enr/test_sharding.nim @@ -1,7 +1,7 @@ {.used.} import - stew/results, + results, chronos, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, diff --git a/tests/waku_peer_exchange/test_protocol.nim b/tests/waku_peer_exchange/test_protocol.nim index 74cdba110..29ec45d1e 100644 --- a/tests/waku_peer_exchange/test_protocol.nim +++ b/tests/waku_peer_exchange/test_protocol.nim @@ -5,7 +5,8 @@ import testutils/unittests, chronos, libp2p/[switch, peerId, crypto/crypto], - eth/[keys, p2p/discoveryv5/enr] + eth/[keys, p2p/discoveryv5/enr], + brokers/broker_context import waku/[ @@ -31,110 +32,113 @@ suite "Waku Peer Exchange": suite "request": asyncTest "Retrieve and provide peer exchange peers from discv5": - ## Given (copied from test_waku_discv5.nim) - let - # todo: px flag - flags = CapabilitiesBitfield.init( - lightpush = false, filter = false, store = false, relay = true - ) - bindIp = parseIpAddress("0.0.0.0") - extIp = parseIpAddress("127.0.0.1") + lockNewGlobalBrokerContext: + ## Given (copied from test_waku_discv5.nim) + let + # todo: px flag + flags = CapabilitiesBitfield.init( + lightpush = false, filter = false, store = false, relay = true + ) + bindIp = parseIpAddress("0.0.0.0") + extIp = parseIpAddress("127.0.0.1") - nodeKey1 = generateSecp256k1Key() - nodeTcpPort1 = Port(64010) - nodeUdpPort1 = Port(9000) - node1 = newTestWakuNode( - nodeKey1, - bindIp, - nodeTcpPort1, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort1), + nodeKey1 = generateSecp256k1Key() + nodeTcpPort1 = Port(64010) + nodeUdpPort1 = Port(9000) + node1 = newTestWakuNode( + nodeKey1, + bindIp, + nodeTcpPort1, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort1), + ) + + nodeKey2 = generateSecp256k1Key() + nodeTcpPort2 = Port(64012) + nodeUdpPort2 = Port(9002) + node2 = newTestWakuNode( + nodeKey2, + bindIp, + nodeTcpPort2, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort2), + ) + + nodeKey3 = generateSecp256k1Key() + nodeTcpPort3 = Port(64014) + nodeUdpPort3 = Port(9004) + node3 = newTestWakuNode( + nodeKey3, + bindIp, + nodeTcpPort3, + some(extIp), + wakuFlags = some(flags), + discv5UdpPort = some(nodeUdpPort3), + ) + + # discv5 + let conf1 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort1, + privateKey: keys.PrivateKey(nodeKey1.skkey), + bootstrapRecords: @[], + autoupdateRecord: true, ) - nodeKey2 = generateSecp256k1Key() - nodeTcpPort2 = Port(64012) - nodeUdpPort2 = Port(9002) - node2 = newTestWakuNode( - nodeKey2, - bindIp, - nodeTcpPort2, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort2), + let disc1 = WakuDiscoveryV5.new( + node1.rng, conf1, some(node1.enr), some(node1.peerManager) ) - nodeKey3 = generateSecp256k1Key() - nodeTcpPort3 = Port(64014) - nodeUdpPort3 = Port(9004) - node3 = newTestWakuNode( - nodeKey3, - bindIp, - nodeTcpPort3, - some(extIp), - wakuFlags = some(flags), - discv5UdpPort = some(nodeUdpPort3), + let conf2 = WakuDiscoveryV5Config( + discv5Config: none(DiscoveryConfig), + address: bindIp, + port: nodeUdpPort2, + privateKey: keys.PrivateKey(nodeKey2.skkey), + bootstrapRecords: @[disc1.protocol.getRecord()], + autoupdateRecord: true, ) - # discv5 - let conf1 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort1, - privateKey: keys.PrivateKey(nodeKey1.skkey), - bootstrapRecords: @[], - autoupdateRecord: true, - ) + let disc2 = WakuDiscoveryV5.new( + node2.rng, conf2, some(node2.enr), some(node2.peerManager) + ) - let disc1 = - WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager)) + await allFutures(node1.start(), node2.start(), node3.start()) + let resultDisc1StartRes = await disc1.start() + assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error + let resultDisc2StartRes = await disc2.start() + assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error - let conf2 = WakuDiscoveryV5Config( - discv5Config: none(DiscoveryConfig), - address: bindIp, - port: nodeUdpPort2, - privateKey: keys.PrivateKey(nodeKey2.skkey), - bootstrapRecords: @[disc1.protocol.getRecord()], - autoupdateRecord: true, - ) + ## When + var attempts = 10 + while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and + attempts > 0: + await sleepAsync(1.seconds) + attempts -= 1 - let disc2 = - WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager)) + # node2 can be connected, so will be returned by peer exchange + require ( + await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo()) + ) - await allFutures(node1.start(), node2.start(), node3.start()) - let resultDisc1StartRes = await disc1.start() - assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error - let resultDisc2StartRes = await disc2.start() - assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error + # Mount peer exchange + await node1.mountPeerExchange() + await node3.mountPeerExchange() - ## When - var attempts = 10 - while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and - attempts > 0: - await sleepAsync(1.seconds) - attempts -= 1 + let dialResponse = + await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) + let response = dialResponse.get() - # node2 can be connected, so will be returned by peer exchange - require ( - await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo()) - ) + ## Then + check: + response.get().peerInfos.len == 1 + response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw - # Mount peer exchange - await node1.mountPeerExchange() - await node3.mountPeerExchange() - - let dialResponse = - await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo()) - let response = dialResponse.get() - - ## Then - check: - response.get().peerInfos.len == 1 - response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw - - await allFutures( - [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] - ) + await allFutures( + [node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()] + ) asyncTest "Request returns some discovered peers": let diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 9f1493fe5..d4aef252b 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -48,6 +48,7 @@ import factory/app_callbacks, persistency/persistency, ], + channels/reliable_channel_manager, ./waku_conf, ./waku_state_info @@ -75,6 +76,8 @@ type Waku* = ref object messagingClient*: MessagingClient + reliableChannelManager*: ReliableChannelManager + restServer*: WakuRestServerRef metricsServer*: MetricsHttpServerRef appCallbacks*: AppCallbacks @@ -367,6 +370,19 @@ proc mountMessagingClient*(waku: Waku): Result[void, string] = return err("could not create messaging client: " & $error) return ok() +proc mountReliableChannelManager*(waku: Waku): Result[void, string] = + if not waku.reliableChannelManager.isNil(): + return err("reliable channel manager already mounted") + if waku.messagingClient.isNil(): + return err("reliable channel manager requires a mounted messaging client") + if waku.node.started: + return err("cannot mount reliable channel manager on a started node") + waku.reliableChannelManager = ReliableChannelManager.new( + waku.messagingClient, waku.brokerCtx + ).valueOr: + return err("could not create reliable channel manager: " & $error) + return ok() + proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if waku.node.started: warn "start: waku node already started" @@ -522,6 +538,10 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = waku.messagingClient.start().isOkOr: return err("failed to start messaging client: " & $error) + if not waku.reliableChannelManager.isNil(): + waku.reliableChannelManager.start().isOkOr: + return err("failed to start reliable channel manager: " & $error) + return ok() proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = @@ -539,6 +559,9 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() + if not waku.reliableChannelManager.isNil(): + await waku.reliableChannelManager.stop() + if not waku.messagingClient.isNil(): await waku.messagingClient.stop() diff --git a/waku/node/node_types.nim b/waku/node/node_types.nim index 1d41a1c97..f5c2a56b6 100644 --- a/waku/node/node_types.nim +++ b/waku/node/node_types.nim @@ -110,5 +110,7 @@ type edgeFilterSubLoopFut*: Future[void] edgeFilterConnectionLoopFut*: Future[void] peerEventListener*: WakuPeerEventListener + ownsEdgeShardHealthProvider*: bool + ownsEdgeFilterPeerCountProvider*: bool {.pop.} diff --git a/waku/node/subscription_manager.nim b/waku/node/subscription_manager.nim index 4ac215acf..98afe2fae 100644 --- a/waku/node/subscription_manager.nim +++ b/waku/node/subscription_manager.nim @@ -627,16 +627,19 @@ proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} = await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener) proc start*(self: SubscriptionManager): Result[void, string] = - RequestEdgeShardHealth.setProvider( + let edgeShardHealthRes = RequestEdgeShardHealth.setProvider( self.node.brokerCtx, proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] = self.edgeFilterSubStates.withValue(shard, state): return ok(RequestEdgeShardHealth(health: state.currentHealth)) return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)), - ).isOkOr: - error "Can't set provider for RequestEdgeShardHealth", error = error + ) + self.ownsEdgeShardHealthProvider = edgeShardHealthRes.isOk() + if edgeShardHealthRes.isErr(): + error "Can't set provider for RequestEdgeShardHealth", + error = edgeShardHealthRes.error - RequestEdgeFilterPeerCount.setProvider( + let edgeFilterPeerCountRes = RequestEdgeFilterPeerCount.setProvider( self.node.brokerCtx, proc(): Result[RequestEdgeFilterPeerCount, string] = var minPeers = high(int) @@ -645,21 +648,31 @@ proc start*(self: SubscriptionManager): Result[void, string] = if minPeers == high(int): minPeers = 0 return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)), - ).isOkOr: - error "Can't set provider for RequestEdgeFilterPeerCount", error = error + ) + self.ownsEdgeFilterPeerCountProvider = edgeFilterPeerCountRes.isOk() + if edgeFilterPeerCountRes.isErr(): + error "Can't set provider for RequestEdgeFilterPeerCount", + error = edgeFilterPeerCountRes.error - # Start Edge workers if node is in Edge mode (which is - # currently mutually-exclusive with relay being mounted). - if self.node.wakuRelay.isNil(): + # Start Edge workers only when we are in Edge mode (relay not mounted) + # AND the filter client is mounted (otherwise the loops have nothing + # to talk to and just spam "filter client is nil" warnings). + if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil(): return self.startEdgeFilterLoops() return ok() proc stop*(self: SubscriptionManager) {.async: (raises: []).} = - # Stop Edge workers if node is in Edge mode (which is - # currently mutually-exclusive with relay being mounted). - if self.node.wakuRelay.isNil(): + # Stop Edge workers if we started them in `start` (Edge mode + filter client). + if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil(): await self.stopEdgeFilterLoops() - RequestEdgeShardHealth.clearProvider(self.node.brokerCtx) - RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx) + # Only clear providers we actually registered: another SubscriptionManager + # sharing this brokerCtx may have won the race, and clearing its provider + # would leave the broker silently provider-less. + if self.ownsEdgeShardHealthProvider: + RequestEdgeShardHealth.clearProvider(self.node.brokerCtx) + self.ownsEdgeShardHealthProvider = false + if self.ownsEdgeFilterPeerCountProvider: + RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx) + self.ownsEdgeFilterPeerCountProvider = false