Merge remote-tracking branch 'origin/master' into feat/simple-lift-messaging

* Refactor channels to fit layering with explicit mount
* Fix channels test to perform the new mount sequence
* Add Waku.reliableChannelManager
* liblogosdelivery_start_node mounts ReliableChannelManager after MessagingClient
* Fix test_wakunode_peer_exchange missing lockNewGlobalBrokerContext
* Fix test_protocol missing lockNewGlobalBrokerContext
* Harden provider setup vs. misconfiguration (improper shared broker context situations)
This commit is contained in:
Fabiana Cecin 2026-05-29 15:07:05 -03:00
commit 8b9ed0c804
No known key found for this signature in database
GPG Key ID: BCAB8A55CB51B6C7
42 changed files with 1492 additions and 387 deletions

View File

@ -3,6 +3,7 @@ name: Daily logos-delivery CI
on:
schedule:
- cron: '30 6 * * *'
workflow_dispatch:
env:
NPROC: 2

View File

@ -43,6 +43,7 @@ jobs:
- 'tools/**'
- 'tests/all_tests_v2.nim'
- 'tests/**'
- 'channels/**'
docker:
- 'docker/**'
@ -176,20 +177,6 @@ jobs:
secrets: inherit
js-waku-node:
needs: build-docker-image
uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master
with:
nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }}
test_type: node
js-waku-node-optional:
needs: build-docker-image
uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master
with:
nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }}
test_type: node-optional
lint:
name: "Lint"
runs-on: ubuntu-22.04

View File

@ -16,7 +16,7 @@ Key architectural decisions:
Resource-restricted first: Protocols differentiate between full nodes (relay) and light clients (filter, lightpush, store). Light clients can participate without maintaining full message history or relay capabilities. This explains the client/server split in protocol implementations.
Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic sharding across 8 shards. Code prioritizes metadata privacy alongside content encryption.
Privacy through unlinkability: RLN (Rate Limiting Nullifier) provides DoS protection while preserving sender anonymity. Messages are routed through pubsub topics with automatic content-topic-based sharding (shard count is configurable; generation-zero defaults to 8 shards on cluster 0). Code prioritizes metadata privacy alongside content encryption.
Scalability via sharding: The network uses automatic content-topic-based sharding to distribute traffic. This is why you'll see sharding logic throughout the codebase and why pubsub topic selection is protocol-level, not application-level.
@ -36,7 +36,10 @@ See [documentation](https://docs.waku.org/learn/) for architectural details.
### Key Terminology
- ENR (Ethereum Node Record): Node identity and capability advertisement
- Multiaddr: libp2p addressing format (e.g., `/ip4/127.0.0.1/tcp/60000/p2p/16Uiu2...`)
- PubsubTopic: Gossipsub topic for message routing (e.g., `/waku/2/default-waku/proto`)
- PubsubTopic: Gossipsub topic for message routing (shard-based, e.g., `/waku/2/rs/<cluster-id>/<shard-id>`; the default is `/waku/2/rs/0/0`)
- cluster-id: network id
- shard-id: shard differentiator inside the network - drivers mesh forming.
- autosharding: network supports n (configured) shards [0..n-1], shard derived from ContentTopic
- ContentTopic: Application-level message categorization (e.g., `/my-app/1/chat/proto`)
- Sharding: Partitioning network traffic across topics (static or auto-sharding)
- RLN (Rate Limiting Nullifier): Zero-knowledge proof system for spam prevention
@ -77,29 +80,29 @@ type WakuFilter* = ref object of LPProtocol
### Build Requirements
- Nim 2.x (check `waku.nimble` for minimum version)
- Rust toolchain (required for RLN dependencies)
- Build system: Make with nimbus-build-system
- Build system: Make driven by Nimble (dependencies pinned in `nimble.lock`)
### Build System
The project uses Makefile with nimbus-build-system (Status's Nim build framework):
The project uses a Makefile that drives Nimble. Dependencies are resolved from
`nimble.lock` into a local `nimbledeps/` directory (tracked by the
`NIMBLEDEPS_STAMP` target).
```bash
# Initial build (updates submodules)
# Initial build (resolves Nimble deps automatically)
make wakunode2
# After git pull, update submodules
make update
# Build with custom flags
make wakunode2 NIMFLAGS="-d:chronicles_log_level=DEBUG"
```
Note: The build system uses `--mm:refc` memory management (automatically enforced). Only relevant if compiling outside the standard build system.
Note: The build uses `--mm:refc` memory management (passed automatically by the Nimble tasks in `waku.nimble`). Only relevant if compiling outside the standard build system.
### Common Make Targets
```bash
make wakunode2 # Build main node binary
make test # Run all tests
make testcommon # Run common tests only
make libwakuStatic # Build static C library
make libwaku # Build the legacy C library (libwaku)
make liblogosdelivery. # Build actual C FFI library
make chat2 # Build chat example
make install-nph # Install git hook for auto-formatting
```
@ -127,7 +130,7 @@ suite "Waku ENR - Capabilities":
test "check capabilities support":
## Given
let bitfield: CapabilitiesBitfield = 0b0000_1101u8
## Then
check:
bitfield.supportsCapability(Capabilities.Relay)
@ -135,7 +138,7 @@ suite "Waku ENR - Capabilities":
```
### Code Formatting
Mandatory: All code must be formatted with `nph` (vendored in `vendor/nph`)
Mandatory: All code must be formatted with `nph` (installed via `make build-nph`, which fetches a pinned `nph` version with Nimble)
```bash
# Format specific file
make nph/waku/waku_core.nim
@ -162,7 +165,6 @@ Compile with log level:
nim c -d:chronicles_log_level=TRACE myfile.nim
```
## Code Conventions
Common pitfalls:
@ -181,8 +183,13 @@ Common pitfalls:
- Exceptions: `XxxError` for CatchableError, `XxxDefect` for Defect
- ref object types: `XxxRef` suffix
### Calls and Member Access
- Prefer dot call syntax for predicates: `x.isNil()` instead of `isNil(x)`
- Use parentheses for "verbs" (operations/actions): `isSome()`, `handleRequest()`
- Omit parentheses for "nouns" (properties/values): `.len`, `.high`
### Imports Organization
Group imports: stdlib, external libs, internal modules:
Stdlib + external in one `import` block, internal modules in a separate block:
```nim
import
std/[options, sequtils], # stdlib
@ -214,11 +221,11 @@ proc subscribe(
): Future[FilterSubscribeResult] {.async.} =
if contentTopics.len > MaxContentTopicsPerRequest:
return err(FilterSubscribeError.badRequest("exceeds maximum"))
# Handle Result with isOkOr
(await wf.subscriptions.addSubscription(peerId, criteria)).isOkOr:
return err(FilterSubscribeError.serviceUnavailable(error))
ok()
```
@ -460,8 +467,7 @@ nim c -r \
### Vendor Directory
- Never edit files directly in vendor - it is auto-generated from git submodules
- Always run `make update` after pulling changes
- Managed by `nimbus-build-system`
- Nimble dependencies are resolved from `nimble.lock` into `nimbledeps/`
### Chronicles Performance
- Log levels are configured at compile time for performance
@ -475,7 +481,7 @@ nim c -r \
### RLN Dependencies
- RLN code requires a Rust toolchain, which explains Rust imports in some modules
- Pre-built `librln` libraries are checked into the repository
- `librln` is built from the vendored `zerokit` submodule via the `librln`/`rln-deps` Make targets
## Quick Reference
@ -483,18 +489,19 @@ Language: Nim 2.x | License: MIT or Apache 2.0
### Important Files
- `Makefile` - Primary build interface
- `waku.nimble` - Package definition and build tasks (called via nimbus-build-system)
- `vendor/nimbus-build-system/` - Status's build framework
- `waku.nimble` - Package definition and build tasks (invoked by the Makefile via Nimble)
- `nimble.lock` - Pinned dependency versions resolved into `nimbledeps/`
- `waku/node/waku_node.nim` - Core node implementation
- `apps/wakunode2/wakunode2.nim` - Main CLI application
- `waku/factory/waku_conf.nim` - Configuration types
- `library/libwaku.nim` - C bindings entry point
- `liblogosdelivery/liblogosdelivery.nim` - C bindings entry point
### Testing Entry Points
- `tests/all_tests_waku.nim` - All Waku protocol tests
- `tests/all_tests_wakunode2.nim` - Node application tests
- `tests/all_tests_common.nim` - Common utilities tests
#### in-flight testing
- any test can be run separately by issuing `make test tests/<relativepath>/<unit-test-source>.nim`
### Key Dependencies
- `chronos` - Async framework
- `nim-results` - Result type for error handling

View File

@ -9,7 +9,7 @@
## bearssl (nimbledeps) ##
###########################
# Rebuilds libbearssl.a from the package installed by nimble under
# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP).
# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps.
#
# BEARSSL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that
# depend on it must be invoked via a recursive $(MAKE) call so the sub-make
@ -29,18 +29,11 @@ else
PORTABLE_BEARSSL_CFLAGS := -W -Wall -Os -fPIC
endif
.PHONY: clean-bearssl-nimbledeps rebuild-bearssl-nimbledeps
.PHONY: rebuild-bearssl-nimbledeps
clean-bearssl-nimbledeps:
rebuild-bearssl-nimbledeps:
ifeq ($(BEARSSL_NIMBLEDEPS_DIR),)
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first)
endif
+ [ -e "$(BEARSSL_CSOURCES_DIR)/build" ] && \
"$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" clean || true
rebuild-bearssl-nimbledeps: | clean-bearssl-nimbledeps
ifeq ($(BEARSSL_NIMBLEDEPS_DIR),)
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make update' first)
$(error No bearssl package found under nimbledeps/pkgs2/ — run 'make build-deps' first)
endif
@echo "Rebuilding bearssl from $(BEARSSL_CSOURCES_DIR)"
+ "$(MAKE)" -C "$(BEARSSL_CSOURCES_DIR)" CFLAGS="$(PORTABLE_BEARSSL_CFLAGS)" lib

1
CLAUDE.md Normal file
View File

@ -0,0 +1 @@
@AGENTS.md

View File

@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH)
# NIM binary location
NIM_BINARY := $(shell which nim 2>/dev/null)
NPH := $(HOME)/.nimble/bin/nph
NIMBLE := $(HOME)/.nimble/bin/nimble
NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup
# Compilation parameters
@ -42,7 +43,8 @@ endif
##########
## Main ##
##########
.PHONY: all test update clean examples deps nimble install-nim install-nimble
# The Makefile automatically bootstraps dependency setup when needed for build and test targets.
.PHONY: all test clean examples deps nimble install-nim install-nimble
# default target
all: | wakunode2 libwaku liblogosdelivery
@ -69,18 +71,16 @@ endif
waku.nims:
ln -s waku.nimble $@
$(NIMBLEDEPS_STAMP): nimble.lock | waku.nims
$(MAKE) install-nimble
nimble setup --localdeps
$(MAKE) build-nph
$(MAKE) rebuild-bearssl-nimbledeps
$(MAKE) rebuild-nat-libs-nimbledeps
$(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims
$(NIMBLE) setup --localdeps
touch $@
update:
rm -f $(NIMBLEDEPS_STAMP)
$(MAKE) $(NIMBLEDEPS_STAMP)
nimble lock
# Must be phony so the recipe always runs and the sub-make re-evaluates
# BEARSSL_NIMBLEDEPS_DIR / NAT_TRAVERSAL_NIMBLEDEPS_DIR (parse-time variables)
# after nimble setup has populated nimbledeps/.
.PHONY: build-deps
build-deps: | $(NIMBLEDEPS_STAMP)
$(MAKE) rebuild-bearssl-nimbledeps rebuild-nat-libs-nimbledeps
clean:
rm -rf build 2> /dev/null || true
@ -93,15 +93,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku.
REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"')
install-nim:
ifneq ($(detected_OS),Windows)
scripts/install_nim.sh $(REQUIRED_NIM_VERSION)
endif
install-nimble: install-nim
@nimble_ver=$$(nimble --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1); \
if [ "$$nimble_ver" = "$(REQUIRED_NIMBLE_VERSION)" ]; then \
echo "nimble $(REQUIRED_NIMBLE_VERSION) already installed, skipping."; \
else \
cd $$(mktemp -d) && nimble install "nimble@$(REQUIRED_NIMBLE_VERSION)" -y; \
fi
ifneq ($(detected_OS),Windows)
scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION)
endif
build:
mkdir -p build
@ -203,7 +202,7 @@ clean: | clean-librln
#################
.PHONY: testcommon
testcommon: | $(NIMBLEDEPS_STAMP) build
testcommon: | build-deps build
echo -e $(BUILD_MSG) "build/$@" && \
nimble testcommon
@ -212,59 +211,59 @@ testcommon: | $(NIMBLEDEPS_STAMP) build
##########
.PHONY: testwaku wakunode2 testwakunode2 example2 chat2 chat2bridge liteprotocoltester
testwaku: | $(NIMBLEDEPS_STAMP) build rln-deps librln
testwaku: | build-deps build rln-deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble test
wakunode2: | $(NIMBLEDEPS_STAMP) build deps librln
wakunode2: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble wakunode2
benchmarks: | $(NIMBLEDEPS_STAMP) build deps librln
benchmarks: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble benchmarks
testwakunode2: | $(NIMBLEDEPS_STAMP) build deps librln
testwakunode2: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble testwakunode2
example2: | $(NIMBLEDEPS_STAMP) build deps librln
example2: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble example2
chat2: | $(NIMBLEDEPS_STAMP) build deps librln
chat2: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble chat2
chat2mix: | $(NIMBLEDEPS_STAMP) build deps librln
chat2mix: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble chat2mix
rln-db-inspector: | $(NIMBLEDEPS_STAMP) build deps librln
rln-db-inspector: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble rln_db_inspector
chat2bridge: | $(NIMBLEDEPS_STAMP) build deps librln
chat2bridge: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble chat2bridge
liteprotocoltester: | $(NIMBLEDEPS_STAMP) build deps librln
liteprotocoltester: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble liteprotocoltester
lightpushwithmix: | $(NIMBLEDEPS_STAMP) build deps librln
lightpushwithmix: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble lightpushwithmix
api_example: | $(NIMBLEDEPS_STAMP) build deps librln
api_example: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
$(ENV_SCRIPT) nim api_example $(NIM_PARAMS) waku.nims
build/%: | $(NIMBLEDEPS_STAMP) build deps librln
build/%: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$*" && \
nimble buildone $*
compile-test: | $(NIMBLEDEPS_STAMP) build deps librln
compile-test: | build-deps build deps librln
echo -e $(BUILD_MSG) "$(TEST_FILE)" "\"$(TEST_NAME)\"" && \
nimble buildTest $(TEST_FILE) && \
nimble execTest $(TEST_FILE) "\"$(TEST_NAME)\""
@ -276,11 +275,11 @@ compile-test: | $(NIMBLEDEPS_STAMP) build deps librln
tools: networkmonitor wakucanary
wakucanary: | $(NIMBLEDEPS_STAMP) build deps librln
wakucanary: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble wakucanary
networkmonitor: | $(NIMBLEDEPS_STAMP) build deps librln
networkmonitor: | build-deps build deps librln
echo -e $(BUILD_MSG) "build/$@" && \
nimble networkmonitor
@ -424,10 +423,10 @@ else ifeq ($(detected_OS),Linux)
BUILD_COMMAND := $(BUILD_COMMAND)Linux
endif
libwaku: | $(NIMBLEDEPS_STAMP) librln
libwaku: | build-deps librln
nimble --verbose libwaku$(BUILD_COMMAND) waku.nimble
liblogosdelivery: | $(NIMBLEDEPS_STAMP) librln
liblogosdelivery: | build-deps librln
nimble --verbose liblogosdelivery$(BUILD_COMMAND) waku.nimble
logosdelivery_example: | build liblogosdelivery

19
Nat.mk
View File

@ -9,7 +9,7 @@
## nat-libs (nimbledeps) ##
###########################
# Builds miniupnpc and libnatpmp from the package installed by nimble under
# nimbledeps/pkgs2/. Used by `make update` / $(NIMBLEDEPS_STAMP).
# nimbledeps/pkgs2/. Invoked via $(NIMBLEDEPS_STAMP) / build-deps.
#
# NAT_TRAVERSAL_NIMBLEDEPS_DIR is evaluated at parse time, so targets that
# depend on it must be invoked via a recursive $(MAKE) call so the sub-make
@ -28,20 +28,11 @@ else
PORTABLE_NAT_MARCH :=
endif
.PHONY: clean-cross-nimbledeps rebuild-nat-libs-nimbledeps
.PHONY: rebuild-nat-libs-nimbledeps
clean-cross-nimbledeps:
rebuild-nat-libs-nimbledeps:
ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),)
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first)
endif
+ [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" ] && \
"$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/miniupnp/miniupnpc" CC=$(CC) clean $(HANDLE_OUTPUT) || true
+ [ -e "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" ] && \
"$(MAKE)" -C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" CC=$(CC) clean $(HANDLE_OUTPUT) || true
rebuild-nat-libs-nimbledeps: | clean-cross-nimbledeps
ifeq ($(NAT_TRAVERSAL_NIMBLEDEPS_DIR),)
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make update' first)
$(error No nat_traversal package found under nimbledeps/pkgs2/ — run 'make build-deps' first)
endif
@echo "Rebuilding nat-libs from $(NAT_TRAVERSAL_NIMBLEDEPS_DIR)"
ifeq ($(OS), Windows_NT)
@ -58,4 +49,4 @@ else
+ "$(MAKE)" CFLAGS="-Wall -Wno-cpp -Os -fPIC $(PORTABLE_NAT_MARCH) -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4 $(CFLAGS)" \
-C "$(NAT_TRAVERSAL_NIMBLEDEPS_DIR)/vendor/libnatpmp-upstream" \
CC=$(CC) libnatpmp.a $(HANDLE_OUTPUT)
endif
endif

View File

@ -1,7 +1,7 @@
{.push raises: [].}
import
std/[tables, times, strutils, hashes, sequtils, json],
std/[tables, times, strutils, hashes, sequtils, json, options],
chronos,
confutils,
chronicles,
@ -267,10 +267,16 @@ when isMainModule:
else:
nodev2ExtPort
let nodev2Key =
if conf.nodekey.isSome():
conf.nodekey.get()
else:
crypto.PrivateKey.random(Secp256k1, rng[]).tryGet()
let bridge = Chat2Matterbridge.new(
mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)),
mbGateway = conf.mbGateway,
nodev2Key = conf.nodekey,
nodev2Key = nodev2Key,
nodev2BindIp = conf.listenAddress,
nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift),
nodev2ExtIp = nodev2ExtIp,

View File

@ -1,4 +1,5 @@
import
std/options,
confutils,
confutils/defs,
confutils/std/net,
@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object
metricsServerAddress* {.
desc: "Listening address of the metrics server",
defaultValue: parseIpAddress("127.0.0.1"),
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
name: "metrics-server-address"
.}: IpAddress
@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object
.}: seq[string]
nodekey* {.
desc: "P2P node private key as hex",
defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(),
name: "nodekey"
.}: crypto.PrivateKey
desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey"
.}: Option[crypto.PrivateKey]
store* {.
desc: "Flag whether to start store protocol", defaultValue: true, name: "store"
@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object
# Matterbridge options
mbHostAddress* {.
desc: "Listening address of the Matterbridge host",
defaultValue: parseIpAddress("127.0.0.1"),
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
name: "mb-host-address"
.}: IpAddress

View File

@ -162,7 +162,8 @@ type
metricsServerAddress* {.
desc: "Listening address of the metrics server.",
defaultValue: parseIpAddress("127.0.0.1"),
defaultValue:
IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
name: "metrics-server-address"
.}: IpAddress
@ -194,7 +195,10 @@ type
dnsDiscoveryNameServers* {.
desc: "DNS name server IPs to query. Argument may be repeated.",
defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
defaultValue: @[
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]),
IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]),
],
name: "dns-discovery-name-server"
.}: seq[IpAddress]

View File

@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object
## Tester REST service configuration
restAddress* {.
desc: "Listening address of the REST HTTP server.",
defaultValue: parseIpAddress("127.0.0.1"),
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
name: "rest-address"
.}: IpAddress

View File

@ -116,7 +116,7 @@ type NetworkMonitorConf* = object
metricsServerAddress* {.
desc: "Listening address of the metrics server.",
defaultValue: parseIpAddress("127.0.0.1"),
defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]),
name: "metrics-server-address"
.}: IpAddress

View File

@ -0,0 +1,25 @@
## Optional encryption hooks for the Reliable Channel API.
##
## Modelled as `RequestBroker`s: the broker pattern lets the channel
## delegate work to a provider that may live in any module without
## introducing a direct dependency. If no provider is registered the
## broker returns an error, so installing the noop providers from
## `noop_encryption` is required when the application does not want
## actual encryption.
##
## Applied per-segment after SDS processing on outgoing, and before
## SDS processing on incoming. No specific scheme is mandated.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import brokers/request_broker
export request_broker
RequestBroker:
type Encrypt* = seq[byte]
proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.}
RequestBroker:
type Decrypt* = seq[byte]
proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.}

View File

@ -0,0 +1,18 @@
## No-op encryption providers. Install these when the application does
## not want actual encryption so the `Encrypt` / `Decrypt` brokers have
## something to dispatch to.
import results
import chronos
import ./encryption
proc setNoopEncryption*() =
discard Encrypt.setProvider(
proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} =
return ok(Encrypt(payload))
)
discard Decrypt.setProvider(
proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} =
return ok(Decrypt(payload))
)

23
channels/events.nim Normal file
View File

@ -0,0 +1,23 @@
## Reliable Channel event types emitted to API consumers.
##
## Lifecycle events for individual segments (sent / propagated / errored)
## are the same as the network-level ones the MessagingClient already
## emits — `requestId` is shared across layers — so we just re-export
## `waku/events/message_events` and avoid declaring duplicates.
##
## Only the channel-level `MessageReceivedEvent` carries data that has
## no analogue in the lower layer (reassembled application payload,
## senderId, channelId), so it lives here.
import waku/events/message_events as waku_message_events
import brokers/event_broker
import ./types as channel_types
export waku_message_events, channel_types, event_broker
EventBroker:
type ChannelMessageReceivedEvent* = object
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]

View File

@ -0,0 +1,80 @@
## Rate Limit Manager for the Reliable Channel API.
##
## Tracks messages sent per RLN epoch and delays dispatch when the
## limit is approached, ensuring RLN compliance on enforcing relays.
##
## For the skeleton this is a pass-through: messages are immediately
## released as ready-to-send. Real epoch budgeting will be added later.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/times
import message
import brokers/event_broker
import brokers/broker_context
export event_broker, broker_context
export message.SdsChannelID
const
DefaultEpochPeriodSec* = 600
DefaultMessagesPerEpoch* = 1
EventBroker:
## Emitted by `enqueueToSend` carrying the batch of opaque message
## blobs that may now leave the rate limiter and continue down the
## outgoing pipeline (encryption -> dispatch). Bytes only: the rate
## limiter is intentionally agnostic of SDS, so anything serialisable
## can flow through it.
##
## `channelId` lets listeners filter to their own channel, since all
## reliable channels share the underlying Waku node's broker context.
type ReadyToSendEvent* = object
channelId*: SdsChannelID
msgs*: seq[seq[byte]]
type
RateLimitConfig* = object
enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active
epochPeriodSec*: int
messagesPerEpoch*: int
RateLimitManager* = ref object
config*: RateLimitConfig
queue*: seq[seq[byte]]
currentEpochStart*: Time
sentInCurrentEpoch*: int
channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent`
brokerCtx: BrokerContext
proc new*(
T: type RateLimitManager,
config: RateLimitConfig,
channelId: SdsChannelID,
brokerCtx: BrokerContext = globalBrokerContext(),
): T =
return T(
config: config,
queue: @[],
currentEpochStart: getTime(),
sentInCurrentEpoch: 0,
channelId: channelId,
brokerCtx: brokerCtx,
)
proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) =
## Skeleton behaviour: enqueue and immediately release as a single
## ready batch. Real per-epoch budgeting will park messages on
## `self.queue` and emit only when the budget allows.
ReadyToSendEvent.emit(
self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg])
)
proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] =
## Returns the set of queued messages that may be dispatched now
## without exceeding the configured rate limit.
discard
proc resetEpoch*(self: RateLimitManager) =
self.currentEpochStart = getTime()
self.sentInCurrentEpoch = 0

View File

@ -0,0 +1,264 @@
## Reliable Channel type.
##
## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end
## reliability), optional encryption, and rate-limited dispatch on top
## of the Messaging API for a single channel.
##
## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch
## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event
##
## Channels are owned by a `ReliableChannelManager`. Lifecycle and send
## operations are addressed by `ChannelId`, so callers only need to keep
## an opaque handle around.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/[options, tables]
import results
import chronos
import bearssl/rand
import stew/byteutils
import libp2p/crypto/crypto as libp2p_crypto
import waku/messaging_client
import waku/node/delivery_service/send_service
import waku/waku_core/topics
import ./events
import ./segmentation/segmentation
import ./scalable_data_sync/scalable_data_sync
import ./rate_limit_manager/rate_limit_manager
import ./encryption/encryption
export
messaging_client, send_service, events, segmentation, scalable_data_sync,
rate_limit_manager, encryption
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## Wire-format spec marker for the Reliable Channel layer, as defined
## in the reliable-channel-api LIP (`Wire Format / Spec Marker`).
## A `WakuMessage` whose `meta` field does not equal these bytes is
## not addressed to this layer and is silently dropped on ingress.
## The trailing `/N` is the wire-format version and is bumped only
## on breaking on-the-wire changes; implementations pin one version.
type ReliableChannel* = ref object
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
messagingClient: MessagingClient
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
rng: ref HmacDrbgContext
segmentation: SegmentationHandler
sdsHandler: SdsHandler
rateLimit: RateLimitManager
requestIds: Table[RequestId, seq[RequestId]]
pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]]
brokerCtx: BrokerContext
## Captured here so the channel emits `ChannelMessageReceivedEvent`
## on the same broker context the owning manager registered its
## listeners on. Without this, an emit via `globalBrokerContext()`
## would land on whatever context happens to be thread-local at
## emit time, which is not necessarily the manager's.
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
self.channelId
func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
self.contentTopic
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
self.senderId
proc onReadyToSend(
self: ReliableChannel, msgs: seq[seq[byte]]
) {.async: (raises: []).} =
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
## listener once `rate_limit_manager` releases a batch of opaque
## blobs (already-encoded SDS messages):
##
## ... -> rate_limit_manager -> [encryption] -> dispatch
for m in msgs:
## Each `m` was preceded by exactly one push onto `pendingRequests`
## in `send`, so this pop is always safe in the current skeleton.
let pending = self.pendingRequests[0]
self.pendingRequests.delete(0)
## TODO: revisit which fields of the SDS message must be encrypted.
## Encrypting the whole encoded blob forces every receiver to attempt
## decryption before it can route, which breaks selective dispatch.
## Leave routing metadata (channelId, causal-history references) in
## clear and encrypt only the application payload.
let encRes = await Encrypt.request(m)
let encrypted = encRes.valueOr:
MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: pending.parent,
messageHash: "",
error: "encryption failed: " & error,
),
)
continue
let wireBytes = seq[byte](encrypted)
let envelope = MessageEnvelope(
contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral
)
let deliveryReqId = RequestId.new(self.rng)
let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr:
## TODO: emit waku `MessageErrorEvent` for the parent request id.
continue
## Stamp the Reliable Channel wire-format spec marker so the ingress
## side of any peer can route this WakuMessage to its Reliable
## Channel layer. Done on the constructed WakuMessage rather than
## via the envelope because `MessageEnvelope` does not expose a
## `meta` field.
deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes()
asyncSpawn self.messagingClient.sendService.send(deliveryTask)
self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId)
proc send*(
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
): Result[RequestId, string] =
## Single application-level send. The first three stages of the
## outgoing pipeline are chained explicitly so the flow is visible
## at a glance:
##
## segmentation -> sds -> rate_limit_manager
##
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
## the SDS messages cleared for transmission; the channel's listener
## then runs the final stage (encryption -> dispatch). The `ephemeral`
## flag is carried alongside each segment in `pendingRequests` and
## stamped onto the eventual `MessageEnvelope`.
##
## The returned `RequestId` is the parent of one-or-more
## delivery-service `RequestId`s; the mapping is recorded in
## `self.requestIds`.
if payload.len == 0:
return err("empty payload")
let parentReqId = RequestId.new(self.rng)
self.requestIds[parentReqId] = @[]
for segmentBytes in self.segmentation.performSegmentation(payload):
## Segments arrive already encoded; the segmentation module owns
## the wire format so SDS only ever sees opaque bytes.
let sdsBytes = self.sdsHandler.wrapOutgoing(
self.channelId, self.senderId, segmentBytes
).valueOr:
return err("SDS wrap failed: " & error)
self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral))
self.rateLimit.enqueueToSend(sdsBytes)
return ok(parentReqId)
proc onMessageReceived(
self: ReliableChannel, messageHash: string, payload: seq[byte]
) {.async: (raises: []).} =
## Ingress pipeline made visible:
##
## payload -> decrypt -> sds -> reassemble -> emit
##
## Invoked from this channel's `MessageReceivedEvent` listener, which
## already filtered on the spec marker and on `contentTopic`. The
## channel only sees the raw payload bytes for itself.
## Notice that the following "request" is implemented implicitly as a broker call to
## the `Decrypt` request broker.
let decRes = await Decrypt.request(payload)
let plaintext = decRes.valueOr:
MessageErrorEvent.emit(
self.brokerCtx,
MessageErrorEvent(
requestId: RequestId(""),
messageHash: messageHash,
error: "decryption failed: " & error,
),
)
return
let plaintextBytes = seq[byte](plaintext)
let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes)
if unwrapped.isErr():
return
let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content)
if reassembled.isSome():
## Emit on the captured `brokerCtx` (the manager's), so the
## application listener that the manager has set up on that same
## context picks the event up.
ChannelMessageReceivedEvent.emit(
self.brokerCtx,
ChannelMessageReceivedEvent(
channelId: self.channelId,
senderId: self.senderId,
payload: reassembled.get().payload,
),
)
proc new*(
T: type ReliableChannel,
messagingClient: MessagingClient,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
segConfig: SegmentationConfig,
sdsConfig: SdsConfig,
rateConfig: RateLimitConfig,
brokerCtx: BrokerContext = globalBrokerContext(),
): T =
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
## inside the channel rather than handed in by the caller — they are
## implementation details of the channel, not knobs the API consumer
## should be wiring up. Encryption is delegated to the `Encrypt`/
## `Decrypt` request brokers, so the channel keeps no per-instance
## encryption state either.
let chn = T(
messagingClient: messagingClient,
channelId: channelId,
contentTopic: contentTopic,
senderId: senderId,
rng: libp2p_crypto.newRng(),
segmentation: SegmentationHandler.new(segConfig),
sdsHandler: SdsHandler.new(sdsConfig, senderId),
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
requestIds: initTable[RequestId, seq[RequestId]](),
pendingRequests: @[],
brokerCtx: brokerCtx,
)
## Each channel owns its own egress + ingress listeners on
## `chn.brokerCtx`, filtered to traffic addressed to this channel.
## Keeping the listeners (and the procs they call) inside the
## channel lets `onReadyToSend` and `onMessageReceived` stay private
## — the manager doesn't need to know about them.
discard ReadyToSendEvent.listen(
chn.brokerCtx,
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
if evt.channelId == chn.channelId:
await chn.onReadyToSend(evt.msgs)
,
)
discard MessageReceivedEvent.listen(
chn.brokerCtx,
proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} =
## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic
## for other channels before doing any decode work.
if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion:
return
if evt.message.contentTopic != chn.contentTopic:
return
await chn.onMessageReceived(evt.messageHash, evt.message.payload)
,
)
return chn

View File

@ -0,0 +1,133 @@
## Reliable Channel API entry point.
##
## Owns the set of `ReliableChannel` instances and exposes lifecycle and
## send/receive operations addressed by `ChannelId`.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/tables
import results
import chronos
import stew/byteutils
import brokers/broker_context
import waku/events/message_events as waku_message_events
import waku/messaging_client
import waku/waku_core/topics
import ./reliable_channel
import ./encryption/noop_encryption
export reliable_channel
type ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient
## Borrowed from the owning `Waku`. The ownership chain is
## Waku -> ReliableChannelManager -> MessagingClient (also Waku-owned).
## Hidden so callers can't substitute their own and bypass the
## manager's pipeline.
brokerCtx: BrokerContext
proc new*(
T: type ReliableChannelManager,
messagingClient: MessagingClient,
brokerCtx: BrokerContext = globalBrokerContext(),
): Result[T, string] =
if messagingClient.isNil():
return err("messaging client is required")
ok(
T(
channels: initTable[ChannelId, ReliableChannel](),
messagingClient: messagingClient,
brokerCtx: brokerCtx,
)
)
proc start*(self: ReliableChannelManager): Result[void, string] =
## Placeholder: per-channel listeners are installed in `ReliableChannel.new`,
## so the manager has nothing to start at this layer. Kept for symmetry
## with the `Waku` mount/start lifecycle and as a hook for future state.
discard
ok()
proc stop*(self: ReliableChannelManager) {.async.} =
## Placeholder mirror of `start`.
discard
proc createReliableChannel*(
self: ReliableChannelManager,
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
): Result[ChannelId, string] =
## Spec entry point. The `MessagingClient` and `rng` the channel needs
## are sourced from the owning `ReliableChannelManager` rather than
## passed per call. Encryption is wired up through the `Encrypt`/
## `Decrypt` request brokers — the application installs its own
## providers (or `setNoopEncryption()`) before traffic flows.
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
let segConfig = SegmentationConfig(
segmentSizeBytes: DefaultSegmentSizeBytes,
enableReedSolomon: false,
persistence: nil,
)
let sdsConfig = SdsConfig(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: nil,
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
)
let chn = ReliableChannel.new(
messagingClient = self.messagingClient,
channelId = channelId,
contentTopic = contentTopic,
senderId = senderId,
segConfig = segConfig,
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
)
self.channels[channelId] = chn
return ok(channelId)
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Result[void, string] =
## Flush state, persist outstanding SDS buffers, release resources.
if not self.channels.hasKey(channelId):
return err("unknown channel: " & channelId)
self.channels.del(channelId)
return ok()
proc send*(
self: ReliableChannelManager,
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Result[RequestId, string] =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return chn.send(appPayload, ephemeral)
## Inbound messages are not handed to the manager by direct call. Each
## `ReliableChannel` installs its own `MessageReceivedEvent` listener
## in `ReliableChannel.new`, filters by spec marker and `contentTopic`,
## and routes to its private `onMessageReceived`. This keeps the lower
## layer (MessagingClient/Waku) unaware of the existence of ReliableChannel
## and keeps the manager out of per-channel event dispatch.

View File

@ -0,0 +1,62 @@
## Scalable Data Sync (SDS) component for the Reliable Channel API.
##
## Provides end-to-end delivery guarantees via causal history tracking,
## acknowledgements, and retransmission of unacknowledged segments.
##
## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so
## the send/receive circuit can exercise the surrounding pipeline.
## Real SDS wrapping will plug in via `nim-sds` later.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import results
import message as sds_message
import ./sds_persistence
export sds_message, sds_persistence
const
DefaultAcknowledgementTimeoutMs* = 5_000
DefaultMaxRetransmissions* = 5
DefaultCausalHistorySize* = 2
type
SdsConfig* = object
acknowledgementTimeoutMs*: int
maxRetransmissions*: int
causalHistorySize*: int
persistence*: SdsPersistence
SdsHandler* = ref object
config*: SdsConfig
participantId*: SdsParticipantID
proc new*(
T: type SdsHandler,
config: SdsConfig,
participantId: SdsParticipantID = SdsParticipantID(""),
): T =
return T(config: config, participantId: participantId)
proc wrapOutgoing*(
self: SdsHandler,
channelId: SdsChannelID,
senderId: SdsParticipantID,
payload: seq[byte],
): Result[seq[byte], string] =
## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption).
## Skeleton: pass the encoded segment through unchanged. Real causal
## history / lamport / bloom-filter population will replace this.
return ok(payload)
proc handleIncoming*(
self: SdsHandler, msg: seq[byte]
): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] =
## Skeleton: pass the bytes through; channel id is left empty until
## the real wire format provides it.
return ok((content: msg, channelId: SdsChannelID("")))
proc tickRetransmissions*(self: SdsHandler) =
## Drives retransmissions of unacknowledged messages.
discard

View File

@ -0,0 +1,25 @@
## Persistence backend for SDS outgoing buffer and causal history.
##
## TODO (raised in PR review): this surface is duplicating concerns that
## should come from the SDS module itself. Once the SDS module exposes a
## complete persistence contract, drop this file and import that surface
## instead of re-declaring it here.
import message
type
SdsPersistenceKind* {.pure.} = enum
InMemory
Sqlite
SdsPersistence* = ref object of RootObj
kind*: SdsPersistenceKind
method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} =
discard
method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} =
discard
method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} =
discard

View File

@ -0,0 +1,34 @@
## Wire format for a single segment, per the Reliable Channel API spec.
##
## Skeleton: encode/decode treat the segment as just its payload bytes,
## since for now we only ever produce a single segment per send.
type SegmentMessageProto* = object
entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes
dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments
dataSegmentCount*: uint32 ## number of data segments (>= 1)
payload*: seq[byte] ## segment payload (data or parity shard)
paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments
paritySegmentCount*: uint32 ## number of parity segments
isParity*: bool ## true for parity segments, false (default) for data segments
proc isParityMessage*(self: SegmentMessageProto): bool =
self.isParity
proc isValid*(self: SegmentMessageProto): bool =
## Validates hash length (32 bytes), segment indices and counts.
discard
proc encode*(self: SegmentMessageProto): seq[byte] =
self.payload
proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T =
T(
entireMessageHash: @[],
dataSegmentIndex: 0,
dataSegmentCount: 1,
payload: buf,
paritySegmentIndex: 0,
paritySegmentCount: 0,
isParity: false,
)

View File

@ -0,0 +1,70 @@
## Segmentation component for the Reliable Channel API.
##
## Splits large application payloads into transmittable segments and
## reassembles them on reception. Supports optional Reed-Solomon parity
## segments for loss recovery, as per the Reliable Channel API spec.
##
## For the skeleton everything fits in a single segment: real chunking
## and Reed-Solomon parity will be plugged in later.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/options
import ./segment_message_proto
import ./segmentation_persistence
export segment_message_proto, segmentation_persistence
const
DefaultSegmentSizeBytes* = 102_400
SegmentsParityRate* = 0.125
SegmentsReedSolomonMaxCount* = 256
type
SegmentationConfig* = object
segmentSizeBytes*: int
enableReedSolomon*: bool
persistence*: SegmentationPersistence
SegmentationHandler* = ref object
config*: SegmentationConfig
ReassemblyResult* = object
payload*: seq[byte]
entireMessageHash*: seq[byte]
proc new*(T: type SegmentationHandler, config: SegmentationConfig): T =
return T(config: config)
proc performSegmentation*(
self: SegmentationHandler, payload: seq[byte]
): seq[seq[byte]] =
## Skeleton behaviour: emit exactly one segment carrying the whole
## payload. Real chunking and Reed-Solomon parity will replace this.
let segment = SegmentMessageProto(
entireMessageHash: @[],
dataSegmentIndex: 0,
dataSegmentCount: 1,
payload: payload,
paritySegmentIndex: 0,
paritySegmentCount: 0,
isParity: false,
)
return @[segment.encode()]
proc handleIncomingSegment*(
self: SegmentationHandler, segmentBytes: seq[byte]
): Option[ReassemblyResult] =
## Skeleton behaviour: every segment is already a complete message
## (since `performSegmentation` always emits one), so just hand the
## payload straight back.
let segment = SegmentMessageProto.decode(segmentBytes)
return some(
ReassemblyResult(
payload: segment.payload, entireMessageHash: segment.entireMessageHash
)
)
proc cleanupSegments*(self: SegmentationHandler) =
## Drop expired partial-reassembly state.
discard

View File

@ -0,0 +1,20 @@
## Persistence backend interface for segmentation reassembly state.
##
## Allows partial reassembly state to survive process restarts.
type
SegmentationPersistenceKind* {.pure.} = enum
InMemory
Sqlite
SegmentationPersistence* = ref object of RootObj
kind*: SegmentationPersistenceKind
method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} =
discard
method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} =
discard
method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} =
discard

15
channels/types.nim Normal file
View File

@ -0,0 +1,15 @@
## Core identifier types for the Reliable Channel API.
import std/hashes
import waku/api/types as api_types
import ./scalable_data_sync/scalable_data_sync
export scalable_data_sync
export api_types
type ChannelId* = SdsChannelID
proc hash*(r: RequestId): Hash =
## Allows `RequestId` to be used as a `Table` key.
hash(string(r))

View File

@ -89,8 +89,15 @@
inherit zerokitRln;
gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}";
};
wakucanary = pkgs.callPackage ./nix/default.nix {
inherit pkgs;
src = ./.;
targets = ["wakucanary"];
zerokitRln = zerokit.packages.${system}.rln;
};
in {
inherit liblogosdelivery;
inherit liblogosdelivery wakucanary;
# Expose the cargoHash-corrected librln so downstream consumers
# (e.g. logos-delivery-module) bundle the exact same librln this
# build links, instead of pulling zerokit's rln directly — whose

View File

@ -177,6 +177,11 @@ proc logosdelivery_start_node(
chronicles.error "mountMessagingClient failed", err = errMsg
return err("failed to mount messaging: " & errMsg)
ctx.myLib[].mountReliableChannelManager().isOkOr:
let errMsg = $error
chronicles.error "mountReliableChannelManager failed", err = errMsg
return err("failed to mount reliable channel manager: " & errMsg)
(await ctx.myLib[].start()).isOkOr:
let errMsg = $error
chronicles.error "START_NODE failed", err = errMsg

View File

@ -1,6 +1,7 @@
{ pkgs
, src
, zerokitRln
, targets ? []
, gitVersion ? "n/a"
, enablePostgres ? true
, enableNimDebugDlOpen ? true
@ -10,6 +11,8 @@
let
deps = import ./deps.nix { inherit pkgs; };
buildWakucanary = builtins.elem "wakucanary" targets;
nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " (
[ "--define:disable_libbacktrace"
"--define:git_version=${gitVersion}" ]
@ -34,9 +37,29 @@ let
if pkgs.stdenv.hostPlatform.isWindows then "dll"
else if pkgs.stdenv.hostPlatform.isDarwin then "dylib"
else "so";
# Shared `nim c` invocation. Callers vary the output, the source file and a
# few mode-specific flags (e.g. --app:lib, --noMain, --header); everything
# else (paths, defines, threading, gc, nimcache, rln linkage) is constant.
# $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase.
nimCompile = { outFile, sourceFile, extraArgs ? [] }: ''
nim c \
--noNimblePath \
${pathArgs} \
--path:$NAT_TRAV \
--path:$NAT_TRAV/src \
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
${nimDefineArgs} \
--threads:on \
--mm:refc \
--nimcache:$NIMCACHE \
--out:${outFile} \
${pkgs.lib.concatStringsSep " \\\n " extraArgs} \
${sourceFile}
'';
in
pkgs.stdenv.mkDerivation {
pname = "liblogosdelivery";
pname = if buildWakucanary then "wakucanary" else "liblogosdelivery";
version = "dev";
inherit src;
@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation {
make -C $NAT_TRAV/vendor/libnatpmp-upstream \
CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a
${if buildWakucanary then ''
echo "== Building wakucanary =="
${nimCompile {
outFile = "build/wakucanary";
sourceFile = "apps/wakucanary/wakucanary.nim";
extraArgs = [ "--path:." ];
}}
'' else ''
echo "== Building liblogosdelivery (dynamic) =="
nim c \
--noNimblePath \
${pathArgs} \
--path:$NAT_TRAV \
--path:$NAT_TRAV/src \
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
${nimDefineArgs} \
--out:build/liblogosdelivery.${libExt} \
--app:lib \
--threads:on \
--opt:size \
--noMain \
--mm:refc \
--header \
--nimMainPrefix:liblogosdelivery \
--nimcache:$NIMCACHE \
liblogosdelivery/liblogosdelivery.nim
${nimCompile {
outFile = "build/liblogosdelivery.${libExt}";
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
extraArgs = [
"--app:lib"
"--opt:size"
"--noMain"
"--header"
"--nimMainPrefix:liblogosdelivery"
];
}}
echo "== Building liblogosdelivery (static) =="
nim c \
--noNimblePath \
${pathArgs} \
--path:$NAT_TRAV \
--path:$NAT_TRAV/src \
--passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \
${nimDefineArgs} \
--out:build/liblogosdelivery.a \
--app:staticlib \
--threads:on \
--opt:size \
--noMain \
--mm:refc \
--nimMainPrefix:liblogosdelivery \
--nimcache:$NIMCACHE \
liblogosdelivery/liblogosdelivery.nim
${nimCompile {
outFile = "build/liblogosdelivery.a";
sourceFile = "liblogosdelivery/liblogosdelivery.nim";
extraArgs = [
"--app:staticlib"
"--opt:size"
"--noMain"
"--nimMainPrefix:liblogosdelivery"
];
}}
''}
'';
installPhase = ''
installPhase = if buildWakucanary then ''
runHook preInstall
mkdir -p $out/bin $out/lib
cp build/wakucanary $out/bin/
runHook postInstall
'' else ''
runHook preInstall
mkdir -p $out/lib $out/include
cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true
@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation {
runHook postInstall
'';
# Bundle librln alongside liblogosdelivery so the output is self-contained.
# Bundle librln alongside the produced artifact so the output is self-contained.
# Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection
# for libstdc++ is preserved.
postInstall =
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
cp ${zerokitRln}/lib/librln.dylib $out/lib/
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
''
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
cp ${zerokitRln}/lib/librln.so $out/lib/
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
'';
if buildWakucanary then
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
cp ${zerokitRln}/lib/librln.dylib $out/lib/
chmod +w $out/lib/librln.dylib $out/bin/wakucanary
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true)
if [ -n "$old" ]; then
install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary
fi
install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary
''
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
cp ${zerokitRln}/lib/librln.so $out/lib/
patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary
''
else
pkgs.lib.optionalString pkgs.stdenv.isDarwin ''
cp ${zerokitRln}/lib/librln.dylib $out/lib/
chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib
old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln)
install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib
install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib
''
+ pkgs.lib.optionalString pkgs.stdenv.isLinux ''
cp ${zerokitRln}/lib/librln.so $out/lib/
patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so
'';
meta = with pkgs.lib; {
description =
if buildWakucanary
then "Waku network canary tool"
else "logos-delivery shared/static library";
homepage = "https://github.com/logos-messaging/logos-delivery";
license = licenses.mit;
platforms = platforms.unix;
};
}

View File

@ -17,26 +17,36 @@ if [ -z "${NIM_VERSION}" ]; then
exit 1
fi
# Check if the right version is already installed
NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}"
# 1. A matching Nim is already on PATH (e.g. provided by CI's setup-nim-action,
# choosenim, or a previous run of this script). Use it as-is: installing over it
# would symlink a freshly downloaded Nim into ~/.nimble/bin (first on PATH) and
# shadow a known-good toolchain, which has caused C-backend build failures.
nim_ver=$(nim --version 2>/dev/null | head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true)
if [ "${nim_ver}" = "${NIM_VERSION}" ]; then
echo "Nim ${NIM_VERSION} already installed, skipping."
echo "Nim ${NIM_VERSION} already on PATH ($(command -v nim)), skipping install."
exit 0
fi
# 2. Already installed at our expected location from a previous run, but not on PATH.
# Re-link binaries into ~/.nimble/bin.
if [ -f "${NIM_DEST}/lib/system.nim" ]; then
echo "Nim ${NIM_VERSION} already installed at ${NIM_DEST}, re-linking binaries."
mkdir -p "${HOME}/.nimble/bin"
for bin_path in "${NIM_DEST}/bin/"*; do
ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")"
done
exit 0
fi
if [ -n "${nim_ver}" ]; then
newer=$(printf '%s\n%s\n' "${NIM_VERSION}" "${nim_ver}" | sort -V | tail -1)
if [ "${newer}" = "${nim_ver}" ]; then
echo "WARNING: Nim ${nim_ver} is installed; this repo is validated against ${NIM_VERSION}." >&2
echo "WARNING: The build will proceed but may behave differently." >&2
exit 0
fi
echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2
fi
OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/')
ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/')
NIM_DEST="${HOME}/.nim/nim-${NIM_VERSION}"
BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz"
WORK_DIR=$(mktemp -d)
trap 'rm -rf "${WORK_DIR}"' EXIT
@ -48,9 +58,7 @@ if [ "${HTTP_STATUS}" = "200" ]; then
echo "Downloading pre-built binary from ${BINARY_URL}..."
curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz"
tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}"
rm -rf "${NIM_DEST}"
mkdir -p "${HOME}/.nim"
cp -r "${WORK_DIR}/nim-${NIM_VERSION}" "${NIM_DEST}"
SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}"
else
echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..."
SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz"
@ -58,15 +66,19 @@ else
tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}"
cd "${WORK_DIR}/Nim-${NIM_VERSION}"
sh build_all.sh
rm -rf "${NIM_DEST}"
mkdir -p "${HOME}/.nim"
cp -r "${WORK_DIR}/Nim-${NIM_VERSION}" "${NIM_DEST}"
SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}"
fi
# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker).
# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present.
rm -rf "${NIM_DEST}" 2>/dev/null || true
mkdir -p "${NIM_DEST}"
cp -r "${SRC_DIR}/." "${NIM_DEST}/"
mkdir -p "${HOME}/.nimble/bin"
for bin_path in "${NIM_DEST}/bin/"*; do
ln -sf "${bin_path}" "${HOME}/.nimble/bin/$(basename "${bin_path}")"
done
echo "Nim ${NIM_VERSION} installed to ${NIM_DEST}"
echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH."
echo "Binaries symlinked in ~/.nimble/bin — ensure it is in your PATH."

70
scripts/install_nimble.sh Executable file
View File

@ -0,0 +1,70 @@
#!/usr/bin/env bash
# Installs a specific nimble version without using `nimble install nimble`.
#
# `nimble install nimble` is inherently fragile:
# - ETXTBSY: overwriting the running nimble binary in pkgs2/
# - JSON parse failures with older nimble versions reading packages_official.json
#
# Strategy:
# 1. If the right version is already at ~/.nimble/bin/nimble → done.
# 2. If a previously-compiled binary exists in pkgs2/ → re-link it.
# 3. Otherwise: clone the nimble git repo, init submodules, build with nim,
# and atomically replace the target (mv avoids ETXTBSY on the old binary).
set -e
NIMBLE_VERSION="${1:-}"
if [ -z "${NIMBLE_VERSION}" ]; then
echo "Usage: $0 <nimble-version>" >&2
exit 1
fi
NIMBLE_BIN="${HOME}/.nimble/bin/nimble"
# 1. Already installed at the right version?
if [ -x "${NIMBLE_BIN}" ]; then
nimble_ver=$("${NIMBLE_BIN}" --version 2>/dev/null \
| head -1 | grep -oE '[0-9]+\.[0-9]+\.[0-9]+' | head -1 || true)
if [ "${nimble_ver}" = "${NIMBLE_VERSION}" ]; then
echo "Nimble ${NIMBLE_VERSION} already installed, skipping."
exit 0
fi
fi
# 2. Already compiled into pkgs2/ from a previous (possibly partial) run?
PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \
2>/dev/null | head -1 || true)
if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then
echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}."
mkdir -p "${HOME}/.nimble/bin"
ln -sf "${PKGS2_NIMBLE}" "${NIMBLE_BIN}"
exit 0
fi
# 3. Build from source.
NIM_BIN="${HOME}/.nimble/bin/nim"
if [ ! -x "${NIM_BIN}" ]; then
NIM_BIN="$(command -v nim)"
fi
WORK_DIR="$(mktemp -d)"
trap 'rm -rf "${WORK_DIR}"' EXIT
echo "Cloning nimble v${NIMBLE_VERSION} with submodules..."
git clone --depth=1 --branch "v${NIMBLE_VERSION}" \
--recurse-submodules --shallow-submodules \
https://github.com/nim-lang/nimble.git \
"${WORK_DIR}/nimble"
echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1)..."
cd "${WORK_DIR}/nimble"
# nim reads nim.cfg / config.nims in the current dir, which sets vendor paths.
"${NIM_BIN}" c -d:release --path:src \
-o:"${WORK_DIR}/nimble_new" src/nimble.nim
mkdir -p "${HOME}/.nimble/bin"
# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running.
cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$"
mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}"
echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}"

View File

@ -88,3 +88,6 @@ import ./tools/test_all
# Persistency library tests
import ./persistency/test_all
# Reliable Channel API tests
import ./channels/test_all

View File

@ -0,0 +1,3 @@
{.used.}
import ./test_reliable_channel_send_receive

View File

@ -0,0 +1,153 @@
{.used.}
import std/[net]
import chronos, testutils/unittests, stew/byteutils
import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import waku
import waku/[waku_node, waku_core]
import waku/factory/waku_conf
import waku/events/message_events as waku_message_events
import tools/confutils/cli_args
import channels/reliable_channel_manager
import channels/encryption/noop_encryption
const TestTimeout = chronos.seconds(15)
proc createApiNodeConf(): WakuNodeConf =
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = cli_args.WakuMode.Core
conf.listenAddress = parseIpAddress("0.0.0.0")
conf.tcpPort = Port(0)
conf.discv5UdpPort = Port(0)
conf.clusterId = 3'u16
conf.numShardsInNetwork = 1
conf.reliabilityEnabled = true
conf.rest = false
return conf
suite "Reliable Channel - ingress":
asyncTest "manager dispatches marked WakuMessage to the right channel":
## Unit test for the receive side of the API: instead of standing
## up two libp2p nodes and a relay mesh, we drive the manager
## directly by emitting a `MessageReceivedEvent` (the exact event
## the MessagingClient emits when a `WakuMessage` arrives off the
## wire). The manager must:
## - drop traffic missing the Reliable Channel spec marker
## - dispatch the matching channel's `onMessageReceived`
## - emit `ChannelMessageReceivedEvent` with the payload
const
channelId = ChannelId("test-channel")
contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "hello reliable channel".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
## Noop encryption providers so the Encrypt/Decrypt brokers have
## something to dispatch to; without this the channel falls back to
## plaintext anyway, but installing them is the documented setup.
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
let received = newFuture[seq[byte]]("channel-message-received")
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if not received.finished() and evt.channelId == channelId:
received.complete(evt.payload)
,
)
.expect("listen ChannelMessageReceivedEvent")
## Build a `WakuMessage` that looks like one that came in off the
## wire from a peer: the spec marker on `meta` plus the right content
## topic. The manager's ingress listener should pick it up,
## decrypt (noop), unwrap SDS (pass-through), reassemble (one
## segment), and finally emit `ChannelMessageReceivedEvent`.
let inboundMsg = WakuMessage(
payload: appPayload,
contentTopic: contentTopic,
version: 0,
meta: LipWireReliableChannelVersion.toBytes(),
)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
)
let arrived = await received.withTimeout(TestTimeout)
check arrived
if arrived:
check received.read() == appPayload
discard await waku.stop()
asyncTest "manager drops unmarked WakuMessage":
## Mirror of the above: same content topic, but `meta` is empty
## (i.e. foreign traffic). The channel-level event must NOT fire.
const
channelId = ChannelId("test-channel-2")
contentTopic = ContentTopic("/reliable-channel/test/proto")
let appPayload = "foreign payload".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var fired = false
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
fired = true
,
)
.expect("listen ChannelMessageReceivedEvent")
let inboundMsg = WakuMessage(
payload: appPayload,
contentTopic: contentTopic,
version: 0,
meta: @[], ## no Reliable Channel spec marker
)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg),
)
## Give the event broker a chance to fan out.
await sleepAsync(100.milliseconds)
check not fired
discard await waku.stop()

View File

@ -1,11 +1,11 @@
import std/[options], stew/results, testutils/unittests
import std/[options], results, testutils/unittests
import
waku/node/peer_manager/peer_store/migrations,
../../waku_archive/archive_utils,
../../testlib/[simple_mock]
import std/[tables, strutils, os], stew/results, chronicles
import std/[tables, strutils, os], results, chronicles
import waku/common/databases/db_sqlite, waku/common/databases/common

View File

@ -1,4 +1,4 @@
import stew/results, testutils/unittests
import results, testutils/unittests
import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers

View File

@ -9,7 +9,8 @@ import
libp2p/peerId,
libp2p/crypto/crypto,
eth/keys,
eth/p2p/discoveryv5/enr
eth/p2p/discoveryv5/enr,
brokers/broker_context
import
waku/[
@ -184,114 +185,115 @@ suite "Waku Peer Exchange":
suite "Waku Peer Exchange with discv5":
asyncTest "Node successfully exchanges px peers with real discv5":
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
lockNewGlobalBrokerContext:
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
let disc1 =
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager))
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
let disc2 =
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager))
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await node3.mountPeerExchangeClient()
let disc1 =
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager))
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
check dialResponse.isOk
let disc2 =
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager))
let
requestPeers = 1
currentPeers = node3.peerManager.switch.peerStore.peers.len
let res = await node3.fetchPeerExchangePeers(1)
check res.tryGet() == 1
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
# Then node3 has received 1 peer from node1
check:
node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
await node3.mountPeerExchangeClient()
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
check dialResponse.isOk
let
requestPeers = 1
currentPeers = node3.peerManager.switch.peerStore.peers.len
let res = await node3.fetchPeerExchangePeers(1)
check res.tryGet() == 1
# Then node3 has received 1 peer from node1
check:
node3.peerManager.switch.peerStore.peers.len == currentPeers + requestPeers
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)

View File

@ -2,7 +2,7 @@
import
std/[tempfiles, strutils, options],
stew/results,
results,
testutils/unittests,
chronos,
libp2p/switch,

View File

@ -1,7 +1,7 @@
{.used.}
import testutils/unittests
import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common
suite "Waku Payload":
test "Encode/Decode waku message with timestamp":

View File

@ -1,7 +1,7 @@
{.used.}
import
stew/results,
results,
chronos,
testutils/unittests,
libp2p/crypto/crypto as libp2p_keys,

View File

@ -5,7 +5,8 @@ import
testutils/unittests,
chronos,
libp2p/[switch, peerId, crypto/crypto],
eth/[keys, p2p/discoveryv5/enr]
eth/[keys, p2p/discoveryv5/enr],
brokers/broker_context
import
waku/[
@ -31,110 +32,113 @@ suite "Waku Peer Exchange":
suite "request":
asyncTest "Retrieve and provide peer exchange peers from discv5":
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
lockNewGlobalBrokerContext:
## Given (copied from test_waku_discv5.nim)
let
# todo: px flag
flags = CapabilitiesBitfield.init(
lightpush = false, filter = false, store = false, relay = true
)
bindIp = parseIpAddress("0.0.0.0")
extIp = parseIpAddress("127.0.0.1")
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
nodeKey1 = generateSecp256k1Key()
nodeTcpPort1 = Port(64010)
nodeUdpPort1 = Port(9000)
node1 = newTestWakuNode(
nodeKey1,
bindIp,
nodeTcpPort1,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort1),
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
nodeKey2 = generateSecp256k1Key()
nodeTcpPort2 = Port(64012)
nodeUdpPort2 = Port(9002)
node2 = newTestWakuNode(
nodeKey2,
bindIp,
nodeTcpPort2,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort2),
let disc1 = WakuDiscoveryV5.new(
node1.rng, conf1, some(node1.enr), some(node1.peerManager)
)
nodeKey3 = generateSecp256k1Key()
nodeTcpPort3 = Port(64014)
nodeUdpPort3 = Port(9004)
node3 = newTestWakuNode(
nodeKey3,
bindIp,
nodeTcpPort3,
some(extIp),
wakuFlags = some(flags),
discv5UdpPort = some(nodeUdpPort3),
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
# discv5
let conf1 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort1,
privateKey: keys.PrivateKey(nodeKey1.skkey),
bootstrapRecords: @[],
autoupdateRecord: true,
)
let disc2 = WakuDiscoveryV5.new(
node2.rng, conf2, some(node2.enr), some(node2.peerManager)
)
let disc1 =
WakuDiscoveryV5.new(node1.rng, conf1, some(node1.enr), some(node1.peerManager))
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
let conf2 = WakuDiscoveryV5Config(
discv5Config: none(DiscoveryConfig),
address: bindIp,
port: nodeUdpPort2,
privateKey: keys.PrivateKey(nodeKey2.skkey),
bootstrapRecords: @[disc1.protocol.getRecord()],
autoupdateRecord: true,
)
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
let disc2 =
WakuDiscoveryV5.new(node2.rng, conf2, some(node2.enr), some(node2.peerManager))
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
await allFutures(node1.start(), node2.start(), node3.start())
let resultDisc1StartRes = await disc1.start()
assert resultDisc1StartRes.isOk(), resultDisc1StartRes.error
let resultDisc2StartRes = await disc2.start()
assert resultDisc2StartRes.isOk(), resultDisc2StartRes.error
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
## When
var attempts = 10
while (disc1.protocol.nodesDiscovered < 1 or disc2.protocol.nodesDiscovered < 1) and
attempts > 0:
await sleepAsync(1.seconds)
attempts -= 1
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let response = dialResponse.get()
# node2 can be connected, so will be returned by peer exchange
require (
await node1.peerManager.connectPeer(node2.switch.peerInfo.toRemotePeerInfo())
)
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
# Mount peer exchange
await node1.mountPeerExchange()
await node3.mountPeerExchange()
let dialResponse =
await node3.dialForPeerExchange(node1.switch.peerInfo.toRemotePeerInfo())
let response = dialResponse.get()
## Then
check:
response.get().peerInfos.len == 1
response.get().peerInfos[0].enr == disc2.protocol.localNode.record.raw
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)
await allFutures(
[node1.stop(), node2.stop(), node3.stop(), disc1.stop(), disc2.stop()]
)
asyncTest "Request returns some discovered peers":
let

View File

@ -48,6 +48,7 @@ import
factory/app_callbacks,
persistency/persistency,
],
channels/reliable_channel_manager,
./waku_conf,
./waku_state_info
@ -75,6 +76,8 @@ type Waku* = ref object
messagingClient*: MessagingClient
reliableChannelManager*: ReliableChannelManager
restServer*: WakuRestServerRef
metricsServer*: MetricsHttpServerRef
appCallbacks*: AppCallbacks
@ -367,6 +370,19 @@ proc mountMessagingClient*(waku: Waku): Result[void, string] =
return err("could not create messaging client: " & $error)
return ok()
proc mountReliableChannelManager*(waku: Waku): Result[void, string] =
if not waku.reliableChannelManager.isNil():
return err("reliable channel manager already mounted")
if waku.messagingClient.isNil():
return err("reliable channel manager requires a mounted messaging client")
if waku.node.started:
return err("cannot mount reliable channel manager on a started node")
waku.reliableChannelManager = ReliableChannelManager.new(
waku.messagingClient, waku.brokerCtx
).valueOr:
return err("could not create reliable channel manager: " & $error)
return ok()
proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if waku.node.started:
warn "start: waku node already started"
@ -522,6 +538,10 @@ proc start*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
waku.messagingClient.start().isOkOr:
return err("failed to start messaging client: " & $error)
if not waku.reliableChannelManager.isNil():
waku.reliableChannelManager.start().isOkOr:
return err("failed to start reliable channel manager: " & $error)
return ok()
proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
@ -539,6 +559,9 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
if not waku.reliableChannelManager.isNil():
await waku.reliableChannelManager.stop()
if not waku.messagingClient.isNil():
await waku.messagingClient.stop()

View File

@ -110,5 +110,7 @@ type
edgeFilterSubLoopFut*: Future[void]
edgeFilterConnectionLoopFut*: Future[void]
peerEventListener*: WakuPeerEventListener
ownsEdgeShardHealthProvider*: bool
ownsEdgeFilterPeerCountProvider*: bool
{.pop.}

View File

@ -627,16 +627,19 @@ proc stopEdgeFilterLoops(self: SubscriptionManager) {.async: (raises: []).} =
await WakuPeerEvent.dropListener(self.node.brokerCtx, self.peerEventListener)
proc start*(self: SubscriptionManager): Result[void, string] =
RequestEdgeShardHealth.setProvider(
let edgeShardHealthRes = RequestEdgeShardHealth.setProvider(
self.node.brokerCtx,
proc(shard: PubsubTopic): Result[RequestEdgeShardHealth, string] =
self.edgeFilterSubStates.withValue(shard, state):
return ok(RequestEdgeShardHealth(health: state.currentHealth))
return ok(RequestEdgeShardHealth(health: TopicHealth.NOT_SUBSCRIBED)),
).isOkOr:
error "Can't set provider for RequestEdgeShardHealth", error = error
)
self.ownsEdgeShardHealthProvider = edgeShardHealthRes.isOk()
if edgeShardHealthRes.isErr():
error "Can't set provider for RequestEdgeShardHealth",
error = edgeShardHealthRes.error
RequestEdgeFilterPeerCount.setProvider(
let edgeFilterPeerCountRes = RequestEdgeFilterPeerCount.setProvider(
self.node.brokerCtx,
proc(): Result[RequestEdgeFilterPeerCount, string] =
var minPeers = high(int)
@ -645,21 +648,31 @@ proc start*(self: SubscriptionManager): Result[void, string] =
if minPeers == high(int):
minPeers = 0
return ok(RequestEdgeFilterPeerCount(peerCount: minPeers)),
).isOkOr:
error "Can't set provider for RequestEdgeFilterPeerCount", error = error
)
self.ownsEdgeFilterPeerCountProvider = edgeFilterPeerCountRes.isOk()
if edgeFilterPeerCountRes.isErr():
error "Can't set provider for RequestEdgeFilterPeerCount",
error = edgeFilterPeerCountRes.error
# Start Edge workers if node is in Edge mode (which is
# currently mutually-exclusive with relay being mounted).
if self.node.wakuRelay.isNil():
# Start Edge workers only when we are in Edge mode (relay not mounted)
# AND the filter client is mounted (otherwise the loops have nothing
# to talk to and just spam "filter client is nil" warnings).
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
return self.startEdgeFilterLoops()
return ok()
proc stop*(self: SubscriptionManager) {.async: (raises: []).} =
# Stop Edge workers if node is in Edge mode (which is
# currently mutually-exclusive with relay being mounted).
if self.node.wakuRelay.isNil():
# Stop Edge workers if we started them in `start` (Edge mode + filter client).
if self.node.wakuRelay.isNil() and not self.node.wakuFilterClient.isNil():
await self.stopEdgeFilterLoops()
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
# Only clear providers we actually registered: another SubscriptionManager
# sharing this brokerCtx may have won the race, and clearing its provider
# would leave the broker silently provider-less.
if self.ownsEdgeShardHealthProvider:
RequestEdgeShardHealth.clearProvider(self.node.brokerCtx)
self.ownsEdgeShardHealthProvider = false
if self.ownsEdgeFilterPeerCountProvider:
RequestEdgeFilterPeerCount.clearProvider(self.node.brokerCtx)
self.ownsEdgeFilterPeerCountProvider = false