diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 662ed5276..49dec024a 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -44,6 +44,13 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" ## on breaking on-the-wire changes; implementations pin one version. type + SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} + ## Egress dispatch boundary. Defaults to `waku.send`; tests inject a + ## fake that records calls and returns canned `RequestId`s so the + ## send state machine can be exercised end-to-end without a network. + MessagePersistence {.pure.} = enum Persistent Ephemeral @@ -81,7 +88,7 @@ type ## Spec-defined public type. Fields are private so callers cannot ## mutate internals and break invariants. Getters are added below ## for the few values consumers may need. - waku: Waku + sendHandler: SendHandler channelId: ChannelId contentTopic: ContentTopic senderId: SdsParticipantID @@ -109,26 +116,10 @@ func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = func pendingMessagingRequestsLenForTest*(self: ReliableChannel): int {.inline.} = ## Test-only: returns how many segments are still tracked in the ## state machine. The internal segment lifecycle is not part of the - ## spec'd API; production callers must not observe it. + ## spec'd API; production callers must not observe it. Read-only — to + ## inject state, drive `send()` with a fake `SendHandler` instead. return self.pendingMessagingRequests.len -proc forceInjectInFlightForTest*( - self: ReliableChannel, channelReqId: RequestId, messagingReqId: RequestId -) = - ## Test-only: inject a pending entry already in `InFlight`. Bypasses - ## `send` / `onReadyToSend` so unit tests can exercise final-state - ## handling and the `pruneCompletedChannelReqs` rule (drop only when - ## *all* siblings of a `channelReqId` are final) without having - ## to drive — and race with — the real send pipeline. - self.pendingMessagingRequests.add( - PendingMessagingRequest( - channelReqId: channelReqId, - messagingReqId: some(messagingReqId), - persistenceReqType: MessagePersistence.Persistent, - segmentSendState: SegmentSendState.InFlight, - ) - ) - func isFinal(state: SegmentSendState): bool {.inline.} = return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} @@ -239,7 +230,7 @@ proc onReadyToSend( ## both failure modes (Result.err and exception) through one path. let sendRes = try: - await self.waku.send(envelope) + await self.sendHandler(envelope) except CatchableError as e: Result[RequestId, string].err("waku send raised: " & e.msg) @@ -364,6 +355,7 @@ proc new*( sdsConfig: SdsConfig, rateConfig: RateLimitConfig, brokerCtx: BrokerContext = globalBrokerContext(), + sendHandler: SendHandler = nil, ): T = ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed ## inside the channel rather than handed in by the caller — they are @@ -371,8 +363,20 @@ proc new*( ## should be wiring up. Encryption is delegated to the `Encrypt`/ ## `Decrypt` request brokers, so the channel keeps no per-instance ## encryption state either. + ## + ## `sendHandler` defaults to `waku.send`; tests pass a fake to drive + ## the send state machine without touching the network. + let resolvedSendHandler = + if sendHandler.isNil(): + proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} = + return await waku.send(envelope) + else: + sendHandler + let chn = T( - waku: waku, + sendHandler: resolvedSendHandler, channelId: channelId, contentTopic: contentTopic, senderId: senderId, diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim index e343d0ec4..c3ef2dcd7 100644 --- a/channels/reliable_channel_manager.nim +++ b/channels/reliable_channel_manager.nim @@ -75,6 +75,7 @@ proc createReliableChannel*( channelId: ChannelId, contentTopic: ContentTopic, senderId: SdsParticipantID, + sendHandler: SendHandler = nil, ): Result[ChannelId, string] = ## Spec entry point. The `DeliveryService` and `rng` the channel needs ## are sourced from the owning `ReliableChannelManager` rather than @@ -84,6 +85,9 @@ proc createReliableChannel*( ## ## Segmentation, SDS and rate-limit configs will eventually be read ## from the node's `NodeConfig`. Defaults for now. + ## + ## `sendHandler` is left `nil` in production so the channel uses the + ## owned `waku.send`; tests pass a fake to bypass the network. if self.channels.hasKey(channelId): return err("channel already exists: " & channelId) @@ -111,6 +115,7 @@ proc createReliableChannel*( sdsConfig = sdsConfig, rateConfig = rateConfig, brokerCtx = self.brokerCtx, + sendHandler = sendHandler, ) self.channels[channelId] = chn diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 997a9d5cb..0bf7b2d0c 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -150,15 +150,17 @@ suite "Reliable Channel - ingress": suite "Reliable Channel - send state machine": asyncTest "MessageSentEvent flips InFlight -> Confirmed and prunes": - ## Exercises the channel-side state machine in isolation. We - ## inject a pending entry already in `InFlight` (so we don't have - ## to drive — and race with — the real send pipeline), then emit - ## the delivery-layer `MessageSentEvent` for its `messagingReqId`. - ## The channel's own listener flips the entry to `Confirmed` and, - ## since it's the only segment for that `channelReqId`, prunes it. + ## Drives the real send pipeline (`send` -> segmentation -> SDS -> + ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that + ## returns canned `RequestId`s instead of hitting the network. Once + ## the segment reaches `InFlight`, the delivery-layer + ## `MessageSentEvent` is emitted and the entry must transition to + ## `Confirmed` and be pruned (it's the only segment for that + ## `channelReqId`). const channelId = ChannelId("sm-success-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-success") + fakeMsgReqId = RequestId("fake-msg-req-1") var manager: ReliableChannelManager var brokerCtx: BrokerContext @@ -169,37 +171,54 @@ suite "Reliable Channel - send state machine": ) setNoopEncryption() + + var sendCalls = 0 + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} = + sendCalls.inc + return ok(fakeMsgReqId) + discard manager - .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) .expect("createReliableChannel") let chn = manager.getChannelForTest(channelId) doAssert not chn.isNil() check chn.pendingMessagingRequestsLenForTest == 0 - let channelReqId = RequestId("test-channel-req") - let messagingReqId = RequestId("test-msg-req") - chn.forceInjectInFlightForTest(channelReqId, messagingReqId) + ## Small payload -> one segment -> exactly one `SendHandler` call. + discard chn.send("hello".toBytes()).expect("send") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and sendCalls == 0: + await sleepAsync(5.milliseconds) + check sendCalls == 1 check chn.pendingMessagingRequestsLenForTest == 1 waku_message_events.MessageSentEvent.emit( brokerCtx, - waku_message_events.MessageSentEvent(requestId: messagingReqId, messageHash: ""), + waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), ) - let deadline = Moment.now() + 1.seconds - while Moment.now() < deadline and chn.pendingMessagingRequestsLenForTest > 0: + let pruneDeadline = Moment.now() + 1.seconds + while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0: await sleepAsync(5.milliseconds) check chn.pendingMessagingRequestsLenForTest == 0 await manager.stop() - asyncTest "channelReqId not pruned until ALL its segments are final": - ## Validates `pruneCompletedChannelReqs`'s "wait for siblings" rule: - ## a channel request with multiple segments is only dropped once - ## every segment is `Confirmed` or `Failed`. Confirm the first - ## segment and assert both entries are still tracked; fail the - ## second and assert both are pruned. + asyncTest "two independent channelReqIds are pruned independently": + ## Two `send()` calls -> two independent `channelReqId`s, each with + ## one segment under the current segmentation skeleton + ## (`performSegmentation` always emits exactly one segment). The + ## fake `SendHandler` returns distinct `messagingReqId`s; finalising + ## the first must prune only its entry, leaving the second tracked, + ## then finalising the second prunes the remainder. const channelId = ChannelId("sm-multi-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-multi") @@ -213,38 +232,66 @@ suite "Reliable Channel - send state machine": ) setNoopEncryption() + + var msgReqIds: seq[RequestId] + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {. + async: (raises: [CatchableError]), gcsafe + .} = + let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1)) + msgReqIds.add(id) + return ok(id) + discard manager - .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) .expect("createReliableChannel") let chn = manager.getChannelForTest(channelId) doAssert not chn.isNil() - let channelReqId = RequestId("multi-channel-req") - let msgReqId1 = RequestId("multi-msg-req-1") - let msgReqId2 = RequestId("multi-msg-req-2") - chn.forceInjectInFlightForTest(channelReqId, msgReqId1) - chn.forceInjectInFlightForTest(channelReqId, msgReqId2) + discard chn.send("first".toBytes()).expect("send 1") + discard chn.send("second".toBytes()).expect("send 2") + + let dispatchDeadline = Moment.now() + 1.seconds + while Moment.now() < dispatchDeadline and msgReqIds.len < 2: + await sleepAsync(5.milliseconds) + check msgReqIds.len == 2 check chn.pendingMessagingRequestsLenForTest == 2 waku_message_events.MessageSentEvent.emit( brokerCtx, - waku_message_events.MessageSentEvent(requestId: msgReqId1, messageHash: ""), + waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), ) - await sleepAsync(50.milliseconds) - ## Sibling msgReqId2 is still `InFlight`, so prune must NOT fire - ## yet — both entries remain tracked. - check chn.pendingMessagingRequestsLenForTest == 2 + let firstPruneDeadline = Moment.now() + 1.seconds + while Moment.now() < firstPruneDeadline and chn.pendingMessagingRequestsLenForTest > 1: + await sleepAsync(5.milliseconds) + ## Only the first `channelReqId` is fully accounted for; the second + ## one's segment is still `InFlight`, so exactly one entry remains. + check chn.pendingMessagingRequestsLenForTest == 1 waku_message_events.MessageErrorEvent.emit( brokerCtx, waku_message_events.MessageErrorEvent( - requestId: msgReqId2, messageHash: "", error: "synthetic" + requestId: msgReqIds[1], messageHash: "", error: "synthetic" ), ) - let deadline = Moment.now() + 1.seconds - while Moment.now() < deadline and chn.pendingMessagingRequestsLenForTest > 0: + let pruneDeadline = Moment.now() + 1.seconds + while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0: await sleepAsync(5.milliseconds) check chn.pendingMessagingRequestsLenForTest == 0 await manager.stop() + + asyncTest "TODO: channelReqId not pruned until ALL its segments are final": + ## Placeholder for the multi-sibling prune rule. Today's + ## `performSegmentation` (segmentation skeleton) always emits + ## exactly one segment per `send()`, so multiple siblings under one + ## `channelReqId` cannot be produced through the real pipeline. + ## Implement once segmentation does real chunking: send a payload + ## larger than `DefaultSegmentSizeBytes`, capture the N + ## `messagingReqId`s from a fake `SendHandler`, finalise some, and + ## assert prune only fires once every sibling is final. + skip()