diff --git a/.github/workflows/ci-daily.yml b/.github/workflows/ci-daily.yml index 009e6c523..dea30ab7e 100644 --- a/.github/workflows/ci-daily.yml +++ b/.github/workflows/ci-daily.yml @@ -8,7 +8,8 @@ on: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" jobs: build: diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 84a7f0b8d..9ddf904ef 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -13,7 +13,8 @@ concurrency: env: NPROC: 2 MAKEFLAGS: "-j${NPROC}" - NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none" + NIMFLAGS: "--parallelBuild:${NPROC} --colors:off -d:chronicles_colors:none -d:disableMarchNative" + NIM_PARAMS: "-d:disableMarchNative" NIM_VERSION: '2.2.4' NIMBLE_VERSION: '0.22.3' @@ -160,7 +161,7 @@ jobs: fi export MAKEFLAGS="-j1" - export NIMFLAGS="--colors:off -d:chronicles_colors:none" + export NIMFLAGS="--colors:off -d:chronicles_colors:none -d:disableMarchNative" export USE_LIBBACKTRACE=0 make V=1 POSTGRES=$postgres_enabled test diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml index e3ad958ba..ee01a9f1a 100644 --- a/.github/workflows/version-check.yml +++ b/.github/workflows/version-check.yml @@ -28,8 +28,11 @@ jobs: set -euo pipefail NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/') # Nearest tag reachable from HEAD; --abbrev=0 drops the --g - # suffix so we get the bare tag (e.g. v0.38.0). - BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") + # suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips + # the moving `nightly` tag (auto-updated by the daily CI to point at + # master HEAD), which would otherwise be picked as the nearest tag + # and break the version-sort comparison below. + BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "") BASE_TAG=${BASE_TAG#v} # Compare on the base version, ignoring any -rc.N prerelease suffix. BASE_TAG=${BASE_TAG%%-*} diff --git a/channels/events.nim b/channels/events.nim index 2ee20a26d..3e271976e 100644 --- a/channels/events.nim +++ b/channels/events.nim @@ -21,3 +21,19 @@ EventBroker: channelId*: ChannelId senderId*: SdsParticipantID payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim index 5ea6486a5..ab5a9f67b 100644 --- a/channels/rate_limit_manager/rate_limit_manager.nim +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -29,7 +29,7 @@ EventBroker: ## ## `channelId` lets listeners filter to their own channel, since all ## reliable channels share the underlying Waku node's broker context. - type ReadyToSendEvent* = object + type ReadyToSendEvent* = ref object channelId*: SdsChannelID msgs*: seq[seq[byte]] diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index f6a1f2ec4..23435030e 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -13,14 +13,14 @@ ## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import std/[options, tables] +import std/[options, sets, tables] import results import chronos import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import waku/messaging_client +import waku/api/types import waku/node/delivery_service/send_service import waku/waku_core/topics @@ -31,8 +31,8 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - messaging_client, send_service, events, segmentation, scalable_data_sync, - rate_limit_manager, encryption + types, send_service, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -42,27 +42,65 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## The trailing `/N` is the wire-format version and is bumped only ## on breaking on-the-wire changes; implementations pin one version. -type ReliableChannel* = ref object - ## Spec-defined public type. Fields are private so callers cannot - ## mutate internals and break invariants. Getters are added below - ## for the few values consumers may need. - messagingClient: MessagingClient - channelId: ChannelId - contentTopic: ContentTopic - senderId: SdsParticipantID - rng: ref HmacDrbgContext - segmentation: SegmentationHandler - sdsHandler: SdsHandler - rateLimit: RateLimitManager +type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Typically wraps `MessagingClient.send`; + ## tests inject a fake that records calls and returns canned + ## `RequestId`s so the send state machine can be exercised end-to-end + ## without a network. - requestIds: Table[RequestId, seq[RequestId]] - pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] - brokerCtx: BrokerContext - ## Captured here so the channel emits `ChannelMessageReceivedEvent` - ## on the same broker context the owning manager registered its - ## listeners on. Without this, an emit via `globalBrokerContext()` - ## would land on whatever context happens to be thread-local at - ## emit time, which is not necessarily the manager's. + MessagePersistence {.pure.} = enum + Persistent + Ephemeral + + SegmentSendState {.pure.} = enum + ## Lifecycle of a single segment as tracked by the channel. The + ## messaging layer has its own richer `DeliveryState` (retries, + ## propagated-vs-validated); here we only model what's needed to + ## decide when a `channelReqId` is fully accounted for. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. + InFlight + ## Released by rate_limit_manager and handed to delivery_service; + ## `messagingReqId` is now set. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. + Failed + ## `MessageErrorEvent` arrived for `messagingReqId`, or the local + ## delivery-task construction failed before any id was reachable. + + PendingMessagingRequest = object + ## One entry per segment (i.e. per messaging-layer request). The + ## relative order of `AwaitingRateLimit` entries must match the + ## order in which `rate_limit_manager` re-emits messages, which is + ## FIFO with `send()`. + channelReqId*: RequestId + ## The channel-layer parent id returned to the caller of `send()` in channel layer. + ## One channel request maps to N pending messaging requests. + messagingReqId*: Option[RequestId] + ## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it. + persistenceReqType: MessagePersistence + segmentSendState*: SegmentSendState + + ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + sendHandler: SendHandler + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingMessagingRequests: seq[PendingMessagingRequest] + ## Entries are kept until the matching segment reaches a final + ## state (`Confirmed` or `Failed`); a whole channel request is + ## then pruned in one pass once all its segments are final. + brokerCtx: BrokerContext func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = self.channelId @@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId +func isFinal(state: SegmentSendState): bool {.inline.} = + return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} + +proc pruneCompletedChannelReqs(self: ReliableChannel) = + ## Drop every `pendingMessagingRequests` entry whose `channelReqId` + ## has all of its segments in a final state. A single failing + ## segment doesn't trigger a drop on its own — we wait until siblings + ## are also accounted for, so the channel-level outcome is decided + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. + var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if not entry.segmentSendState.isFinal(): + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) + +proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = + ## Invoked from this channel's `MessageSentEvent` listener. Flips + ## the matching `InFlight` segment to `Confirmed` and prunes. The + ## listener routes every event through here; entries that don't + ## belong to this channel simply don't match and are no-ops. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Confirmed + self.pruneCompletedChannelReqs() + +proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = + ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Failed + self.pruneCompletedChannelReqs() + proc onReadyToSend( - self: ReliableChannel, msgs: seq[seq[byte]] + self: ReliableChannel, readyToSendEvent: ReadyToSendEvent ) {.async: (raises: []).} = ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` ## listener once `rate_limit_manager` releases a batch of opaque ## blobs (already-encoded SDS messages): ## ## ... -> rate_limit_manager -> [encryption] -> dispatch - for m in msgs: - ## Each `m` was preceded by exactly one push onto `pendingRequests` - ## in `send`, so this pop is always safe in the current skeleton. - let pending = self.pendingRequests[0] - self.pendingRequests.delete(0) + var idx = 0 + for m in readyToSendEvent.msgs: + ## The first `AwaitingRateLimit` entry in push order is the one + ## this `m` belongs to: `send()` adds one entry per segment, and + ## `rate_limit_manager` re-emits them in the same FIFO order, so + ## the two sequences advance in lockstep. Earlier entries may + ## already be `InFlight` / `Confirmed` / `Failed` because they + ## live on until every sibling of their `channelReqId` is final, + ## so we walk past those to find the next one that was awaiting for this batch. + while idx < self.pendingMessagingRequests.len and + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : + idx.inc() + if idx >= self.pendingMessagingRequests.len: + ## rate_limit_manager emitted more messages than we have pending — + ## should not happen given `send` pushes one entry per enqueued + ## SDS payload. Drop silently rather than corrupt state. + break + + let channelReqId = self.pendingMessagingRequests[idx].channelReqId + let isEphemeral = + self.pendingMessagingRequests[idx].persistenceReqType == + MessagePersistence.Ephemeral ## TODO: revisit which fields of the SDS message must be encrypted. ## Encrypting the whole encoded blob forces every receiver to attempt @@ -97,32 +219,58 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: pending.parent, - messageHash: "", - error: "encryption failed: " & error, + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error ), ) + ## Encryption failed *before* we could hand the segment to the + ## delivery layer — no `messagingReqId` was minted and no + ## `DeliveryTask` was queued on `sendService`. The delivery + ## layer will therefore never emit a `MessageSentEvent` / + ## `MessageErrorEvent` for this segment, so `onMessageError` + ## won't fire either. Advance the state machine inline so the + ## parent `channelReqId` can still be pruned once its siblings + ## are also final. + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue let wireBytes = seq[byte](encrypted) + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. let envelope = MessageEnvelope( - contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), ) - let deliveryReqId = RequestId.new(self.rng) - let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: - ## TODO: emit waku `MessageErrorEvent` for the parent request id. + ## `sendHandler` is not annotated `(raises: [])`, but this listener is. + ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("messaging send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "messaging send failed: " & error + ), + ) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue - ## Stamp the Reliable Channel wire-format spec marker so the ingress - ## side of any peer can route this WakuMessage to its Reliable - ## Channel layer. Done on the constructed WakuMessage rather than - ## via the envelope because `MessageEnvelope` does not expose a - ## `meta` field. - deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight + self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) + idx.inc() - asyncSpawn self.messagingClient.sendService.send(deliveryTask) - self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + self.pruneCompletedChannelReqs() proc send*( self: ReliableChannel, payload: seq[byte], ephemeral: bool = false @@ -135,18 +283,22 @@ proc send*( ## ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with ## the SDS messages cleared for transmission; the channel's listener - ## then runs the final stage (encryption -> dispatch). The `ephemeral` - ## flag is carried alongside each segment in `pendingRequests` and - ## stamped onto the eventual `MessageEnvelope`. + ## then runs the final stage (encryption -> dispatch). The + ## `persistenceReqType` is carried alongside each segment in + ## `pendingMessagingRequests` and stamped onto the eventual + ## `MessageEnvelope`. ## - ## The returned `RequestId` is the parent of one-or-more - ## delivery-service `RequestId`s; the mapping is recorded in + ## The returned `RequestId` is the channel-level parent of one-or-more + ## messaging-layer `RequestId`s; the mapping is recorded in ## `self.requestIds`. if payload.len == 0: return err("empty payload") - let parentReqId = RequestId.new(self.rng) - self.requestIds[parentReqId] = @[] + let channelReqId = RequestId.new(self.rng) + self.requestIds[channelReqId] = @[] + + let persistenceReqType = + if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent for segmentBytes in self.segmentation.performSegmentation(payload): ## Segments arrive already encoded; the segmentation module owns @@ -155,10 +307,17 @@ proc send*( self.channelId, self.senderId, segmentBytes ).valueOr: return err("SDS wrap failed: " & error) - self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.pendingMessagingRequests.add( + PendingMessagingRequest( + channelReqId: channelReqId, + messagingReqId: none(RequestId), + persistenceReqType: persistenceReqType, + segmentSendState: SegmentSendState.AwaitingRateLimit, + ) + ) self.rateLimit.enqueueToSend(sdsBytes) - return ok(parentReqId) + return ok(channelReqId) proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] @@ -206,7 +365,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - messagingClient: MessagingClient, + sendHandler: SendHandler, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -221,8 +380,12 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. + ## + ## `sendHandler` is the egress dispatch. The owning `ReliableChannelManager` + ## typically constructs it as a closure over `MessagingClient.send`. Tests + ## pass a fake to drive the send state machine without touching the network. let chn = T( - messagingClient: messagingClient, + sendHandler: sendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, @@ -231,20 +394,21 @@ proc new*( sdsHandler: SdsHandler.new(sdsConfig, senderId), rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), requestIds: initTable[RequestId, seq[RequestId]](), - pendingRequests: @[], + pendingMessagingRequests: @[], brokerCtx: brokerCtx, ) - ## Each channel owns its own egress + ingress listeners on - ## `chn.brokerCtx`, filtered to traffic addressed to this channel. - ## Keeping the listeners (and the procs they call) inside the - ## channel lets `onReadyToSend` and `onMessageReceived` stay private - ## — the manager doesn't need to know about them. + ## Each channel owns its own egress + ingress + send-completion + ## listeners on `chn.brokerCtx`, filtered to traffic addressed to + ## this channel. Keeping the listeners (and the handler procs they + ## call) inside the channel lets `onReadyToSend` / + ## `onMessageReceived` / `onMessageSent` / `onMessageError` stay + ## private — the manager doesn't need to know about them. discard ReadyToSendEvent.listen( chn.brokerCtx, proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = if evt.channelId == chn.channelId: - await chn.onReadyToSend(evt.msgs) + await chn.onReadyToSend(evt) , ) @@ -261,4 +425,20 @@ proc new*( , ) + ## Send-completion events are tagged with the per-segment messaging + ## `requestId` — globally unique, so we don't need any channel filter + ## up front. The handler scans this channel's pending entries for a + ## match and is a no-op when the id belongs to a different channel. + discard MessageSentEvent.listen( + chn.brokerCtx, + proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = + chn.onMessageSent(evt.requestId), + ) + + discard MessageErrorEvent.listen( + chn.brokerCtx, + proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + chn.onMessageError(evt.requestId), + ) + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index c94baa836..020bf9b2d 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -24,23 +24,28 @@ export reliable_channel type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] messagingClient: MessagingClient - ## Borrowed from the owning `Waku`. The ownership chain is - ## Waku -> ReliableChannelManager -> MessagingClient (also Waku-owned). - ## Hidden so callers can't substitute their own and bypass the - ## manager's pipeline. + ## Borrowed from the owning `Waku`. + sendHandler: SendHandler + ## Default egress dispatch for channels created through this manager. + ## Constructed at mount time as a closure over `MessagingClient.send` + ## so the channel layer itself stays callable-only. brokerCtx: BrokerContext proc new*( T: type ReliableChannelManager, messagingClient: MessagingClient, + sendHandler: SendHandler, brokerCtx: BrokerContext = globalBrokerContext(), ): Result[T, string] = if messagingClient.isNil(): return err("messaging client is required") + if sendHandler.isNil(): + return err("sendHandler is required") ok( T( channels: initTable[ChannelId, ReliableChannel](), messagingClient: messagingClient, + sendHandler: sendHandler, brokerCtx: brokerCtx, ) ) @@ -61,15 +66,19 @@ proc createReliableChannel*( channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, + sendHandler: SendHandler = nil, ): Result[ChannelId, string] = - ## Spec entry point. The `MessagingClient` and `rng` the channel needs - ## are sourced from the owning `ReliableChannelManager` rather than - ## passed per call. Encryption is wired up through the `Encrypt`/ - ## `Decrypt` request brokers — the application installs its own - ## providers (or `setNoopEncryption()`) before traffic flows. + ## Spec entry point. The `sendHandler` and `rng` the channel needs are + ## sourced from the owning `ReliableChannelManager` rather than passed + ## per call. Encryption is wired up through the `Encrypt`/`Decrypt` + ## request brokers — the application installs its own providers + ## (or `setNoopEncryption()`) before traffic flows. ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. + ## + ## `sendHandler` defaults to the manager's default (constructed at mount + ## from `MessagingClient.send`); tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -88,8 +97,14 @@ proc createReliableChannel*( epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch ) + let effectiveSendHandler = + if sendHandler.isNil(): + self.sendHandler + else: + sendHandler + let chn = ReliableChannel.new( - messagingClient = self.messagingClient, + sendHandler = effectiveSendHandler, channelId = channelId, contentTopic = contentTopic, senderId = senderId, diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 6e189d1be..85f327859 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -97,7 +97,7 @@ suite "Reliable Channel - ingress": if arrived: check received.read() == appPayload - discard await waku.stop() + (await waku.stop()).expect("stop") asyncTest "manager drops unmarked WakuMessage": ## Mirror of the above: same content topic, but `meta` is empty @@ -150,4 +150,176 @@ suite "Reliable Channel - ingress": await sleepAsync(100.milliseconds) check not fired - discard await waku.stop() + (await waku.stop()).expect("stop") + +suite "Reliable Channel - send state machine": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. + const + channelId = ChannelId("sm-success-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") + + var waku: Waku + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + sendCalls.inc + return ok(fakeMsgReqId) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + ) + + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId + + (await waku.stop()).expect("stop") + + asyncTest "two independent channelReqIds are finalised independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. + const + channelId = ChannelId("sm-multi-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-multi") + + var waku: Waku + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len < 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + ) + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() + + waku_message_events.MessageErrorEvent.emit( + brokerCtx, + waku_message_events.MessageErrorEvent( + requestId: msgReqIds[1], messageHash: "", error: "synthetic" + ), + ) + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 + + (await waku.stop()).expect("stop") + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip() diff --git a/tests/waku_relay/test_wakunode_relay.nim b/tests/waku_relay/test_wakunode_relay.nim index da79e2a09..a687119bd 100644 --- a/tests/waku_relay/test_wakunode_relay.nim +++ b/tests/waku_relay/test_wakunode_relay.nim @@ -698,13 +698,6 @@ suite "WakuNode - Relay": node.unsubscribe((kind: ContentUnsub, topic: contentTopicB)).isOkOr: assert false, "Failed to unsubscribe to topic: " & $error - check node.wakuRelay.isSubscribed(shard) - - node.unsubscribe((kind: ContentUnsub, topic: contentTopicA)).isOkOr: - assert false, "Failed to unsubscribe to topic: " & $error - node.unsubscribe((kind: ContentUnsub, topic: contentTopicC)).isOkOr: - assert false, "Failed to unsubscribe to topic: " & $error - ## After unsubcription, the node should not be subscribed to the shard anymore check not node.wakuRelay.isSubscribed(shard) diff --git a/waku/api/api.nim b/waku/api/api.nim index e0e7d4d7b..24049002b 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -50,27 +50,4 @@ proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) - - let isSubbed = - w.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) - if not isSubbed: - info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - w.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: - warn "Failed to auto-subscribe", error = error - return err("Failed to auto-subscribe before sending: " & error) - - let requestId = RequestId.new(w.rng) - - let deliveryTask = DeliveryTask.new(requestId, envelope, w.brokerCtx).valueOr: - return err("API send: Failed to create delivery task: " & error) - - info "API send: scheduling delivery task", - requestId = $requestId, - pubsubTopic = deliveryTask.pubsubTopic, - contentTopic = deliveryTask.msg.contentTopic, - msgHash = deliveryTask.msgHash.to0xHex(), - myPeerId = w.node.peerId() - - asyncSpawn w.messagingClient.sendService.send(deliveryTask) - - return ok(requestId) + return await w.messagingClient.send(envelope) diff --git a/waku/api/types.nim b/waku/api/types.nim index 9eae503c8..2b7edd616 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -11,6 +11,10 @@ type contentTopic*: ContentTopic payload*: seq[byte] ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. RequestId* = distinct string @@ -34,12 +38,18 @@ proc init*( contentTopic: ContentTopic, payload: seq[byte] | string, ephemeral: bool = false, + meta: seq[byte] = @[], ): MessageEnvelope = when payload is seq[byte]: - MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) else: MessageEnvelope( - contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, ) proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = @@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, + meta: envelope.meta, timestamp: getNowInNanosecondTime(), ) diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index d4aef252b..ee70cf713 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -377,8 +377,15 @@ proc mountReliableChannelManager*(waku: Waku): Result[void, string] = return err("reliable channel manager requires a mounted messaging client") if waku.node.started: return err("cannot mount reliable channel manager on a started node") + + let messagingClient = waku.messagingClient + let defaultSendHandler: SendHandler = proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await messagingClient.send(envelope) + waku.reliableChannelManager = ReliableChannelManager.new( - waku.messagingClient, waku.brokerCtx + messagingClient, defaultSendHandler, waku.brokerCtx ).valueOr: return err("could not create reliable channel manager: " & $error) return ok() diff --git a/waku/messaging_client.nim b/waku/messaging_client.nim index 32cb64a9e..1fc4deb3c 100644 --- a/waku/messaging_client.nim +++ b/waku/messaging_client.nim @@ -1,7 +1,17 @@ import results, chronos -import ./node/waku_node, ./node/delivery_service/[recv_service, send_service] +import chronicles +import + ./api/types, + ./node/[ + waku_node, + subscription_manager, + delivery_service/recv_service, + delivery_service/send_service, + delivery_service/send_service/delivery_task, + ] type MessagingClient* = ref object + node: WakuNode sendService*: SendService recvService*: RecvService started: bool @@ -11,7 +21,7 @@ proc new*( ): Result[T, string] = let sendService = ?SendService.new(useP2PReliability, node) let recvService = RecvService.new(node) - ok(T(sendService: sendService, recvService: recvService)) + ok(T(node: node, sendService: sendService, recvService: recvService)) proc start*(self: MessagingClient): Result[void, string] = if self.started: @@ -27,3 +37,27 @@ proc stop*(self: MessagingClient) {.async.} = await self.sendService.stopSendService() await self.recvService.stopRecvService() self.started = false + +proc send*( + self: MessagingClient, envelope: MessageEnvelope +): Future[Result[RequestId, string]] {.async.} = + ## High-level messaging API send. Auto-subscribes to the content topic + ## (so the local node sees its own gossipsub broadcast), builds a + ## `DeliveryTask`, and hands it to the send service. Returns the request + ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. + let isSubbed = + self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) + if not isSubbed: + info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic + self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + warn "Failed to auto-subscribe", error = error + return err("Failed to auto-subscribe before sending: " & error) + + let requestId = RequestId.new(self.node.rng) + + let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: + return err("MessagingClient.send: Failed to create delivery task: " & error) + + asyncSpawn self.sendService.send(deliveryTask) + + return ok(requestId) diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 242dfc111..e60b26124 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -26,7 +26,7 @@ logScope: # This useful util is missing from sequtils, this extends applyIt with predicate... template applyItIf*(varSeq, pred, op: untyped) = for i in low(varSeq) .. high(varSeq): - let it {.inject.} = varSeq[i] + var it {.inject.} = varSeq[i] if pred: op varSeq[i] = it