diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index d52775ae2..dea30ab7e 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -3,11 +3,13 @@ name: Daily logos-delivery CI on: schedule: - cron: '30 6 * * *' + workflow_dispatch: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" jobs: build: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52d20157a..9ddf904ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,8 @@ concurrency: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" NIM_VERSION: '2.2.4' NIMBLE_VERSION: '0.22.3' @@ -35,6 +36,9 @@ jobs: - 'nimble.lock' - 'waku.nimble' - 'Makefile' + - 'scripts/**' + - 'flake.nix' + - 'flake.lock' - 'library/**' - 'liblogosdelivery/**' v2: @@ -43,6 +47,7 @@ jobs: - 'tools/**' - 'tests/all_tests_v2.nim' - 'tests/**' + - 'channels/**' docker: - 'docker/**' @@ -156,7 +161,7 @@ jobs: fi export MAKEFLAGS="-j1" - export NIMFLAGS="--colors:off -d:chronicles_colors:none" + export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative" export USE_LIBBACKTRACE=0 make V=1 POSTGRES=$postgres_enabled test @@ -176,20 +181,6 @@ jobs: secrets: inherit - js-waku-node: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node - - js-waku-node-optional: - needs: build-docker-image - uses: logos-messaging/logos-delivery-js/.github/workflows/test-node.yml@master - with: - nim_wakunode_image: ${{ needs.build-docker-image.outputs.image }} - test_type: node-optional - lint: name: "Lint" runs-on: ubuntu-22.04 diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml index e3ad958ba..ee01a9f1a 100644 --- a/.github/workflows/version-check.yml +++ b/.github/workflows/version-check.yml @@ -28,8 +28,11 @@ jobs: set -euo pipefail NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/') # Nearest tag reachable from HEAD; --abbrev=0 drops the --g - # suffix so we get the bare tag (e.g. v0.38.0). - BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") + # suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips + # the moving `nightly` tag (auto-updated by the daily CI to point at + # master HEAD), which would otherwise be picked as the nearest tag + # and break the version-sort comparison below. + BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "") BASE_TAG=${BASE_TAG#v} # Compare on the base version, ignoring any -rc.N prerelease suffix. BASE_TAG=${BASE_TAG%%-*} diff --git a/Makefile b/Makefile index 43bcf61cd..ea1bf66f0 100644 --- a/Makefile +++ b/Makefile @@ -24,6 +24,7 @@ export PATH := $(HOME)/.nimble/bin:$(PATH) # NIM binary location NIM_BINARY := $(shell which nim 2>/dev/null) NPH := $(HOME)/.nimble/bin/nph +NIMBLE := $(HOME)/.nimble/bin/nimble NIMBLEDEPS_STAMP := nimbledeps/.nimble-setup # Compilation parameters @@ -71,7 +72,7 @@ waku.nims: ln -s waku.nimble $@ $(NIMBLEDEPS_STAMP): nimble.lock | install-nimble build-nph waku.nims - nimble setup --localdeps + $(NIMBLE) setup --localdeps touch $@ # Must be phony so the recipe always runs and the sub-make re-evaluates @@ -92,10 +93,14 @@ REQUIRED_NIM_VERSION := $(shell grep -E '^const RequiredNimVersion\s*=' waku. REQUIRED_NIMBLE_VERSION := $(shell grep -E '^const RequiredNimbleVersion\s*=' waku.nimble | grep -oE '"[0-9]+\.[0-9]+\.[0-9]+"' | tr -d '"') install-nim: +ifneq ($(detected_OS),Windows) scripts/install_nim.sh $(REQUIRED_NIM_VERSION) +endif install-nimble: install-nim +ifneq ($(detected_OS),Windows) scripts/install_nimble.sh $(REQUIRED_NIMBLE_VERSION) +endif build: mkdir -p build diff --git a/apps/chat2bridge/chat2bridge.nim b/apps/chat2bridge/chat2bridge.nim index 53eb5d04b..eeeea328b 100644 --- a/apps/chat2bridge/chat2bridge.nim +++ b/apps/chat2bridge/chat2bridge.nim @@ -1,7 +1,7 @@ {.push raises: [].} import - std/[tables, times, strutils, hashes, sequtils, json], + std/[tables, times, strutils, hashes, sequtils, json, options], chronos, confutils, chronicles, @@ -267,10 +267,16 @@ when isMainModule: else: nodev2ExtPort + let nodev2Key = + if conf.nodekey.isSome(): + conf.nodekey.get() + else: + crypto.PrivateKey.random(Secp256k1, rng[]).tryGet() + let bridge = Chat2Matterbridge.new( mbHostUri = "http://" & $initTAddress(conf.mbHostAddress, Port(conf.mbHostPort)), mbGateway = conf.mbGateway, - nodev2Key = conf.nodekey, + nodev2Key = nodev2Key, nodev2BindIp = conf.listenAddress, nodev2BindPort = Port(uint16(conf.libp2pTcpPort) + conf.portsShift), nodev2ExtIp = nodev2ExtIp, diff --git a/apps/chat2bridge/config_chat2bridge.nim b/apps/chat2bridge/config_chat2bridge.nim index abb5e329f..048fc4d87 100644 --- a/apps/chat2bridge/config_chat2bridge.nim +++ b/apps/chat2bridge/config_chat2bridge.nim @@ -1,4 +1,5 @@ import + std/options, confutils, confutils/defs, confutils/std/net, @@ -45,7 +46,7 @@ type Chat2MatterbridgeConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -62,10 +63,8 @@ type Chat2MatterbridgeConf* = object .}: seq[string] nodekey* {. - desc: "P2P node private key as hex", - defaultValue: crypto.PrivateKey.random(Secp256k1, newRng()[]).tryGet(), - name: "nodekey" - .}: crypto.PrivateKey + desc: "P2P node private key as hex", defaultValueDesc: "random", name: "nodekey" + .}: Option[crypto.PrivateKey] store* {. desc: "Flag whether to start store protocol", defaultValue: true, name: "store" @@ -94,7 +93,7 @@ type Chat2MatterbridgeConf* = object # Matterbridge options mbHostAddress* {. desc: "Listening address of the Matterbridge host", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "mb-host-address" .}: IpAddress diff --git a/apps/chat2mix/config_chat2mix.nim b/apps/chat2mix/config_chat2mix.nim index 4e5a32e6d..639e14986 100644 --- a/apps/chat2mix/config_chat2mix.nim +++ b/apps/chat2mix/config_chat2mix.nim @@ -162,7 +162,8 @@ type metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: + IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress @@ -194,7 +195,10 @@ type dnsDiscoveryNameServers* {. desc: "DNS name server IPs to query. Argument may be repeated.", - defaultValue: @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], + defaultValue: @[ + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 1, 1, 1]), + IpAddress(family: IpAddressFamily.IPv4, address_v4: [1'u8, 0, 0, 1]), + ], name: "dns-discovery-name-server" .}: seq[IpAddress] diff --git a/apps/liteprotocoltester/tester_config.nim b/apps/liteprotocoltester/tester_config.nim index dee918b8c..1f4bedaa8 100644 --- a/apps/liteprotocoltester/tester_config.nim +++ b/apps/liteprotocoltester/tester_config.nim @@ -133,7 +133,7 @@ type LiteProtocolTesterConf* = object ## Tester REST service configuration restAddress* {. desc: "Listening address of the REST HTTP server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "rest-address" .}: IpAddress diff --git a/apps/networkmonitor/networkmonitor_config.nim b/apps/networkmonitor/networkmonitor_config.nim index f67fb09a8..b5bcfbd96 100644 --- a/apps/networkmonitor/networkmonitor_config.nim +++ b/apps/networkmonitor/networkmonitor_config.nim @@ -116,7 +116,7 @@ type NetworkMonitorConf* = object metricsServerAddress* {. desc: "Listening address of the metrics server.", - defaultValue: parseIpAddress("127.0.0.1"), + defaultValue: IpAddress(family: IpAddressFamily.IPv4, address_v4: [127'u8, 0, 0, 1]), name: "metrics-server-address" .}: IpAddress diff --git a/channels/encryption/encryption.nim b/channels/encryption/encryption.nim new file mode 100644 index 000000000..5cb53be2f --- /dev/null +++ b/channels/encryption/encryption.nim @@ -0,0 +1,25 @@ +## Optional encryption hooks for the Reliable Channel API. +## +## Modelled as `RequestBroker`s: the broker pattern lets the channel +## delegate work to a provider that may live in any module without +## introducing a direct dependency. If no provider is registered the +## broker returns an error, so installing the noop providers from +## `noop_encryption` is required when the application does not want +## actual encryption. +## +## Applied per-segment after SDS processing on outgoing, and before +## SDS processing on incoming. No specific scheme is mandated. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import brokers/request_broker + +export request_broker + +RequestBroker: + type Encrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} + +RequestBroker: + type Decrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} diff --git a/channels/encryption/noop_encryption.nim b/channels/encryption/noop_encryption.nim new file mode 100644 index 000000000..f09ed9cf4 --- /dev/null +++ b/channels/encryption/noop_encryption.nim @@ -0,0 +1,18 @@ +## No-op encryption providers. Install these when the application does +## not want actual encryption so the `Encrypt` / `Decrypt` brokers have +## something to dispatch to. + +import results +import chronos +import ./encryption + +proc setNoopEncryption*() = + discard Encrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} = + return ok(Encrypt(payload)) + ) + + discard Decrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} = + return ok(Decrypt(payload)) + ) diff --git a/channels/events.nim b/channels/events.nim new file mode 100644 index 000000000..904a34dc6 --- /dev/null +++ b/channels/events.nim @@ -0,0 +1,39 @@ +## Reliable Channel event types emitted to API consumers. +## +## Lifecycle events for individual segments (sent / propagated / errored) +## are the same as the network-level ones the DeliveryService already +## emits — `requestId` is shared across layers — so we just re-export +## `waku/events/message_events` and avoid declaring duplicates. +## +## Only the channel-level `MessageReceivedEvent` carries data that has +## no analogue in the lower layer (reassembled application payload, +## senderId, channelId), so it lives here. + +import waku/events/message_events as waku_message_events +import brokers/event_broker + +import ./types as channel_types + +export waku_message_events, channel_types, event_broker + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim new file mode 100644 index 000000000..ab5a9f67b --- /dev/null +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -0,0 +1,80 @@ +## Rate Limit Manager for the Reliable Channel API. +## +## Tracks messages sent per RLN epoch and delays dispatch when the +## limit is approached, ensuring RLN compliance on enforcing relays. +## +## For the skeleton this is a pass-through: messages are immediately +## released as ready-to-send. Real epoch budgeting will be added later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/times +import message +import brokers/event_broker +import brokers/broker_context + +export event_broker, broker_context +export message.SdsChannelID + +const + DefaultEpochPeriodSec* = 600 + DefaultMessagesPerEpoch* = 1 + +EventBroker: + ## Emitted by `enqueueToSend` carrying the batch of opaque message + ## blobs that may now leave the rate limiter and continue down the + ## outgoing pipeline (encryption -> dispatch). Bytes only: the rate + ## limiter is intentionally agnostic of SDS, so anything serialisable + ## can flow through it. + ## + ## `channelId` lets listeners filter to their own channel, since all + ## reliable channels share the underlying Waku node's broker context. + type ReadyToSendEvent* = ref object + channelId*: SdsChannelID + msgs*: seq[seq[byte]] + +type + RateLimitConfig* = object + enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active + epochPeriodSec*: int + messagesPerEpoch*: int + + RateLimitManager* = ref object + config*: RateLimitConfig + queue*: seq[seq[byte]] + currentEpochStart*: Time + sentInCurrentEpoch*: int + channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent` + brokerCtx: BrokerContext + +proc new*( + T: type RateLimitManager, + config: RateLimitConfig, + channelId: SdsChannelID, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + return T( + config: config, + queue: @[], + currentEpochStart: getTime(), + sentInCurrentEpoch: 0, + channelId: channelId, + brokerCtx: brokerCtx, + ) + +proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) = + ## Skeleton behaviour: enqueue and immediately release as a single + ## ready batch. Real per-epoch budgeting will park messages on + ## `self.queue` and emit only when the budget allows. + ReadyToSendEvent.emit( + self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg]) + ) + +proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] = + ## Returns the set of queued messages that may be dispatched now + ## without exceeding the configured rate limit. + discard + +proc resetEpoch*(self: RateLimitManager) = + self.currentEpochStart = getTime() + self.sentInCurrentEpoch = 0 diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim new file mode 100644 index 000000000..c3fbe5d77 --- /dev/null +++ b/channels/reliable_channel.nim @@ -0,0 +1,453 @@ +## Reliable Channel type. +## +## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end +## reliability), optional encryption, and rate-limited dispatch on top +## of the Messaging API for a single channel. +## +## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch +## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event +## +## Channels are owned by a `ReliableChannelManager`. Lifecycle and send +## operations are addressed by `ChannelId`, so callers only need to keep +## an opaque handle around. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/[options, sets, tables] +import results +import chronos +import bearssl/rand +import stew/byteutils +import libp2p/crypto/crypto as libp2p_crypto + +import waku/api/api +import waku/factory/waku as waku_factory +import waku/node/delivery_service/send_service +import waku/waku_core/topics + +import ./events +import ./segmentation/segmentation +import ./scalable_data_sync/scalable_data_sync +import ./rate_limit_manager/rate_limit_manager +import ./encryption/encryption + +export + api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption + +const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" + ## Wire-format spec marker for the Reliable Channel layer, as defined + ## in the reliable-channel-api LIP (`Wire Format / Spec Marker`). + ## A `WakuMessage` whose `meta` field does not equal these bytes is + ## not addressed to this layer and is silently dropped on ingress. + ## The trailing `/N` is the wire-format version and is bumped only + ## on breaking on-the-wire changes; implementations pin one version. + +type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Defaults to `waku.send`; tests inject a + ## fake that records calls and returns canned `RequestId`s so the + ## send state machine can be exercised end-to-end without a network. + + MessagePersistence {.pure.} = enum + Persistent + Ephemeral + + SegmentSendState {.pure.} = enum + ## Lifecycle of a single segment as tracked by the channel. The + ## messaging layer has its own richer `DeliveryState` (retries, + ## propagated-vs-validated); here we only model what's needed to + ## decide when a `channelReqId` is fully accounted for. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. + InFlight + ## Released by rate_limit_manager and handed to delivery_service; + ## `messagingReqId` is now set. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. + Failed + ## `MessageErrorEvent` arrived for `messagingReqId`, or the local + ## delivery-task construction failed before any id was reachable. + + PendingMessagingRequest = object + ## One entry per segment (i.e. per messaging-layer request). The + ## relative order of `AwaitingRateLimit` entries must match the + ## order in which `rate_limit_manager` re-emits messages, which is + ## FIFO with `send()`. + channelReqId*: RequestId + ## The channel-layer parent id returned to the caller of `send()` in channel layer. + ## One channel request maps to N pending messaging requests. + messagingReqId*: Option[RequestId] + ## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it. + persistenceReqType: MessagePersistence + segmentSendState*: SegmentSendState + + ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + sendHandler: SendHandler + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingMessagingRequests: seq[PendingMessagingRequest] + ## Entries are kept until the matching segment reaches a final + ## state (`Confirmed` or `Failed`); a whole channel request is + ## then pruned in one pass once all its segments are final. + brokerCtx: BrokerContext + +func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = + self.channelId + +func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = + self.contentTopic + +func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = + self.senderId + +func isFinal(state: SegmentSendState): bool {.inline.} = + return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} + +proc pruneCompletedChannelReqs(self: ReliableChannel) = + ## Drop every `pendingMessagingRequests` entry whose `channelReqId` + ## has all of its segments in a final state. A single failing + ## segment doesn't trigger a drop on its own — we wait until siblings + ## are also accounted for, so the channel-level outcome is decided + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. + var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if not entry.segmentSendState.isFinal(): + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) + +proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = + ## Invoked from this channel's `MessageSentEvent` listener. Flips + ## the matching `InFlight` segment to `Confirmed` and prunes. The + ## listener routes every event through here; entries that don't + ## belong to this channel simply don't match and are no-ops. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Confirmed + self.pruneCompletedChannelReqs() + +proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = + ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Failed + self.pruneCompletedChannelReqs() + +proc onReadyToSend( + self: ReliableChannel, readyToSendEvent: ReadyToSendEvent +) {.async: (raises: []).} = + ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` + ## listener once `rate_limit_manager` releases a batch of opaque + ## blobs (already-encoded SDS messages): + ## + ## ... -> rate_limit_manager -> [encryption] -> dispatch + var idx = 0 + for m in readyToSendEvent.msgs: + ## The first `AwaitingRateLimit` entry in push order is the one + ## this `m` belongs to: `send()` adds one entry per segment, and + ## `rate_limit_manager` re-emits them in the same FIFO order, so + ## the two sequences advance in lockstep. Earlier entries may + ## already be `InFlight` / `Confirmed` / `Failed` because they + ## live on until every sibling of their `channelReqId` is final, + ## so we walk past those to find the next one that was awaiting for this batch. + while idx < self.pendingMessagingRequests.len and + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : + idx.inc() + if idx >= self.pendingMessagingRequests.len: + ## rate_limit_manager emitted more messages than we have pending — + ## should not happen given `send` pushes one entry per enqueued + ## SDS payload. Drop silently rather than corrupt state. + break + + let channelReqId = self.pendingMessagingRequests[idx].channelReqId + let isEphemeral = + self.pendingMessagingRequests[idx].persistenceReqType == + MessagePersistence.Ephemeral + + ## TODO: revisit which fields of the SDS message must be encrypted. + ## Encrypting the whole encoded blob forces every receiver to attempt + ## decryption before it can route, which breaks selective dispatch. + ## Leave routing metadata (channelId, causal-history references) in + ## clear and encrypt only the application payload. + let encRes = await Encrypt.request(m) + let encrypted = encRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error + ), + ) + ## Encryption failed *before* we could hand the segment to the + ## delivery layer — no `messagingReqId` was minted and no + ## `DeliveryTask` was queued on `sendService`. The delivery + ## layer will therefore never emit a `MessageSentEvent` / + ## `MessageErrorEvent` for this segment, so `onMessageError` + ## won't fire either. Advance the state machine inline so the + ## parent `channelReqId` can still be pruned once its siblings + ## are also final. + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() + continue + let wireBytes = seq[byte](encrypted) + + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. + let envelope = MessageEnvelope( + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + ## `waku.send` is not annotated `(raises: [])`, but this listener is. + ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("waku send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "waku send failed: " & error + ), + ) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() + continue + + self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight + self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) + idx.inc() + + self.pruneCompletedChannelReqs() + +proc send*( + self: ReliableChannel, payload: seq[byte], ephemeral: bool = false +): Result[RequestId, string] = + ## Single application-level send. The first three stages of the + ## outgoing pipeline are chained explicitly so the flow is visible + ## at a glance: + ## + ## segmentation -> sds -> rate_limit_manager + ## + ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with + ## the SDS messages cleared for transmission; the channel's listener + ## then runs the final stage (encryption -> dispatch). The + ## `persistenceReqType` is carried alongside each segment in + ## `pendingMessagingRequests` and stamped onto the eventual + ## `MessageEnvelope`. + ## + ## The returned `RequestId` is the channel-level parent of one-or-more + ## messaging-layer `RequestId`s; the mapping is recorded in + ## `self.requestIds`. + if payload.len == 0: + return err("empty payload") + + let channelReqId = RequestId.new(self.rng) + self.requestIds[channelReqId] = @[] + + let persistenceReqType = + if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent + + for segmentBytes in self.segmentation.performSegmentation(payload): + ## Segments arrive already encoded; the segmentation module owns + ## the wire format so SDS only ever sees opaque bytes. + let sdsBytes = self.sdsHandler.wrapOutgoing( + self.channelId, self.senderId, segmentBytes + ).valueOr: + return err("SDS wrap failed: " & error) + self.pendingMessagingRequests.add( + PendingMessagingRequest( + channelReqId: channelReqId, + messagingReqId: none(RequestId), + persistenceReqType: persistenceReqType, + segmentSendState: SegmentSendState.AwaitingRateLimit, + ) + ) + self.rateLimit.enqueueToSend(sdsBytes) + + return ok(channelReqId) + +proc onMessageReceived( + self: ReliableChannel, messageHash: string, payload: seq[byte] +) {.async: (raises: []).} = + ## Ingress pipeline made visible: + ## + ## payload -> decrypt -> sds -> reassemble -> emit + ## + ## Invoked from this channel's `MessageReceivedEvent` listener, which + ## already filtered on the spec marker and on `contentTopic`. The + ## channel only sees the raw payload bytes for itself. + + ## Notice that the following "request" is implemented implicitly as a broker call to + ## the `Decrypt` request broker. + let decRes = await Decrypt.request(payload) + let plaintext = decRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "decryption failed: " & error, + ), + ) + return + let plaintextBytes = seq[byte](plaintext) + + let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) + if unwrapped.isErr(): + return + + let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. + ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc new*( + T: type ReliableChannel, + waku: Waku, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + segConfig: SegmentationConfig, + sdsConfig: SdsConfig, + rateConfig: RateLimitConfig, + brokerCtx: BrokerContext = globalBrokerContext(), + sendHandler: SendHandler = nil, +): T = + ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed + ## inside the channel rather than handed in by the caller — they are + ## implementation details of the channel, not knobs the API consumer + ## should be wiring up. Encryption is delegated to the `Encrypt`/ + ## `Decrypt` request brokers, so the channel keeps no per-instance + ## encryption state either. + ## + ## `sendHandler` defaults to `waku.send`; tests pass a fake to drive + ## the send state machine without touching the network. + let resolvedSendHandler = + if sendHandler.isNil(): + proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await waku.send(envelope) + else: + sendHandler + + let chn = T( + sendHandler: resolvedSendHandler, + channelId: channelId, + contentTopic: contentTopic, + senderId: senderId, + rng: libp2p_crypto.newRng(), + segmentation: SegmentationHandler.new(segConfig), + sdsHandler: SdsHandler.new(sdsConfig, senderId), + rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), + requestIds: initTable[RequestId, seq[RequestId]](), + pendingMessagingRequests: @[], + brokerCtx: brokerCtx, + ) + + ## Each channel owns its own egress + ingress + send-completion + ## listeners on `chn.brokerCtx`, filtered to traffic addressed to + ## this channel. Keeping the listeners (and the handler procs they + ## call) inside the channel lets `onReadyToSend` / + ## `onMessageReceived` / `onMessageSent` / `onMessageError` stay + ## private — the manager doesn't need to know about them. + discard ReadyToSendEvent.listen( + chn.brokerCtx, + proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = + if evt.channelId == chn.channelId: + await chn.onReadyToSend(evt) + , + ) + + discard MessageReceivedEvent.listen( + chn.brokerCtx, + proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} = + ## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic + ## for other channels before doing any decode work. + if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion: + return + if evt.message.contentTopic != chn.contentTopic: + return + await chn.onMessageReceived(evt.messageHash, evt.message.payload) + , + ) + + ## Send-completion events are tagged with the per-segment messaging + ## `requestId` — globally unique, so we don't need any channel filter + ## up front. The handler scans this channel's pending entries for a + ## match and is a no-op when the id belongs to a different channel. + discard MessageSentEvent.listen( + chn.brokerCtx, + proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = + chn.onMessageSent(evt.requestId), + ) + + discard MessageErrorEvent.listen( + chn.brokerCtx, + proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + chn.onMessageError(evt.requestId), + ) + + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim new file mode 100644 index 000000000..747f755b4 --- /dev/null +++ b/channels/reliable_channel_manager.nim @@ -0,0 +1,141 @@ +## Reliable Channel API entry point. +## +## Owns the set of `ReliableChannel` instances and exposes lifecycle and +## send/receive operations addressed by `ChannelId`. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/tables +import results +import chronos +import stew/byteutils + +import waku/api/api +import waku/api/api_conf +import waku/events/message_events as waku_message_events +import waku/factory/waku as waku_factory +import waku/node/delivery_service/delivery_service +import waku/waku_core/topics + +import ./reliable_channel +import ./encryption/noop_encryption + +export reliable_channel + +type ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + waku: Waku + ## Owned by the manager. The channel layer reaches the messaging + ## API through `waku.send(envelope)`; constructing DeliveryTasks + ## directly would breach the layer boundary. + brokerCtx: BrokerContext + +proc new*( + T: type ReliableChannelManager, + conf: WakuNodeConf, + brokerCtx: BrokerContext = globalBrokerContext(), +): Future[Result[T, string]] {.async.} = + ## TODO !! The proper ownership chain is: + ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, + ## and this will be implemented in the future. For now, `createNode` + ## is called here to get a Waku instance, and the WakuNode is immediately discarded. + ## This is a temporary workaround to get the API + + let waku = ?(await createNode(conf)) + + let manager = T( + channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx + ) + + return ok(manager) + +proc start*(self: ReliableChannelManager): Result[void, string] = + ## Bring the owned DeliveryService up. Separated from `new` so callers + ## can register encryption providers / create channels before traffic + ## starts flowing. + self.waku.deliveryService.startDeliveryService() + +proc stop*(self: ReliableChannelManager) {.async.} = + if not self.waku.isNil(): + await self.waku.deliveryService.stopDeliveryService() + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + sendHandler: SendHandler = nil, +): Result[ChannelId, string] = + ## Spec entry point. The `DeliveryService` and `rng` the channel needs + ## are sourced from the owning `ReliableChannelManager` rather than + ## passed per call. Encryption is wired up through the `Encrypt`/ + ## `Decrypt` request brokers — the application installs its own + ## providers (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now. + ## + ## `sendHandler` is left `nil` in production so the channel uses the + ## owned `waku.send`; tests pass a fake to bypass the network. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: nil, + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let chn = ReliableChannel.new( + waku = self.waku, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + sendHandler = sendHandler, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Result[void, string] = + ## Flush state, persist outstanding SDS buffers, release resources. + if not self.channels.hasKey(channelId): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + return ok() + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Result[RequestId, string] = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) + +## Inbound messages are not handed to the manager by direct call. Each +## `ReliableChannel` installs its own `MessageReceivedEvent` listener +## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, +## and routes to its private `onMessageReceived`. This keeps the lower +## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel +## and keeps the manager out of per-channel event dispatch. diff --git a/channels/scalable_data_sync/scalable_data_sync.nim b/channels/scalable_data_sync/scalable_data_sync.nim new file mode 100644 index 000000000..30ad0e02b --- /dev/null +++ b/channels/scalable_data_sync/scalable_data_sync.nim @@ -0,0 +1,62 @@ +## Scalable Data Sync (SDS) component for the Reliable Channel API. +## +## Provides end-to-end delivery guarantees via causal history tracking, +## acknowledgements, and retransmission of unacknowledged segments. +## +## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so +## the send/receive circuit can exercise the surrounding pipeline. +## Real SDS wrapping will plug in via `nim-sds` later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import results +import message as sds_message + +import ./sds_persistence + +export sds_message, sds_persistence + +const + DefaultAcknowledgementTimeoutMs* = 5_000 + DefaultMaxRetransmissions* = 5 + DefaultCausalHistorySize* = 2 + +type + SdsConfig* = object + acknowledgementTimeoutMs*: int + maxRetransmissions*: int + causalHistorySize*: int + persistence*: SdsPersistence + + SdsHandler* = ref object + config*: SdsConfig + participantId*: SdsParticipantID + +proc new*( + T: type SdsHandler, + config: SdsConfig, + participantId: SdsParticipantID = SdsParticipantID(""), +): T = + return T(config: config, participantId: participantId) + +proc wrapOutgoing*( + self: SdsHandler, + channelId: SdsChannelID, + senderId: SdsParticipantID, + payload: seq[byte], +): Result[seq[byte], string] = + ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). + ## Skeleton: pass the encoded segment through unchanged. Real causal + ## history / lamport / bloom-filter population will replace this. + return ok(payload) + +proc handleIncoming*( + self: SdsHandler, msg: seq[byte] +): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = + ## Skeleton: pass the bytes through; channel id is left empty until + ## the real wire format provides it. + return ok((content: msg, channelId: SdsChannelID(""))) + +proc tickRetransmissions*(self: SdsHandler) = + ## Drives retransmissions of unacknowledged messages. + discard diff --git a/channels/scalable_data_sync/sds_persistence.nim b/channels/scalable_data_sync/sds_persistence.nim new file mode 100644 index 000000000..8089595ea --- /dev/null +++ b/channels/scalable_data_sync/sds_persistence.nim @@ -0,0 +1,25 @@ +## Persistence backend for SDS outgoing buffer and causal history. +## +## TODO (raised in PR review): this surface is duplicating concerns that +## should come from the SDS module itself. Once the SDS module exposes a +## complete persistence contract, drop this file and import that surface +## instead of re-declaring it here. + +import message + +type + SdsPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SdsPersistence* = ref object of RootObj + kind*: SdsPersistenceKind + +method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = + discard + +method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = + discard + +method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = + discard diff --git a/channels/segmentation/segment_message_proto.nim b/channels/segmentation/segment_message_proto.nim new file mode 100644 index 000000000..f19cdc27f --- /dev/null +++ b/channels/segmentation/segment_message_proto.nim @@ -0,0 +1,34 @@ +## Wire format for a single segment, per the Reliable Channel API spec. +## +## Skeleton: encode/decode treat the segment as just its payload bytes, +## since for now we only ever produce a single segment per send. + +type SegmentMessageProto* = object + entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes + dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments + dataSegmentCount*: uint32 ## number of data segments (>= 1) + payload*: seq[byte] ## segment payload (data or parity shard) + paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments + paritySegmentCount*: uint32 ## number of parity segments + isParity*: bool ## true for parity segments, false (default) for data segments + +proc isParityMessage*(self: SegmentMessageProto): bool = + self.isParity + +proc isValid*(self: SegmentMessageProto): bool = + ## Validates hash length (32 bytes), segment indices and counts. + discard + +proc encode*(self: SegmentMessageProto): seq[byte] = + self.payload + +proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T = + T( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: buf, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) diff --git a/channels/segmentation/segmentation.nim b/channels/segmentation/segmentation.nim new file mode 100644 index 000000000..9fc7964c0 --- /dev/null +++ b/channels/segmentation/segmentation.nim @@ -0,0 +1,70 @@ +## Segmentation component for the Reliable Channel API. +## +## Splits large application payloads into transmittable segments and +## reassembles them on reception. Supports optional Reed-Solomon parity +## segments for loss recovery, as per the Reliable Channel API spec. +## +## For the skeleton everything fits in a single segment: real chunking +## and Reed-Solomon parity will be plugged in later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/options +import ./segment_message_proto +import ./segmentation_persistence + +export segment_message_proto, segmentation_persistence + +const + DefaultSegmentSizeBytes* = 102_400 + SegmentsParityRate* = 0.125 + SegmentsReedSolomonMaxCount* = 256 + +type + SegmentationConfig* = object + segmentSizeBytes*: int + enableReedSolomon*: bool + persistence*: SegmentationPersistence + + SegmentationHandler* = ref object + config*: SegmentationConfig + + ReassemblyResult* = object + payload*: seq[byte] + entireMessageHash*: seq[byte] + +proc new*(T: type SegmentationHandler, config: SegmentationConfig): T = + return T(config: config) + +proc performSegmentation*( + self: SegmentationHandler, payload: seq[byte] +): seq[seq[byte]] = + ## Skeleton behaviour: emit exactly one segment carrying the whole + ## payload. Real chunking and Reed-Solomon parity will replace this. + let segment = SegmentMessageProto( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: payload, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) + return @[segment.encode()] + +proc handleIncomingSegment*( + self: SegmentationHandler, segmentBytes: seq[byte] +): Option[ReassemblyResult] = + ## Skeleton behaviour: every segment is already a complete message + ## (since `performSegmentation` always emits one), so just hand the + ## payload straight back. + let segment = SegmentMessageProto.decode(segmentBytes) + return some( + ReassemblyResult( + payload: segment.payload, entireMessageHash: segment.entireMessageHash + ) + ) + +proc cleanupSegments*(self: SegmentationHandler) = + ## Drop expired partial-reassembly state. + discard diff --git a/channels/segmentation/segmentation_persistence.nim b/channels/segmentation/segmentation_persistence.nim new file mode 100644 index 000000000..cc34c36d2 --- /dev/null +++ b/channels/segmentation/segmentation_persistence.nim @@ -0,0 +1,20 @@ +## Persistence backend interface for segmentation reassembly state. +## +## Allows partial reassembly state to survive process restarts. + +type + SegmentationPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SegmentationPersistence* = ref object of RootObj + kind*: SegmentationPersistenceKind + +method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} = + discard + +method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} = + discard + +method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} = + discard diff --git a/channels/types.nim b/channels/types.nim new file mode 100644 index 000000000..4070ed620 --- /dev/null +++ b/channels/types.nim @@ -0,0 +1,15 @@ +## Core identifier types for the Reliable Channel API. + +import std/hashes +import waku/api/types as api_types + +import ./scalable_data_sync/scalable_data_sync + +export scalable_data_sync +export api_types + +type ChannelId* = SdsChannelID + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) diff --git a/flake.lock b/flake.lock index 8d0db9269..411bf2430 100644 --- a/flake.lock +++ b/flake.lock @@ -19,8 +19,7 @@ "root": { "inputs": { "nixpkgs": "nixpkgs", - "rust-overlay": "rust-overlay", - "zerokit": "zerokit" + "rust-overlay": "rust-overlay" } }, "rust-overlay": { @@ -42,47 +41,6 @@ "repo": "rust-overlay", "type": "github" } - }, - "rust-overlay_2": { - "inputs": { - "nixpkgs": [ - "zerokit", - "nixpkgs" - ] - }, - "locked": { - "lastModified": 1771211437, - "narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=", - "owner": "oxalica", - "repo": "rust-overlay", - "rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f", - "type": "github" - }, - "original": { - "owner": "oxalica", - "repo": "rust-overlay", - "type": "github" - } - }, - "zerokit": { - "inputs": { - "nixpkgs": [ - "nixpkgs" - ], - "rust-overlay": "rust-overlay_2" - }, - "locked": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - }, - "original": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 077668b9a..b32a53455 100644 --- a/flake.nix +++ b/flake.nix @@ -17,19 +17,9 @@ url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; }; - - # External flake input: Zerokit pinned to a specific commit. - # Update the rev here when a new zerokit version is needed. - zerokit = { - # Pinned to v2.0.2 (5e64cb8822bee65eed6cf459f95ae72b80c6ba63) to match - # the vendor/zerokit submodule. Keep these two in sync: the nix build - # links librln from this input, the Makefile build from the submodule. - url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63"; - inputs.nixpkgs.follows = "nixpkgs"; - }; }; - outputs = { self, nixpkgs, rust-overlay, zerokit }: + outputs = { self, nixpkgs, rust-overlay }: let systems = [ "x86_64-linux" "aarch64-linux" @@ -69,19 +59,78 @@ inherit system; overlays = [ (import rust-overlay) nimbleOverlay ]; }; + + # Prebuilt zerokit librln, fetched from the upstream GitHub release + # rather than compiled from source. Compiling zerokit makes Nix download + # its many crate dependencies from crates.io in one parallel burst, which + # crates.io intermittently rejects with HTTP 403 (rate limiting from the + # self-hosted runners' shared IP), breaking the nix build. The release + # ships the exact `stateless` library this project links (see + # scripts/build_rln.sh), so we use it directly — no Rust toolchain and + # no crates.io access needed. + # + # Keep `rlnVersion` aligned with `LIBRLN_VERSION` in the Makefile and the + # vendor/zerokit submodule. Each hash is the sha256 of the release tarball + # for that platform; refresh all four when bumping the version. + rlnVersion = "v2.0.2"; + rlnAssets = { + "x86_64-linux" = { triple = "x86_64-unknown-linux-gnu"; hash = "sha256-qbrUdaetYKFhjzxUP/QcwD3JHWJ8qk/tCMK3yXceIAk="; }; + "aarch64-linux" = { triple = "aarch64-unknown-linux-gnu"; hash = "sha256-s4bWrmCcNTWHNyJwV73ilWNp58ZdAVG+TAgtWN1cTQs="; }; + "x86_64-darwin" = { triple = "x86_64-apple-darwin"; hash = "sha256-ZaHP5CApN66FYY7jxwOmGcF9kJR78Fng3k1qE2W08Mk="; }; + "aarch64-darwin" = { triple = "aarch64-apple-darwin"; hash = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU="; }; + }; + + mkZerokitRln = system: pkgs: + let + asset = rlnAssets.${system} or + (throw "zerokit ${rlnVersion} has no prebuilt rln asset for system '${system}'"); + in pkgs.stdenv.mkDerivation { + pname = "librln"; + version = lib.removePrefix "v" rlnVersion; + + src = pkgs.fetchurl { + url = "https://github.com/vacp2p/zerokit/releases/download/" + + "${rlnVersion}/${asset.triple}-stateless-rln.tar.gz"; + hash = asset.hash; + }; + + # The tarball lays its files out under release/. + sourceRoot = "release"; + dontConfigure = true; + dontBuild = true; + + # The release .so was linked outside Nix, so it references system + # libraries (libgcc_s, libstdc++, glibc) by bare name. autoPatchelfHook + # points those at the Nix versions so the library loads correctly when + # used by the Nix build. It does nothing for the static .a, and the + # step is skipped on macOS (dylib paths are fixed in nix/default.nix). + nativeBuildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.autoPatchelfHook ]; + buildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.stdenv.cc.cc.lib ]; + + installPhase = '' + runHook preInstall + mkdir -p $out/lib + cp librln.a $out/lib/ 2>/dev/null || true + cp librln.so $out/lib/ 2>/dev/null || true + cp librln.dylib $out/lib/ 2>/dev/null || true + runHook postInstall + ''; + + meta = with pkgs.lib; { + description = "Prebuilt zerokit RLN library (stateless flavor)"; + homepage = "https://github.com/vacp2p/zerokit"; + license = with licenses; [ mit asl20 ]; + platforms = builtins.attrNames rlnAssets; + }; + }; in { packages = forAllSystems (system: let pkgs = pkgsFor system; - # HACK: Fix for stale cargoHash in 2.0.2 release. - zerokitRln = zerokit.packages.${system}.rln.overrideAttrs (old: { - cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: { - vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: { - outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU="; - }); - }); - }); + zerokitRln = mkZerokitRln system pkgs; liblogosdelivery = pkgs.callPackage ./nix/default.nix { inherit pkgs; @@ -89,12 +138,18 @@ inherit zerokitRln; gitVersion = "v${nimbleVersion}-g${builtins.substring 0 6 shortRev}"; }; + + wakucanary = pkgs.callPackage ./nix/default.nix { + inherit pkgs; + src = ./.; + targets = ["wakucanary"]; + inherit zerokitRln; + }; in { - inherit liblogosdelivery; - # Expose the cargoHash-corrected librln so downstream consumers + inherit liblogosdelivery wakucanary; + # Expose the prebuilt librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this - # build links, instead of pulling zerokit's rln directly — whose - # committed cargoHash is stale for v2.0.2 (see zerokitRln above). + # build links against. rln = zerokitRln; default = liblogosdelivery; } diff --git a/nix/default.nix b/nix/default.nix index 7b7989e1a..ec9e0542c 100644 --- a/nix/default.nix +++ b/nix/default.nix @@ -1,6 +1,7 @@ { pkgs , src , zerokitRln +, targets ? [] , gitVersion ? "n/a" , enablePostgres ? true , enableNimDebugDlOpen ? true @@ -10,6 +11,8 @@ let deps = import ./deps.nix { inherit pkgs; }; + buildWakucanary = builtins.elem "wakucanary" targets; + nimDefineArgs = pkgs.lib.concatStringsSep " \\\n " ( [ "--define:disable_libbacktrace" "--define:git_version=${gitVersion}" ] @@ -34,9 +37,29 @@ let if pkgs.stdenv.hostPlatform.isWindows then "dll" else if pkgs.stdenv.hostPlatform.isDarwin then "dylib" else "so"; + + # Shared `nim c` invocation. Callers vary the output, the source file and a + # few mode-specific flags (e.g. --app:lib, --noMain, --header); everything + # else (paths, defines, threading, gc, nimcache, rln linkage) is constant. + # $NAT_TRAV and $NIMCACHE are shell variables defined in buildPhase. + nimCompile = { outFile, sourceFile, extraArgs ? [] }: '' + nim c \ + --noNimblePath \ + ${pathArgs} \ + --path:$NAT_TRAV \ + --path:$NAT_TRAV/src \ + --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ + ${nimDefineArgs} \ + --threads:on \ + --mm:refc \ + --nimcache:$NIMCACHE \ + --out:${outFile} \ + ${pkgs.lib.concatStringsSep " \\\n " extraArgs} \ + ${sourceFile} + ''; in pkgs.stdenv.mkDerivation { - pname = "liblogosdelivery"; + pname = if buildWakucanary then "wakucanary" else "liblogosdelivery"; version = "dev"; inherit src; @@ -71,45 +94,47 @@ pkgs.stdenv.mkDerivation { make -C $NAT_TRAV/vendor/libnatpmp-upstream \ CFLAGS="-Wall -Os -fPIC -DENABLE_STRNATPMPERR -DNATPMP_MAX_RETRIES=4" libnatpmp.a + ${if buildWakucanary then '' + echo "== Building wakucanary ==" + ${nimCompile { + outFile = "build/wakucanary"; + sourceFile = "apps/wakucanary/wakucanary.nim"; + extraArgs = [ "--path:." ]; + }} + '' else '' echo "== Building liblogosdelivery (dynamic) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.${libExt} \ - --app:lib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --header \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.${libExt}"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:lib" + "--opt:size" + "--noMain" + "--header" + "--nimMainPrefix:liblogosdelivery" + ]; + }} echo "== Building liblogosdelivery (static) ==" - nim c \ - --noNimblePath \ - ${pathArgs} \ - --path:$NAT_TRAV \ - --path:$NAT_TRAV/src \ - --passL:"-L${zerokitRln}/lib -lrln${pkgs.lib.optionalString pkgs.stdenv.isLinux " -lstdc++"}" \ - ${nimDefineArgs} \ - --out:build/liblogosdelivery.a \ - --app:staticlib \ - --threads:on \ - --opt:size \ - --noMain \ - --mm:refc \ - --nimMainPrefix:liblogosdelivery \ - --nimcache:$NIMCACHE \ - liblogosdelivery/liblogosdelivery.nim + ${nimCompile { + outFile = "build/liblogosdelivery.a"; + sourceFile = "liblogosdelivery/liblogosdelivery.nim"; + extraArgs = [ + "--app:staticlib" + "--opt:size" + "--noMain" + "--nimMainPrefix:liblogosdelivery" + ]; + }} + ''} ''; - installPhase = '' + installPhase = if buildWakucanary then '' + runHook preInstall + mkdir -p $out/bin $out/lib + cp build/wakucanary $out/bin/ + runHook postInstall + '' else '' runHook preInstall mkdir -p $out/lib $out/include cp build/liblogosdelivery.${libExt} $out/lib/ 2>/dev/null || true @@ -118,21 +143,47 @@ pkgs.stdenv.mkDerivation { runHook postInstall ''; - # Bundle librln alongside liblogosdelivery so the output is self-contained. + # Bundle librln alongside the produced artifact so the output is self-contained. # Use --add-rpath (not --set-rpath) so fixupPhase's stdenv RUNPATH injection # for libstdc++ is preserved. postInstall = - pkgs.lib.optionalString pkgs.stdenv.isDarwin '' - cp ${zerokitRln}/lib/librln.dylib $out/lib/ - chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib - old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) - install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib - install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib - '' - + pkgs.lib.optionalString pkgs.stdenv.isLinux '' - cp ${zerokitRln}/lib/librln.so $out/lib/ - patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so - ''; + if buildWakucanary then + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/bin/wakucanary + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/bin/wakucanary | awk 'NR>1{print $1}' | grep librln || true) + if [ -n "$old" ]; then + install_name_tool -change "$old" @rpath/librln.dylib $out/bin/wakucanary + fi + install_name_tool -add_rpath @loader_path/../lib $out/bin/wakucanary + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN/../lib' $out/bin/wakucanary + '' + else + pkgs.lib.optionalString pkgs.stdenv.isDarwin '' + cp ${zerokitRln}/lib/librln.dylib $out/lib/ + chmod +w $out/lib/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/liblogosdelivery.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -id @rpath/librln.dylib $out/lib/librln.dylib + old=$(otool -L $out/lib/liblogosdelivery.dylib | awk 'NR>1{print $1}' | grep librln) + install_name_tool -change "$old" @rpath/librln.dylib $out/lib/liblogosdelivery.dylib + install_name_tool -add_rpath @loader_path $out/lib/liblogosdelivery.dylib + '' + + pkgs.lib.optionalString pkgs.stdenv.isLinux '' + cp ${zerokitRln}/lib/librln.so $out/lib/ + patchelf --add-rpath '$ORIGIN' $out/lib/liblogosdelivery.so + ''; + + meta = with pkgs.lib; { + description = + if buildWakucanary + then "Waku network canary tool" + else "logos-delivery shared/static library"; + homepage = "https://github.com/logos-messaging/logos-delivery"; + license = licenses.mit; + platforms = platforms.unix; + }; } diff --git a/scripts/build_rln.sh b/scripts/build_rln.sh index 35b5b8953..b028885e2 100755 --- a/scripts/build_rln.sh +++ b/scripts/build_rln.sh @@ -1,8 +1,15 @@ #!/usr/bin/env bash -# This script is used to build the rln library for the current platform. -# Previously downloaded prebuilt binaries, but due to compatibility issues -# we now always build from source. +# Provides the rln static library for the current platform. +# +# If zerokit publishes a prebuilt `stateless` release asset for this platform, +# download and use it: that is faster than compiling and avoids fetching +# zerokit's many crate dependencies from crates.io. The asset is selected by +# the Rust host target triple (the platform identifier reported by rustc, +# e.g. x86_64-unknown-linux-gnu or aarch64-apple-darwin). +# +# When no matching asset exists (e.g. Windows), build from the vendored +# zerokit submodule instead. set -e @@ -15,8 +22,26 @@ output_filename=$3 [[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; } [[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; } -echo "Building RLN library from source (version ${rln_version})..." +# --- Prefer the prebuilt release asset -------------------------------------- +# Host target triple, e.g. x86_64-unknown-linux-gnu / aarch64-apple-darwin. +host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}') +tarball="${host_triplet}-stateless-rln.tar.gz" +url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}" +echo "Looking for prebuilt RLN: ${url}" +if curl --silent --fail-with-body -L "${url}" -o "${tarball}"; then + echo "Downloaded prebuilt ${tarball}" + tar -xzf "${tarball}" + mv "release/librln.a" "${output_filename}" + rm -rf "${tarball}" release + echo "Using prebuilt ${output_filename}" + exit 0 +fi +# curl --fail-with-body writes the error body to the file on HTTP failure. +rm -f "${tarball}" +echo "No prebuilt asset for ${host_triplet} at ${rln_version}; building from source." + +# --- Fall back to building from the vendored submodule ---------------------- # Check if submodule version = version in Makefile cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" @@ -33,7 +58,6 @@ if [[ "v${submodule_version}" != "${rln_version}" ]]; then exit 1 fi -# Build rln from source. # `stateless` feature: logos-delivery does not maintain a local Merkle tree # (post-PR #3312); the contract is the source of truth and the path is fetched # via getMerkleProof(index). The stateless build compiles out tree code. diff --git a/scripts/install_nim.sh b/scripts/install_nim.sh index 3802fb85b..b75826378 100755 --- a/scripts/install_nim.sh +++ b/scripts/install_nim.sh @@ -45,6 +45,13 @@ if [ -n "${nim_ver}" ]; then echo "INFO: Nim ${nim_ver} found in PATH; installing Nim ${NIM_VERSION} to ${NIM_DEST}." >&2 fi +<<<<<<< HEAD +======= +OS=$(uname -s | tr 'A-Z' 'a-z' | sed 's/darwin/macosx/') +ARCH=$(uname -m | sed 's/x86_64/x64/;s/aarch64/arm64/') + +BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}-${OS}_${ARCH}.tar.xz" +>>>>>>> master WORK_DIR=$(mktemp -d) trap 'rm -rf "${WORK_DIR}"' EXIT @@ -58,6 +65,7 @@ MINGW* | MSYS* | CYGWIN*) esac BINARY_URL="https://nim-lang.org/download/nim-${NIM_VERSION}_${WIN_ARCH}.zip" +<<<<<<< HEAD echo "Downloading pre-built Nim ${NIM_VERSION} (windows_${WIN_ARCH}) from ${BINARY_URL}..." curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.zip" unzip -q "${WORK_DIR}/nim.zip" -d "${WORK_DIR}" @@ -89,6 +97,28 @@ MINGW* | MSYS* | CYGWIN*) ;; esac +# rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker). +# Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present. +rm -rf "${NIM_DEST}" 2>/dev/null || true +mkdir -p "${NIM_DEST}" +cp -r "${SRC_DIR}/." "${NIM_DEST}/" +======= +if [ "${HTTP_STATUS}" = "200" ]; then + echo "Downloading pre-built binary from ${BINARY_URL}..." + curl -fL "${BINARY_URL}" -o "${WORK_DIR}/nim.tar.xz" + tar -xJf "${WORK_DIR}/nim.tar.xz" -C "${WORK_DIR}" + SRC_DIR="${WORK_DIR}/nim-${NIM_VERSION}" +else + echo "No pre-built binary found for ${OS}_${ARCH}. Building from source..." + SRC_URL="https://github.com/nim-lang/Nim/archive/refs/tags/v${NIM_VERSION}.tar.gz" + curl -fL "${SRC_URL}" -o "${WORK_DIR}/nim-src.tar.gz" + tar -xzf "${WORK_DIR}/nim-src.tar.gz" -C "${WORK_DIR}" + cd "${WORK_DIR}/Nim-${NIM_VERSION}" + sh build_all.sh + SRC_DIR="${WORK_DIR}/Nim-${NIM_VERSION}" +fi +>>>>>>> master + # rm -rf can fail with "Directory not empty" on overlay filesystems (e.g. Docker). # Using cp -r src/. dst/ handles both cases: dst absent (clean) or partially present. rm -rf "${NIM_DEST}" 2>/dev/null || true diff --git a/scripts/install_nimble.sh b/scripts/install_nimble.sh index c43011e7b..871f6d784 100755 --- a/scripts/install_nimble.sh +++ b/scripts/install_nimble.sh @@ -19,6 +19,7 @@ if [ -z "${NIMBLE_VERSION}" ]; then exit 1 fi +<<<<<<< HEAD # On Windows (MSYS2) the binaries carry a .exe extension. EXE="" case "$(uname -s)" in @@ -26,6 +27,9 @@ MINGW* | MSYS* | CYGWIN*) EXE=".exe" ;; esac NIMBLE_BIN="${HOME}/.nimble/bin/nimble${EXE}" +======= +NIMBLE_BIN="${HOME}/.nimble/bin/nimble" +>>>>>>> master # 1. Already installed at the right version? if [ -x "${NIMBLE_BIN}" ]; then @@ -38,7 +42,11 @@ if [ -x "${NIMBLE_BIN}" ]; then fi # 2. Already compiled into pkgs2/ from a previous (possibly partial) run? +<<<<<<< HEAD PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble${EXE} \ +======= +PKGS2_NIMBLE=$(ls -dt "${HOME}/.nimble/pkgs2/nimble-${NIMBLE_VERSION}-"*/nimble \ +>>>>>>> master 2>/dev/null | head -1 || true) if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then echo "Nimble ${NIMBLE_VERSION} found in pkgs2, re-linking to ${NIMBLE_BIN}." @@ -48,7 +56,11 @@ if [ -n "${PKGS2_NIMBLE}" ] && [ -x "${PKGS2_NIMBLE}" ]; then fi # 3. Build from source. +<<<<<<< HEAD NIM_BIN="${HOME}/.nimble/bin/nim${EXE}" +======= +NIM_BIN="${HOME}/.nimble/bin/nim" +>>>>>>> master if [ ! -x "${NIM_BIN}" ]; then NIM_BIN="$(command -v nim)" fi @@ -66,11 +78,19 @@ echo "Building nimble ${NIMBLE_VERSION} with $("${NIM_BIN}" --version | head -1) cd "${WORK_DIR}/nimble" # nim reads nim.cfg / config.nims in the current dir, which sets vendor paths. "${NIM_BIN}" c -d:release --path:src \ +<<<<<<< HEAD -o:"${WORK_DIR}/nimble_new${EXE}" src/nimble.nim mkdir -p "${HOME}/.nimble/bin" # Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running. cp "${WORK_DIR}/nimble_new${EXE}" "${NIMBLE_BIN}.new.$$" +======= + -o:"${WORK_DIR}/nimble_new" src/nimble.nim + +mkdir -p "${HOME}/.nimble/bin" +# Atomic rename: avoids ETXTBSY when the old binary at NIMBLE_BIN is still running. +cp "${WORK_DIR}/nimble_new" "${NIMBLE_BIN}.new.$$" +>>>>>>> master mv -f "${NIMBLE_BIN}.new.$$" "${NIMBLE_BIN}" echo "Nimble ${NIMBLE_VERSION} installed to ${NIMBLE_BIN}" \ No newline at end of file diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index e64922f4c..963a948a3 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -88,3 +88,6 @@ import ./tools/test_all # Persistency library tests import ./persistency/test_all + +# Reliable Channel API tests +import ./channels/test_all diff --git a/tests/channels/test_all.nim b/tests/channels/test_all.nim new file mode 100644 index 000000000..04b448707 --- /dev/null +++ b/tests/channels/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_reliable_channel_send_receive diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim new file mode 100644 index 000000000..2f49182a2 --- /dev/null +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -0,0 +1,317 @@ +{.used.} + +import std/[net] +import chronos, testutils/unittests, stew/byteutils +import brokers/broker_context + +import ../testlib/[common, wakucore, wakunode, testasync] + +import waku +import waku/[waku_node, waku_core] +import waku/factory/waku_conf +import waku/events/message_events as waku_message_events +import tools/confutils/cli_args + +import channels/reliable_channel_manager +import channels/encryption/noop_encryption + +const TestTimeout = chronos.seconds(15) + +proc createApiNodeConf(): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = cli_args.WakuMode.Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.reliabilityEnabled = true + conf.rest = false + return conf + +suite "Reliable Channel - ingress": + asyncTest "manager dispatches marked WakuMessage to the right channel": + ## Unit test for the receive side of the API: instead of standing + ## up two libp2p nodes and a relay mesh, we drive the manager + ## directly by emitting a `MessageReceivedEvent` (the exact event + ## the DeliveryService emits when a `WakuMessage` arrives off the + ## wire). The manager must: + ## - drop traffic missing the Reliable Channel spec marker + ## - dispatch the matching channel's `onMessageReceived` + ## - emit `ChannelMessageReceivedEvent` with the payload + const + channelId = ChannelId("test-channel") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "hello reliable channel".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + ## Noop encryption providers so the Encrypt/Decrypt brokers have + ## something to dispatch to; without this the channel falls back to + ## plaintext anyway, but installing them is the documented setup. + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + let received = newFuture[seq[byte]]("channel-message-received") + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if not received.finished() and evt.channelId == channelId: + received.complete(evt.payload) + , + ) + .expect("listen ChannelMessageReceivedEvent") + + ## Build a `WakuMessage` that looks like one that came in off the + ## wire from a peer: the spec marker on `meta` plus the right content + ## topic. The manager's ingress listener should pick it up, + ## decrypt (noop), unwrap SDS (pass-through), reassemble (one + ## segment), and finally emit `ChannelMessageReceivedEvent`. + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + let arrived = await received.withTimeout(TestTimeout) + check arrived + if arrived: + check received.read() == appPayload + + await manager.stop() + + asyncTest "manager drops unmarked WakuMessage": + ## Mirror of the above: same content topic, but `meta` is empty + ## (i.e. foreign traffic). The channel-level event must NOT fire. + const + channelId = ChannelId("test-channel-2") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "foreign payload".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + var fired = false + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if evt.channelId == channelId: + fired = true + , + ) + .expect("listen ChannelMessageReceivedEvent") + + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: @[], ## no Reliable Channel spec marker + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + ## Give the event broker a chance to fan out. + await sleepAsync(100.milliseconds) + check not fired + + await manager.stop() + +suite "Reliable Channel - send state machine": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. + const + channelId = ChannelId("sm-success-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + sendCalls.inc + return ok(fakeMsgReqId) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + ) + + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId + + await manager.stop() + + asyncTest "two independent channelReqIds are finalised independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. + const + channelId = ChannelId("sm-multi-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-multi") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len < 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + ) + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() + + waku_message_events.MessageErrorEvent.emit( + brokerCtx, + waku_message_events.MessageErrorEvent( + requestId: msgReqIds[1], messageHash: "", error: "synthetic" + ), + ) + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 + + await manager.stop() + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip() diff --git a/tests/node/peer_manager/peer_store/test_migrations.nim b/tests/node/peer_manager/peer_store/test_migrations.nim index a20d065ec..d6b86a15b 100644 --- a/tests/node/peer_manager/peer_store/test_migrations.nim +++ b/tests/node/peer_manager/peer_store/test_migrations.nim @@ -1,11 +1,11 @@ -import std/[options], stew/results, testutils/unittests +import std/[options], results, testutils/unittests import waku/node/peer_manager/peer_store/migrations, ../../waku_archive/archive_utils, ../../testlib/[simple_mock] -import std/[tables, strutils, os], stew/results, chronicles +import std/[tables, strutils, os], results, chronicles import waku/common/databases/db_sqlite, waku/common/databases/common diff --git a/tests/node/peer_manager/peer_store/test_peer_storage.nim b/tests/node/peer_manager/peer_store/test_peer_storage.nim index c8a479178..871df8644 100644 --- a/tests/node/peer_manager/peer_store/test_peer_storage.nim +++ b/tests/node/peer_manager/peer_store/test_peer_storage.nim @@ -1,4 +1,4 @@ -import stew/results, testutils/unittests +import results, testutils/unittests import waku/node/peer_manager/peer_store/peer_storage, waku/waku_core/peers diff --git a/tests/node/test_wakunode_relay_rln.nim b/tests/node/test_wakunode_relay_rln.nim index c8ca9b43d..3a2a8a67c 100644 --- a/tests/node/test_wakunode_relay_rln.nim +++ b/tests/node/test_wakunode_relay_rln.nim @@ -2,7 +2,7 @@ import std/[tempfiles, strutils, options], - stew/results, + results, testutils/unittests, chronos, libp2p/switch, diff --git a/tests/test_utils_compat.nim b/tests/test_utils_compat.nim index 121efa4a5..1394982ef 100644 --- a/tests/test_utils_compat.nim +++ b/tests/test_utils_compat.nim @@ -1,7 +1,7 @@ {.used.} import testutils/unittests -import stew/results, waku/waku_core/message, waku/waku_core/time, ./testlib/common +import results, waku/waku_core/message, waku/waku_core/time, ./testlib/common suite "Waku Payload": test "Encode/Decode waku message with timestamp": diff --git a/tests/waku_enr/test_sharding.nim b/tests/waku_enr/test_sharding.nim index 344436d0e..789f8faec 100644 --- a/tests/waku_enr/test_sharding.nim +++ b/tests/waku_enr/test_sharding.nim @@ -1,7 +1,7 @@ {.used.} import - stew/results, + results, chronos, testutils/unittests, libp2p/crypto/crypto as libp2p_keys, diff --git a/waku/api/types.nim b/waku/api/types.nim index 9eae503c8..2b7edd616 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -11,6 +11,10 @@ type contentTopic*: ContentTopic payload*: seq[byte] ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. RequestId* = distinct string @@ -34,12 +38,18 @@ proc init*( contentTopic: ContentTopic, payload: seq[byte] | string, ephemeral: bool = false, + meta: seq[byte] = @[], ): MessageEnvelope = when payload is seq[byte]: - MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) else: MessageEnvelope( - contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, ) proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = @@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, + meta: envelope.meta, timestamp: getNowInNanosecondTime(), ) diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 902f3aa1c..88ec802cf 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -26,7 +26,7 @@ logScope: # This useful util is missing from sequtils, this extends applyIt with predicate... template applyItIf*(varSeq, pred, op: untyped) = for i in low(varSeq) .. high(varSeq): - let it {.inject.} = varSeq[i] + var it {.inject.} = varSeq[i] if pred: op varSeq[i] = it