diff --git a/.github/workflows/version-check.yml b/.github/workflows/version-check.yml index e3ad958ba..ee01a9f1a 100644 --- a/.github/workflows/version-check.yml +++ b/.github/workflows/version-check.yml @@ -28,8 +28,11 @@ jobs: set -euo pipefail NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/') # Nearest tag reachable from HEAD; --abbrev=0 drops the --g - # suffix so we get the bare tag (e.g. v0.38.0). - BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "") + # suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips + # the moving `nightly` tag (auto-updated by the daily CI to point at + # master HEAD), which would otherwise be picked as the nearest tag + # and break the version-sort comparison below. + BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "") BASE_TAG=${BASE_TAG#v} # Compare on the base version, ignoring any -rc.N prerelease suffix. BASE_TAG=${BASE_TAG%%-*} diff --git a/channels/events.nim b/channels/events.nim index 5a17c99d2..904a34dc6 100644 --- a/channels/events.nim +++ b/channels/events.nim @@ -21,3 +21,19 @@ EventBroker: channelId*: ChannelId senderId*: SdsParticipantID payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. 
+ type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim index 5ea6486a5..ab5a9f67b 100644 --- a/channels/rate_limit_manager/rate_limit_manager.nim +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -29,7 +29,7 @@ EventBroker: ## ## `channelId` lets listeners filter to their own channel, since all ## reliable channels share the underlying Waku node's broker context. - type ReadyToSendEvent* = object + type ReadyToSendEvent* = ref object channelId*: SdsChannelID msgs*: seq[seq[byte]] diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 2a7d01d35..c3fbe5d77 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -13,14 +13,15 @@ ## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import std/[options, tables] +import std/[options, sets, tables] import results import chronos import bearssl/rand import stew/byteutils import libp2p/crypto/crypto as libp2p_crypto -import waku/node/delivery_service/delivery_service +import waku/api/api +import waku/factory/waku as waku_factory import waku/node/delivery_service/send_service import waku/waku_core/topics @@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager import ./encryption/encryption export - delivery_service, send_service, events, segmentation, scalable_data_sync, - rate_limit_manager, encryption + api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager, + encryption const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## Wire-format spec marker for the Reliable Channel layer, as defined @@ -42,27 +43,64 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## The trailing `/N` is the wire-format version and is bumped only ## on breaking on-the-wire changes; implementations pin one version. 
-type ReliableChannel* = ref object - ## Spec-defined public type. Fields are private so callers cannot - ## mutate internals and break invariants. Getters are added below - ## for the few values consumers may need. - deliveryService: DeliveryService - channelId: ChannelId - contentTopic: ContentTopic - senderId: SdsParticipantID - rng: ref HmacDrbgContext - segmentation: SegmentationHandler - sdsHandler: SdsHandler - rateLimit: RateLimitManager +type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Defaults to `waku.send`; tests inject a + ## fake that records calls and returns canned `RequestId`s so the + ## send state machine can be exercised end-to-end without a network. - requestIds: Table[RequestId, seq[RequestId]] - pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] - brokerCtx: BrokerContext - ## Captured here so the channel emits `ChannelMessageReceivedEvent` - ## on the same broker context the owning manager registered its - ## listeners on. Without this, an emit via `globalBrokerContext()` - ## would land on whatever context happens to be thread-local at - ## emit time, which is not necessarily the manager's. + MessagePersistence {.pure.} = enum + Persistent + Ephemeral + + SegmentSendState {.pure.} = enum + ## Lifecycle of a single segment as tracked by the channel. The + ## messaging layer has its own richer `DeliveryState` (retries, + ## propagated-vs-validated); here we only model what's needed to + ## decide when a `channelReqId` is fully accounted for. + AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager. + InFlight + ## Released by rate_limit_manager and handed to delivery_service; + ## `messagingReqId` is now set. + Confirmed ## `MessageSentEvent` arrived for `messagingReqId`. 
+ Failed + ## `MessageErrorEvent` arrived for `messagingReqId`, or a local + ## failure (encryption, `sendHandler`) hit before any id was reachable. + + PendingMessagingRequest = object + ## One entry per segment (i.e. per messaging-layer request). The + ## relative order of `AwaitingRateLimit` entries must match the + ## order in which `rate_limit_manager` re-emits messages, which is + ## FIFO with `send()`. + channelReqId*: RequestId + ## The channel-layer parent id returned to the caller of `send()`. + ## One channel request maps to N pending messaging requests. + messagingReqId*: Option[RequestId] + ## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it. + persistenceReqType: MessagePersistence + segmentSendState*: SegmentSendState + + ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + sendHandler: SendHandler + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingMessagingRequests: seq[PendingMessagingRequest] + ## Entries are kept until the matching segment reaches a final + ## state (`Confirmed` or `Failed`); a whole channel request is + ## then pruned in one pass once all its segments are final.
+ brokerCtx: BrokerContext func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = self.channelId @@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId +func isFinal(state: SegmentSendState): bool {.inline.} = + return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} + +proc pruneCompletedChannelReqs(self: ReliableChannel) = + ## Drop every `pendingMessagingRequests` entry whose `channelReqId` + ## has all of its segments in a final state. A single failing + ## segment doesn't trigger a drop on its own — we wait until siblings + ## are also accounted for, so the channel-level outcome is decided + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. 
+ var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if not entry.segmentSendState.isFinal(): + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) + +proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = + ## Invoked from this channel's `MessageSentEvent` listener. Flips + ## the matching `InFlight` segment to `Confirmed` and prunes. The + ## listener routes every event through here; entries that don't + ## belong to this channel simply don't match and are no-ops. + self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Confirmed + self.pruneCompletedChannelReqs() + +proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) = + ## Symmetric to `onMessageSent` but for `MessageErrorEvent`. 
+ self.pendingMessagingRequests.applyItIf( + it.segmentSendState == SegmentSendState.InFlight and + it.messagingReqId == some(messagingReqId) + ): + it.segmentSendState = SegmentSendState.Failed + self.pruneCompletedChannelReqs() + proc onReadyToSend( - self: ReliableChannel, msgs: seq[seq[byte]] + self: ReliableChannel, readyToSendEvent: ReadyToSendEvent ) {.async: (raises: []).} = ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` ## listener once `rate_limit_manager` releases a batch of opaque ## blobs (already-encoded SDS messages): ## ## ... -> rate_limit_manager -> [encryption] -> dispatch - for m in msgs: - ## Each `m` was preceded by exactly one push onto `pendingRequests` - ## in `send`, so this pop is always safe in the current skeleton. - let pending = self.pendingRequests[0] - self.pendingRequests.delete(0) + var idx = 0 + for m in readyToSendEvent.msgs: + ## The first `AwaitingRateLimit` entry in push order is the one + ## this `m` belongs to: `send()` adds one entry per segment, and + ## `rate_limit_manager` re-emits them in the same FIFO order, so + ## the two sequences advance in lockstep. Earlier entries may + ## already be `InFlight` / `Confirmed` / `Failed` because they + ## live on until every sibling of their `channelReqId` is final, + ## so we walk past those to find the next entry still awaiting release. + while idx < self.pendingMessagingRequests.len and + self.pendingMessagingRequests[idx].segmentSendState != + SegmentSendState.AwaitingRateLimit + : + idx.inc() + if idx >= self.pendingMessagingRequests.len: + ## rate_limit_manager emitted more messages than we have pending — + ## should not happen given `send` pushes one entry per enqueued + ## SDS payload. Drop silently rather than corrupt state.
+ break + + let channelReqId = self.pendingMessagingRequests[idx].channelReqId + let isEphemeral = + self.pendingMessagingRequests[idx].persistenceReqType == + MessagePersistence.Ephemeral ## TODO: revisit which fields of the SDS message must be encrypted. ## Encrypting the whole encoded blob forces every receiver to attempt @@ -97,32 +219,58 @@ proc onReadyToSend( MessageErrorEvent.emit( self.brokerCtx, MessageErrorEvent( - requestId: pending.parent, - messageHash: "", - error: "encryption failed: " & error, + requestId: channelReqId, messageHash: "", error: "encryption failed: " & error ), ) + ## Encryption failed *before* we could hand the segment to the + ## delivery layer — no `messagingReqId` was minted and no + ## `DeliveryTask` was queued on `sendService`. The delivery + ## layer will therefore never emit a `MessageSentEvent` / + ## `MessageErrorEvent` for this segment, so `onMessageError` + ## won't fire either. Advance the state machine inline so the + ## parent `channelReqId` can still be pruned once its siblings + ## are also final. + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue let wireBytes = seq[byte](encrypted) + ## The `meta` field carries the Reliable Channel wire-format spec + ## marker so the ingress side of any peer can route this WakuMessage + ## to its Reliable Channel layer. let envelope = MessageEnvelope( - contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + contentTopic: self.contentTopic, + payload: wireBytes, + ephemeral: isEphemeral, + meta: LipWireReliableChannelVersion.toBytes(), ) - let deliveryReqId = RequestId.new(self.rng) - let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: - ## TODO: emit waku `MessageErrorEvent` for the parent request id. + ## `waku.send` is not annotated `(raises: [])`, but this listener is. 
+ ## Convert any raise to a Result error so the state machine handles + ## both failure modes (Result.err and exception) through one path. + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("waku send raised: " & e.msg) + + let messagingReqId = sendRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: channelReqId, messageHash: "", error: "waku send failed: " & error + ), + ) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed + idx.inc() continue - ## Stamp the Reliable Channel wire-format spec marker so the ingress - ## side of any peer can route this WakuMessage to its Reliable - ## Channel layer. Done on the constructed WakuMessage rather than - ## via the envelope because `MessageEnvelope` does not expose a - ## `meta` field. - deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId) + self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight + self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId) + idx.inc() - asyncSpawn self.deliveryService.sendService.send(deliveryTask) - self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + self.pruneCompletedChannelReqs() proc send*( self: ReliableChannel, payload: seq[byte], ephemeral: bool = false @@ -135,18 +283,22 @@ proc send*( ## ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with ## the SDS messages cleared for transmission; the channel's listener - ## then runs the final stage (encryption -> dispatch). The `ephemeral` - ## flag is carried alongside each segment in `pendingRequests` and - ## stamped onto the eventual `MessageEnvelope`. + ## then runs the final stage (encryption -> dispatch). 
The + ## `persistenceReqType` is carried alongside each segment in + ## `pendingMessagingRequests` and stamped onto the eventual + ## `MessageEnvelope`. ## - ## The returned `RequestId` is the parent of one-or-more - ## delivery-service `RequestId`s; the mapping is recorded in + ## The returned `RequestId` is the channel-level parent of one-or-more + ## messaging-layer `RequestId`s; the mapping is recorded in ## `self.requestIds`. if payload.len == 0: return err("empty payload") - let parentReqId = RequestId.new(self.rng) - self.requestIds[parentReqId] = @[] + let channelReqId = RequestId.new(self.rng) + self.requestIds[channelReqId] = @[] + + let persistenceReqType = + if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent for segmentBytes in self.segmentation.performSegmentation(payload): ## Segments arrive already encoded; the segmentation module owns @@ -155,10 +307,17 @@ proc send*( self.channelId, self.senderId, segmentBytes ).valueOr: return err("SDS wrap failed: " & error) - self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.pendingMessagingRequests.add( + PendingMessagingRequest( + channelReqId: channelReqId, + messagingReqId: none(RequestId), + persistenceReqType: persistenceReqType, + segmentSendState: SegmentSendState.AwaitingRateLimit, + ) + ) self.rateLimit.enqueueToSend(sdsBytes) - return ok(parentReqId) + return ok(channelReqId) proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] @@ -206,7 +365,7 @@ proc onMessageReceived( proc new*( T: type ReliableChannel, - deliveryService: DeliveryService, + waku: Waku, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, @@ -214,6 +373,7 @@ proc new*( sdsConfig: SdsConfig, rateConfig: RateLimitConfig, brokerCtx: BrokerContext = globalBrokerContext(), + sendHandler: SendHandler = nil, ): T = ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed ## inside the channel rather than handed in by 
the caller — they are @@ -221,8 +381,20 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. + ## + ## `sendHandler` defaults to `waku.send`; tests pass a fake to drive + ## the send state machine without touching the network. + let resolvedSendHandler = + if sendHandler.isNil(): + proc( + envelope: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return await waku.send(envelope) + else: + sendHandler + let chn = T( - deliveryService: deliveryService, + sendHandler: resolvedSendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, @@ -231,20 +403,21 @@ proc new*( sdsHandler: SdsHandler.new(sdsConfig, senderId), rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), requestIds: initTable[RequestId, seq[RequestId]](), - pendingRequests: @[], + pendingMessagingRequests: @[], brokerCtx: brokerCtx, ) - ## Each channel owns its own egress + ingress listeners on - ## `chn.brokerCtx`, filtered to traffic addressed to this channel. - ## Keeping the listeners (and the procs they call) inside the - ## channel lets `onReadyToSend` and `onMessageReceived` stay private - ## — the manager doesn't need to know about them. + ## Each channel owns its own egress + ingress + send-completion + ## listeners on `chn.brokerCtx`, filtered to traffic addressed to + ## this channel. Keeping the listeners (and the handler procs they + ## call) inside the channel lets `onReadyToSend` / + ## `onMessageReceived` / `onMessageSent` / `onMessageError` stay + ## private — the manager doesn't need to know about them. 
discard ReadyToSendEvent.listen( chn.brokerCtx, proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = if evt.channelId == chn.channelId: - await chn.onReadyToSend(evt.msgs) + await chn.onReadyToSend(evt) , ) @@ -261,4 +434,20 @@ proc new*( , ) + ## Send-completion events are tagged with the per-segment messaging + ## `requestId` — globally unique, so we don't need any channel filter + ## up front. The handler scans this channel's pending entries for a + ## match and is a no-op when the id belongs to a different channel. + discard MessageSentEvent.listen( + chn.brokerCtx, + proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} = + chn.onMessageSent(evt.requestId), + ) + + discard MessageErrorEvent.listen( + chn.brokerCtx, + proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} = + chn.onMessageError(evt.requestId), + ) + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index ddbdb37a6..747f755b4 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -14,6 +14,7 @@ import waku/api/api import waku/api/api_conf import waku/events/message_events as waku_message_events import waku/factory/waku as waku_factory +import waku/node/delivery_service/delivery_service import waku/waku_core/topics import ./reliable_channel @@ -23,11 +24,10 @@ export reliable_channel type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] - deliveryService: DeliveryService - ## Owned by the manager. The ownership chain is - ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. - ## Hidden so callers can't substitute their own and bypass the - ## manager's pipeline. + waku: Waku + ## Owned by the manager. The channel layer reaches the messaging + ## API through `waku.send(envelope)`; constructing DeliveryTasks + ## directly would breach the layer boundary. 
brokerCtx: BrokerContext proc new*( @@ -38,15 +38,13 @@ proc new*( ## TODO !! The proper ownership chain is: ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, ## and this will be implemented in the future. For now, `createNode` - ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## is called here to get a Waku instance, and the WakuNode is immediately discarded. ## This is a temporary workaround to get the API let waku = ?(await createNode(conf)) let manager = T( - channels: initTable[ChannelId, ReliableChannel](), - deliveryService: waku.deliveryService, - brokerCtx: brokerCtx, + channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx ) return ok(manager) @@ -55,17 +53,18 @@ proc start*(self: ReliableChannelManager): Result[void, string] = ## Bring the owned DeliveryService up. Separated from `new` so callers ## can register encryption providers / create channels before traffic ## starts flowing. - self.deliveryService.startDeliveryService() + self.waku.deliveryService.startDeliveryService() proc stop*(self: ReliableChannelManager) {.async.} = - if not self.deliveryService.isNil(): - await self.deliveryService.stopDeliveryService() + if not self.waku.isNil(): + await self.waku.deliveryService.stopDeliveryService() proc createReliableChannel*( self: ReliableChannelManager, channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, + sendHandler: SendHandler = nil, ): Result[ChannelId, string] = ## Spec entry point. The `DeliveryService` and `rng` the channel needs ## are sourced from the owning `ReliableChannelManager` rather than @@ -75,6 +74,9 @@ proc createReliableChannel*( ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. 
+ ## + ## `sendHandler` is left `nil` in production so the channel uses the + ## owned `waku.send`; tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -94,7 +96,7 @@ proc createReliableChannel*( ) let chn = ReliableChannel.new( - deliveryService = self.deliveryService, + waku = self.waku, channelId = channelId, contentTopic = contentTopic, senderId = senderId, @@ -102,6 +104,7 @@ proc createReliableChannel*( sdsConfig = sdsConfig, rateConfig = rateConfig, brokerCtx = self.brokerCtx, + sendHandler = sendHandler, ) self.channels[channelId] = chn diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 052cd35c9..2f49182a2 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -147,3 +147,171 @@ suite "Reliable Channel - ingress": check not fired await manager.stop() + +suite "Reliable Channel - send state machine": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. 
+ const + channelId = ChannelId("sm-success-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + sendCalls.inc + return ok(fakeMsgReqId) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), + ) + + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId + + await manager.stop() + + asyncTest "two independent channelReqIds are finalised independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). 
The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. + const + channelId = ChannelId("sm-multi-channel") + contentTopic = ContentTopic("/reliable-channel/test/sm-multi") + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len 
< 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 + + waku_message_events.MessageSentEvent.emit( + brokerCtx, + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), + ) + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() + + waku_message_events.MessageErrorEvent.emit( + brokerCtx, + waku_message_events.MessageErrorEvent( + requestId: msgReqIds[1], messageHash: "", error: "synthetic" + ), + ) + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 + + await manager.stop() + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip() diff --git a/waku/api/types.nim b/waku/api/types.nim index 9eae503c8..2b7edd616 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -11,6 +11,10 @@ type contentTopic*: ContentTopic payload*: seq[byte] ephemeral*: bool + meta*: seq[byte] + ## Opaque wire-format marker carried on the underlying WakuMessage. + ## Higher layers (e.g. Reliable Channel) stamp this so peers can route + ## ingress traffic to their corresponding layer. Empty by default. 
RequestId* = distinct string @@ -34,12 +38,18 @@ proc init*( contentTopic: ContentTopic, payload: seq[byte] | string, ephemeral: bool = false, + meta: seq[byte] = @[], ): MessageEnvelope = when payload is seq[byte]: - MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral) + MessageEnvelope( + contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta + ) else: MessageEnvelope( - contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral + contentTopic: contentTopic, + payload: payload.toBytes(), + ephemeral: ephemeral, + meta: meta, ) proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = @@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, + meta: envelope.meta, timestamp: getNowInNanosecondTime(), ) diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 902f3aa1c..88ec802cf 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -26,7 +26,7 @@ logScope: # This useful util is missing from sequtils, this extends applyIt with predicate... template applyItIf*(varSeq, pred, op: untyped) = for i in low(varSeq) .. high(varSeq): - let it {.inject.} = varSeq[i] + var it {.inject.} = varSeq[i] if pred: op varSeq[i] = it