diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index 009e6c523..dea30ab7e 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -8,7 +8,8 @@ on: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" jobs: build: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index f924d0f8b..9ddf904ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,8 @@ concurrency: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" NIM_VERSION: '2.2.4' NIMBLE_VERSION: '0.22.3' @@ -35,6 +36,9 @@ jobs: - 'nimble.lock' - 'waku.nimble' - 'Makefile' + - 'scripts/**' + - 'flake.nix' + - 'flake.lock' - 'library/**' - 'liblogosdelivery/**' v2: @@ -157,7 +161,7 @@ jobs: fi export MAKEFLAGS="-j1" - export NIMFLAGS="--colors:off -d:chronicles_colors:none" + export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative" export USE_LIBBACKTRACE=0 make V=1 POSTGRES=$postgres_enabled test diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml index e3ad958ba..ee01a9f1a 100644 --- a/.github/workflows/version-check.yml +++ b/.github/workflows/version-check.yml @@ -28,8 +28,11 @@ jobs: set -euo pipefail NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/') # Nearest tag reachable from HEAD; --abbrev=0 drops the --g - # suffix so we get the bare tag (e.g. v0.38.0). - BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") + # suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips + # the moving `nightly` tag (auto-updated by the daily CI to point at + # master HEAD), which would otherwise be picked as the nearest tag + # and break the version-sort comparison below. + BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "") BASE_TAG=${BASE_TAG#v} # Compare on the base version, ignoring any -rc.N prerelease suffix. BASE_TAG=${BASE_TAG%%-*} diff --git a/channels/events.nim b/channels/events.nim index 5a17c99d2..904a34dc6 100644 --- a/channels/events.nim +++ b/channels/events.nim @@ -21,3 +21,19 @@ EventBroker: channelId*: ChannelId senderId*: SdsParticipantID payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim index 5ea6486a5..ab5a9f67b 100644 --- a/channels/rate_limit_manager/rate_limit_manager.nim +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -29,7 +29,7 @@ EventBroker: ## ## `channelId` lets listeners filter to their own channel, since all ## reliable channels share the underlying Waku node's broker context. - type ReadyToSendEvent* = object + type ReadyToSendEvent* = ref object channelId*: SdsChannelID msgs*: seq[seq[byte]] diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 2a7d01d35..c3fbe5d77 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -13,14 +13,15 @@ ## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import std/[options, tables] +import std/[options, sets, tables] import results import chronos import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import waku/node/delivery_service/delivery_service +import waku/api/api +import waku/factory/waku as waku_factory import waku/node/delivery_service/send_service import waku/waku_core/topics @@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - delivery_service, send_service, events, segmentation, scalable_data_sync, - rate_limit_manager, encryption + api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -42,27 +43,64 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## The trailing `/N` is the wire-format version and is bumped only ## on breaking on-the-wire changes; implementations pin one version. -type ReliableChannel* = ref object - ## Spec-defined public type. Fields are private so callers cannot - ## mutate internals and break invariants. Getters are added below - ## for the few values consumers may need. - deliveryService: DeliveryService - channelId: ChannelId - contentTopic: ContentTopic - senderId: SdsParticipantID - rng: ref HmacDrbgContext - segmentation: SegmentationHandler - sdsHandler: SdsHandler - rateLimit: RateLimitManager +type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Defaults to `waku.send`; tests inject a + ## fake that records calls and returns canned `RequestId`s so the + ## send state machine can be exercised end-to-end without a network. - requestIds: Table[RequestId, seq[RequestId]] - pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] - brokerCtx: BrokerContext - ## Captured here so the channel emits `ChannelMessageReceivedEvent` - ## on the same broker context the owning manager registered its - ## listeners on. Without this, an emit via `globalBrokerContext()` - ## would land on whatever context happens to be thread-local at - ## emit time, which is not necessarily the manager's. + MessagePersistence {.pure.} = enum + Persistent + Ephemeral + + SegmentSendState {.pure.} = enum + ## Lifecycle of a single segment as tracked by the channel. The + ## messaging layer has its own richer `DeliveryState` (retries, + ## propagated-vs-validated); here we only model what's needed to + ## decide when a `channelReqId` is fully accounted for. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. + InFlight + ## Released by rate_limit_manager and handed to delivery_service; + ## `messagingReqId` is now set. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. + Failed + ## `MessageErrorEvent` arrived for `messagingReqId`, or the local + ## delivery-task construction failed before any id was reachable. + + PendingMessagingRequest = object + ## One entry per segment (i.e. per messaging-layer request). The + ## relative order of `AwaitingRateLimit` entries must match the + ## order in which `rate_limit_manager` re-emits messages, which is + ## FIFO with `send()`. + channelReqId*: RequestId + ## The channel-layer parent id returned to the caller of `send()` in channel layer. + ## One channel request maps to N pending messaging requests. + messagingReqId*: Option[RequestId] + ## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it. + persistenceReqType: MessagePersistence + segmentSendState*: SegmentSendState + + ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + sendHandler: SendHandler + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingMessagingRequests: seq[PendingMessagingRequest] + ## Entries are kept until the matching segment reaches a final + ## state (`Confirmed` or `Failed`); a whole channel request is + ## then pruned in one pass once all its segments are final. + brokerCtx: BrokerContext func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = self.channelId @@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId +func isFinal(state: SegmentSendState): bool {.inline.} = + return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} + +proc pruneCompletedChannelReqs(self: ReliableChannel) = + ## Drop every `pendingMessagingRequests` entry whose `channelReqId` + ## has all of its segments in a final state. A single failing + ## segment doesn't trigger a drop on its own — we wait until siblings + ## are also accounted for, so the channel-level outcome is decided + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. + var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if not entry.segmentSendState.isFinal(): + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) + +proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = + ## Invoked from this channel's `MessageSentEvent` listener. Flips + ## the matching `InFlight` segment to `Confirmed` and prunes. The + ## listener routes every event through here; entries that don't + ## belong to this channel simply don't match and are no-ops. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Confirmed + self.pruneCompletedChannelReqs() + +proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = + ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Failed + self.pruneCompletedChannelReqs() + proc onReadyToSend( - self: ReliableChannel, msgs: seq[seq[byte]] + self: ReliableChannel, readyToSendEvent: ReadyToSendEvent ) {.async: (raises: []).} = ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` ## listener once `rate_limit_manager` releases a batch of opaque ## blobs (already-encoded SDS messages): ## ## ... -> rate_limit_manager -> [encryption] -> dispatch - for m in msgs: - ## Each `m` was preceded by exactly one push onto `pendingRequests` - ## in `send`, so this pop is always safe in the current skeleton. - let pending = self.pendingRequests[0] - self.pendingRequests.delete(0) + var idx = 0 + for m in readyToSendEvent.msgs: + ## The first `AwaitingRateLimit` entry in push order is the one + ## this `m` belongs to: `send()` adds one entry per segment, and + ## `rate_limit_manager` re-emits them in the same FIFO order, so + ## the two sequences advance in lockstep. Earlier entries may + ## already be `InFlight` / `Confirmed` / `Failed` because they + ## live on until every sibling of their `channelReqId` is final, + ## so we walk past those to find the next one that was awaiting for this batch. + while idx < self.pendingMessagingRequests.len and + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : + idx.inc() + if idx >= self.pendingMessagingRequests.len: + ## rate_limit_manager emitted more messages than we have pending — + ## should not happen given `send` pushes one entry per enqueued + ## SDS payload. Drop silently rather than corrupt state. + break + + let channelReqId = self.pendingMessagingRequests[idx].channelReqId + let isEphemeral = + self.pendingMessagingRequests[idx].persistenceReqType == + MessagePersistence.Ephemeral ## TODO: revisit which fields of the SDS message must be encrypted. ## Encrypting the whole encoded blob forces every receiver to attempt @@ -97,32 +219,58 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: pending.parent, - messageHash: "", - error: "encryption failed: " & error, + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error ), ) + ## Encryption failed *before* we could hand the segment to the + ## delivery layer — no `messagingReqId` was minted and no + ## `DeliveryTask` was queued on `sendService`. The delivery + ## layer will therefore never emit a `MessageSentEvent` / + ## `MessageErrorEvent` for this segment, so `onMessageError` + ## won't fire either. Advance the state machine inline so the + ## parent `channelReqId` can still be pruned once its siblings + ## are also final. + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue let wireBytes = seq[byte](encrypted) + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. let envelope = MessageEnvelope( - contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), ) - let deliveryReqId = RequestId.new(self.rng) - let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: - ## TODO: emit waku `MessageErrorEvent` for the parent request id. + ## `waku.send` is not annotated `(raises: [])`, but this listener is. + ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("waku send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "waku send failed: " & error + ), + ) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue - ## Stamp the Reliable Channel wire-format spec marker so the ingress - ## side of any peer can route this WakuMessage to its Reliable - ## Channel layer. Done on the constructed WakuMessage rather than - ## via the envelope because `MessageEnvelope` does not expose a - ## `meta` field. - deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight + self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) + idx.inc() - asyncSpawn self.deliveryService.sendService.send(deliveryTask) - self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + self.pruneCompletedChannelReqs() proc send*( self: ReliableChannel, payload: seq[byte], ephemeral: bool = false @@ -135,18 +283,22 @@ proc send*( ## ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with ## the SDS messages cleared for transmission; the channel's listener - ## then runs the final stage (encryption -> dispatch). The `ephemeral` - ## flag is carried alongside each segment in `pendingRequests` and - ## stamped onto the eventual `MessageEnvelope`. + ## then runs the final stage (encryption -> dispatch). The + ## `persistenceReqType` is carried alongside each segment in + ## `pendingMessagingRequests` and stamped onto the eventual + ## `MessageEnvelope`. ## - ## The returned `RequestId` is the parent of one-or-more - ## delivery-service `RequestId`s; the mapping is recorded in + ## The returned `RequestId` is the channel-level parent of one-or-more + ## messaging-layer `RequestId`s; the mapping is recorded in ## `self.requestIds`. if payload.len == 0: return err("empty payload") - let parentReqId = RequestId.new(self.rng) - self.requestIds[parentReqId] = @[] + let channelReqId = RequestId.new(self.rng) + self.requestIds[channelReqId] = @[] + + let persistenceReqType = + if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent for segmentBytes in self.segmentation.performSegmentation(payload): ## Segments arrive already encoded; the segmentation module owns @@ -155,10 +307,17 @@ proc send*( self.channelId, self.senderId, segmentBytes ).valueOr: return err("SDS wrap failed: " & error) - self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.pendingMessagingRequests.add( + PendingMessagingRequest( + channelReqId: channelReqId, + messagingReqId: none(RequestId), + persistenceReqType: persistenceReqType, + segmentSendState: SegmentSendState.AwaitingRateLimit, + ) + ) self.rateLimit.enqueueToSend(sdsBytes) - return ok(parentReqId) + return ok(channelReqId) proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] @@ -206,7 +365,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - deliveryService: DeliveryService, + waku: Waku, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -214,6 +373,7 @@ proc new*( sdsConfig: SdsConfig, rateConfig: RateLimitConfig, brokerCtx: BrokerContext = globalBrokerContext(), + sendHandler: SendHandler = nil, ): T = ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed ## inside the channel rather than handed in by the caller — they are @@ -221,8 +381,20 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. + ## + ## `sendHandler` defaults to `waku.send`; tests pass a fake to drive + ## the send state machine without touching the network. + let resolvedSendHandler = + if sendHandler.isNil(): + proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await waku.send(envelope) + else: + sendHandler + let chn = T( - deliveryService: deliveryService, + sendHandler: resolvedSendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, @@ -231,20 +403,21 @@ proc new*( sdsHandler: SdsHandler.new(sdsConfig, senderId), rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), requestIds: initTable[RequestId, seq[RequestId]](), - pendingRequests: @[], + pendingMessagingRequests: @[], brokerCtx: brokerCtx, ) - ## Each channel owns its own egress + ingress listeners on - ## `chn.brokerCtx`, filtered to traffic addressed to this channel. - ## Keeping the listeners (and the procs they call) inside the - ## channel lets `onReadyToSend` and `onMessageReceived` stay private - ## — the manager doesn't need to know about them. + ## Each channel owns its own egress + ingress + send-completion + ## listeners on `chn.brokerCtx`, filtered to traffic addressed to + ## this channel. Keeping the listeners (and the handler procs they + ## call) inside the channel lets `onReadyToSend` / + ## `onMessageReceived` / `onMessageSent` / `onMessageError` stay + ## private — the manager doesn't need to know about them. discard ReadyToSendEvent.listen( chn.brokerCtx, proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = if evt.channelId == chn.channelId: - await chn.onReadyToSend(evt.msgs) + await chn.onReadyToSend(evt) , ) @@ -261,4 +434,20 @@ proc new*( , ) + ## Send-completion events are tagged with the per-segment messaging + ## `requestId` — globally unique, so we don't need any channel filter + ## up front. The handler scans this channel's pending entries for a + ## match and is a no-op when the id belongs to a different channel. + discard MessageSentEvent.listen( + chn.brokerCtx, + proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = + chn.onMessageSent(evt.requestId), + ) + + discard MessageErrorEvent.listen( + chn.brokerCtx, + proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + chn.onMessageError(evt.requestId), + ) + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index ddbdb37a6..747f755b4 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -14,6 +14,7 @@ import waku/api/api import waku/api/api_conf import waku/events/message_events as waku_message_events import waku/factory/waku as waku_factory +import waku/node/delivery_service/delivery_service import waku/waku_core/topics import ./reliable_channel @@ -23,11 +24,10 @@ export reliable_channel type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] - deliveryService: DeliveryService - ## Owned by the manager. The ownership chain is - ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. - ## Hidden so callers can't substitute their own and bypass the - ## manager's pipeline. + waku: Waku + ## Owned by the manager. The channel layer reaches the messaging + ## API through `waku.send(envelope)`; constructing DeliveryTasks + ## directly would breach the layer boundary. brokerCtx: BrokerContext proc new*( @@ -38,15 +38,13 @@ proc new*( ## TODO !! The proper ownership chain is: ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, ## and this will be implemented in the future. For now, `createNode` - ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## is called here to get a Waku instance, and the WakuNode is immediately discarded. ## This is a temporary workaround to get the API let waku = ?(await createNode(conf)) let manager = T( - channels: initTable[ChannelId, ReliableChannel](), - deliveryService: waku.deliveryService, - brokerCtx: brokerCtx, + channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx ) return ok(manager) @@ -55,17 +53,18 @@ proc start*(self: ReliableChannelManager): Result[void, string] = ## Bring the owned DeliveryService up. Separated from `new` so callers ## can register encryption providers / create channels before traffic ## starts flowing. - self.deliveryService.startDeliveryService() + self.waku.deliveryService.startDeliveryService() proc stop*(self: ReliableChannelManager) {.async.} = - if not self.deliveryService.isNil(): - await self.deliveryService.stopDeliveryService() + if not self.waku.isNil(): + await self.waku.deliveryService.stopDeliveryService() proc createReliableChannel*( self: ReliableChannelManager, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, + sendHandler: SendHandler = nil, ): Result[ChannelId, string] = ## Spec entry point. The `DeliveryService` and `rng` the channel needs ## are sourced from the owning `ReliableChannelManager` rather than @@ -75,6 +74,9 @@ proc createReliableChannel*( ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. + ## + ## `sendHandler` is left `nil` in production so the channel uses the + ## owned `waku.send`; tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -94,7 +96,7 @@ proc createReliableChannel*( ) let chn = ReliableChannel.new( - deliveryService = self.deliveryService, + waku = self.waku, channelId = channelId, contentTopic = contentTopic, senderId = senderId, @@ -102,6 +104,7 @@ proc createReliableChannel*( sdsConfig = sdsConfig, rateConfig = rateConfig, brokerCtx = self.brokerCtx, + sendHandler = sendHandler, ) self.channels[channelId] = chn diff --git a/flake.lock b/flake.lock index 8d0db9269..411bf2430 100644 --- a/flake.lock +++ b/flake.lock @@ -19,8 +19,7 @@ "root": { "inputs": { "nixpkgs": "nixpkgs", - "rust-overlay": "rust-overlay", - "zerokit": "zerokit" + "rust-overlay": "rust-overlay" } }, "rust-overlay": { @@ -42,47 +41,6 @@ "repo": "rust-overlay", "type": "github" } - }, - "rust-overlay_2": { - "inputs": { - "nixpkgs": [ - "zerokit", - "nixpkgs" - ] - }, - "locked": { - "lastModified": 1771211437, - "narHash": "sha256-lcNK438i4DGtyA+bPXXyVLHVmJjYpVKmpux9WASa3ro=", - "owner": "oxalica", - "repo": "rust-overlay", - "rev": "c62195b3d6e1bb11e0c2fb2a494117d3b55d410f", - "type": "github" - }, - "original": { - "owner": "oxalica", - "repo": "rust-overlay", - "type": "github" - } - }, - "zerokit": { - "inputs": { - "nixpkgs": [ - "nixpkgs" - ], - "rust-overlay": "rust-overlay_2" - }, - "locked": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - }, - "original": { - "owner": "vacp2p", - "repo": "zerokit", - "rev": "5e64cb8822bee65eed6cf459f95ae72b80c6ba63", - "type": "github" - } } }, "root": "root", diff --git a/flake.nix b/flake.nix index 6c283780d..b32a53455 100644 --- a/flake.nix +++ b/flake.nix @@ -17,19 +17,9 @@ url = "github:oxalica/rust-overlay"; inputs.nixpkgs.follows = "nixpkgs"; }; - - # External flake input: Zerokit pinned to a specific commit. - # Update the rev here when a new zerokit version is needed. - zerokit = { - # Pinned to v2.0.2 (5e64cb8822bee65eed6cf459f95ae72b80c6ba63) to match - # the vendor/zerokit submodule. Keep these two in sync: the nix build - # links librln from this input, the Makefile build from the submodule. - url = "github:vacp2p/zerokit/5e64cb8822bee65eed6cf459f95ae72b80c6ba63"; - inputs.nixpkgs.follows = "nixpkgs"; - }; }; - outputs = { self, nixpkgs, rust-overlay, zerokit }: + outputs = { self, nixpkgs, rust-overlay }: let systems = [ "x86_64-linux" "aarch64-linux" @@ -69,19 +59,78 @@ inherit system; overlays = [ (import rust-overlay) nimbleOverlay ]; }; + + # Prebuilt zerokit librln, fetched from the upstream GitHub release + # rather than compiled from source. Compiling zerokit makes Nix download + # its many crate dependencies from crates.io in one parallel burst, which + # crates.io intermittently rejects with HTTP 403 (rate limiting from the + # self-hosted runners' shared IP), breaking the nix build. The release + # ships the exact `stateless` library this project links (see + # scripts/build_rln.sh), so we use it directly — no Rust toolchain and + # no crates.io access needed. + # + # Keep `rlnVersion` aligned with `LIBRLN_VERSION` in the Makefile and the + # vendor/zerokit submodule. Each hash is the sha256 of the release tarball + # for that platform; refresh all four when bumping the version. + rlnVersion = "v2.0.2"; + rlnAssets = { + "x86_64-linux" = { triple = "x86_64-unknown-linux-gnu"; hash = "sha256-qbrUdaetYKFhjzxUP/QcwD3JHWJ8qk/tCMK3yXceIAk="; }; + "aarch64-linux" = { triple = "aarch64-unknown-linux-gnu"; hash = "sha256-s4bWrmCcNTWHNyJwV73ilWNp58ZdAVG+TAgtWN1cTQs="; }; + "x86_64-darwin" = { triple = "x86_64-apple-darwin"; hash = "sha256-ZaHP5CApN66FYY7jxwOmGcF9kJR78Fng3k1qE2W08Mk="; }; + "aarch64-darwin" = { triple = "aarch64-apple-darwin"; hash = "sha256-f2YppkPsKFdN00j+IY8fpvsebWTIb9lW/V1/vOTiVKU="; }; + }; + + mkZerokitRln = system: pkgs: + let + asset = rlnAssets.${system} or + (throw "zerokit ${rlnVersion} has no prebuilt rln asset for system '${system}'"); + in pkgs.stdenv.mkDerivation { + pname = "librln"; + version = lib.removePrefix "v" rlnVersion; + + src = pkgs.fetchurl { + url = "https://github.com/vacp2p/zerokit/releases/download/" + + "${rlnVersion}/${asset.triple}-stateless-rln.tar.gz"; + hash = asset.hash; + }; + + # The tarball lays its files out under release/. + sourceRoot = "release"; + dontConfigure = true; + dontBuild = true; + + # The release .so was linked outside Nix, so it references system + # libraries (libgcc_s, libstdc++, glibc) by bare name. autoPatchelfHook + # points those at the Nix versions so the library loads correctly when + # used by the Nix build. It does nothing for the static .a, and the + # step is skipped on macOS (dylib paths are fixed in nix/default.nix). + nativeBuildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.autoPatchelfHook ]; + buildInputs = + pkgs.lib.optionals pkgs.stdenv.isLinux [ pkgs.stdenv.cc.cc.lib ]; + + installPhase = '' + runHook preInstall + mkdir -p $out/lib + cp librln.a $out/lib/ 2>/dev/null || true + cp librln.so $out/lib/ 2>/dev/null || true + cp librln.dylib $out/lib/ 2>/dev/null || true + runHook postInstall + ''; + + meta = with pkgs.lib; { + description = "Prebuilt zerokit RLN library (stateless flavor)"; + homepage = "https://github.com/vacp2p/zerokit"; + license = with licenses; [ mit asl20 ]; + platforms = builtins.attrNames rlnAssets; + }; + }; in { packages = forAllSystems (system: let pkgs = pkgsFor system; - # HACK: Fix for stale cargoHash in 2.0.2 release. - zerokitRln = zerokit.packages.${system}.rln.overrideAttrs (old: { - cargoDeps = old.cargoDeps.overrideAttrs (oldCargoDeps: { - vendorStaging = oldCargoDeps.vendorStaging.overrideAttrs (_: { - outputHash = "sha256-PNwEdZLgGQPqQDrEK2hsQtSybVfBbD6xn4K47fPFJUU="; - }); - }); - }); + zerokitRln = mkZerokitRln system pkgs; liblogosdelivery = pkgs.callPackage ./nix/default.nix { inherit pkgs; @@ -94,14 +143,13 @@ inherit pkgs; src = ./.; targets = ["wakucanary"]; - zerokitRln = zerokit.packages.${system}.rln; + inherit zerokitRln; }; in { inherit liblogosdelivery wakucanary; - # Expose the cargoHash-corrected librln so downstream consumers + # Expose the prebuilt librln so downstream consumers # (e.g. logos-delivery-module) bundle the exact same librln this - # build links, instead of pulling zerokit's rln directly — whose - # committed cargoHash is stale for v2.0.2 (see zerokitRln above). + # build links against. rln = zerokitRln; default = liblogosdelivery; } diff --git a/scripts/build_rln.sh b/scripts/build_rln.sh index 35b5b8953..b028885e2 100755 --- a/scripts/build_rln.sh +++ b/scripts/build_rln.sh @@ -1,8 +1,15 @@ #!/usr/bin/env bash -# This script is used to build the rln library for the current platform. -# Previously downloaded prebuilt binaries, but due to compatibility issues -# we now always build from source. +# Provides the rln static library for the current platform. +# +# If zerokit publishes a prebuilt `stateless` release asset for this platform, +# download and use it: that is faster than compiling and avoids fetching +# zerokit's many crate dependencies from crates.io. The asset is selected by +# the Rust host target triple (the platform identifier reported by rustc, +# e.g. x86_64-unknown-linux-gnu or aarch64-apple-darwin). +# +# When no matching asset exists (e.g. Windows), build from the vendored +# zerokit submodule instead. set -e @@ -15,8 +22,26 @@ output_filename=$3 [[ -z "${rln_version}" ]] && { echo "No rln version specified"; exit 1; } [[ -z "${output_filename}" ]] && { echo "No output filename specified"; exit 1; } -echo "Building RLN library from source (version ${rln_version})..." +# --- Prefer the prebuilt release asset -------------------------------------- +# Host target triple, e.g. x86_64-unknown-linux-gnu / aarch64-apple-darwin. +host_triplet=$(rustc --version --verbose | awk '/host:/{print $2}') +tarball="${host_triplet}-stateless-rln.tar.gz" +url="https://github.com/vacp2p/zerokit/releases/download/${rln_version}/${tarball}" +echo "Looking for prebuilt RLN: ${url}" +if curl --silent --fail-with-body -L "${url}" -o "${tarball}"; then + echo "Downloaded prebuilt ${tarball}" + tar -xzf "${tarball}" + mv "release/librln.a" "${output_filename}" + rm -rf "${tarball}" release + echo "Using prebuilt ${output_filename}" + exit 0 +fi +# curl --fail-with-body writes the error body to the file on HTTP failure. +rm -f "${tarball}" +echo "No prebuilt asset for ${host_triplet} at ${rln_version}; building from source." + +# --- Fall back to building from the vendored submodule ---------------------- # Check if submodule version = version in Makefile cargo metadata --format-version=1 --no-deps --manifest-path "${build_dir}/rln/Cargo.toml" @@ -33,7 +58,6 @@ if [[ "v${submodule_version}" != "${rln_version}" ]]; then exit 1 fi -# Build rln from source. # `stateless` feature: logos-delivery does not maintain a local Merkle tree # (post-PR #3312); the contract is the source of truth and the path is fetched # via getMerkleProof(index). The stateless build compiles out tree code. diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 052cd35c9..2f49182a2 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -147,3 +147,171 @@ suite "Reliable Channel - ingress": check not fired await manager.stop() + +suite "Reliable Channel - send state machine": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. + const + channelId = ChannelId("sm-success-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + sendCalls.inc + return ok(fakeMsgReqId) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + ) + + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId + + await manager.stop() + + asyncTest "two independent channelReqIds are finalised independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. + const + channelId = ChannelId("sm-multi-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-multi") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len < 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + ) + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() + + waku_message_events.MessageErrorEvent.emit( + brokerCtx, + waku_message_events.MessageErrorEvent( + requestId: msgReqIds[1], messageHash: "", error: "synthetic" + ), + ) + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 + + await manager.stop() + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip() diff --git a/waku/api/types.nim b/waku/api/types.nim index 9eae503c8..2b7edd616 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -11,6 +11,10 @@ type contentTopic*: ContentTopic payload*: seq[byte] ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. RequestId* = distinct string @@ -34,12 +38,18 @@ proc init*( contentTopic: ContentTopic, payload: seq[byte] | string, ephemeral: bool = false, + meta: seq[byte] = @[], ): MessageEnvelope = when payload is seq[byte]: - MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) else: MessageEnvelope( - contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, ) proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = @@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, + meta: envelope.meta, timestamp: getNowInNanosecondTime(), ) diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 902f3aa1c..88ec802cf 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -26,7 +26,7 @@ logScope: # This useful util is missing from sequtils, this extends applyIt with predicate... template applyItIf*(varSeq, pred, op: untyped) = for i in low(varSeq) .. high(varSeq): - let it {.inject.} = varSeq[i] + var it {.inject.} = varSeq[i] if pred: op varSeq[i] = it