diff --git a/channels/events.nim b/channels/events.nim index 5a17c99d2..904a34dc6 100644 --- a/channels/events.nim +++ b/channels/events.nim @@ -21,3 +21,19 @@ EventBroker: channelId*: ChannelId senderId*: SdsParticipantID payload*: seq[byte] + +EventBroker: + ## Emitted when every segment of a channel-level `send()` reached + ## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the + ## `requestId` is the channel-layer parent returned by `send()`. + type ChannelMessageSentEvent* = object + channelId*: ChannelId + requestId*: RequestId + +EventBroker: + ## Emitted when a channel-level `send()` finalises with at least one + ## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`. + type ChannelMessageErrorEvent* = object + channelId*: ChannelId + requestId*: RequestId + error*: string diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim index 49dec024a..1caeb2ba2 100644 --- a/channels/reliable_channel.nim +++ b/channels/reliable_channel.nim @@ -113,13 +113,6 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId -func pendingMessagingRequestsLenForTest*(self: ReliableChannel): int {.inline.} = - ## Test-only: returns how many segments are still tracked in the - ## state machine. The internal segment lifecycle is not part of the - ## spec'd API; production callers must not observe it. Read-only — to - ## inject state, drive `send()` with a fake `SendHandler` instead. - return self.pendingMessagingRequests.len - func isFinal(state: SegmentSendState): bool {.inline.} = return state in {SegmentSendState.Confirmed, SegmentSendState.Failed} @@ -128,12 +121,41 @@ proc pruneCompletedChannelReqs(self: ReliableChannel) = ## has all of its segments in a final state. A single failing ## segment doesn't trigger a drop on its own — we wait until siblings ## are also accounted for, so the channel-level outcome is decided - ## from a complete picture. - var channelsWithPending = initHashSet[RequestId]() + ## from a complete picture. For each fully-final `channelReqId`, emit + ## the channel-level final event before the entries are dropped: + ## `ChannelMessageSentEvent` if every sibling Confirmed, + ## `ChannelMessageErrorEvent` if any sibling Failed. + var hasPending = initHashSet[RequestId]() + var anyFailed = initHashSet[RequestId]() for entry in self.pendingMessagingRequests: if not entry.segmentSendState.isFinal(): - channelsWithPending.incl(entry.channelReqId) - self.pendingMessagingRequests.keepItIf(it.channelReqId in channelsWithPending) + hasPending.incl(entry.channelReqId) + elif entry.segmentSendState == SegmentSendState.Failed: + anyFailed.incl(entry.channelReqId) + + var emitted = initHashSet[RequestId]() + for entry in self.pendingMessagingRequests: + if entry.channelReqId in hasPending or entry.channelReqId in emitted: + continue + emitted.incl(entry.channelReqId) + if entry.channelReqId in anyFailed: + ChannelMessageErrorEvent.emit( + self.brokerCtx, + ChannelMessageErrorEvent( + channelId: self.channelId, + requestId: entry.channelReqId, + error: "one or more segments failed", + ), + ) + else: + ChannelMessageSentEvent.emit( + self.brokerCtx, + ChannelMessageSentEvent( + channelId: self.channelId, requestId: entry.channelReqId + ), + ) + + self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending) proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) = ## Invoked from this channel's `MessageSentEvent` listener. Flips diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 0bf7b2d0c..1b10f2b2b 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -149,14 +149,14 @@ suite "Reliable Channel - ingress": await manager.stop() suite "Reliable Channel - send state machine": - asyncTest "MessageSentEvent flips InFlight -> Confirmed and prunes": + asyncTest "MessageSentEvent finalises the channelReqId as Sent": ## Drives the real send pipeline (`send` -> segmentation -> SDS -> ## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that - ## returns canned `RequestId`s instead of hitting the network. Once - ## the segment reaches `InFlight`, the delivery-layer - ## `MessageSentEvent` is emitted and the entry must transition to - ## `Confirmed` and be pruned (it's the only segment for that - ## `channelReqId`). + ## returns a canned `RequestId` instead of hitting the network. + ## Emitting the delivery-layer `MessageSentEvent` must drive the + ## channel-level state machine through `Confirmed` and produce a + ## `ChannelMessageSentEvent` (channel-level terminal event) for the + ## `channelReqId` returned by `send()`. const channelId = ChannelId("sm-success-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-success") @@ -189,36 +189,45 @@ suite "Reliable Channel - send state machine": let chn = manager.getChannelForTest(channelId) doAssert not chn.isNil() - check chn.pendingMessagingRequestsLenForTest == 0 - ## Small payload -> one segment -> exactly one `SendHandler` call. - discard chn.send("hello".toBytes()).expect("send") + let sentFut = newFuture[RequestId]("channel-sent") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + + let channelReqId = chn.send("hello".toBytes()).expect("send") let dispatchDeadline = Moment.now() + 1.seconds while Moment.now() < dispatchDeadline and sendCalls == 0: await sleepAsync(5.milliseconds) check sendCalls == 1 - check chn.pendingMessagingRequestsLenForTest == 1 waku_message_events.MessageSentEvent.emit( brokerCtx, waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""), ) - let pruneDeadline = Moment.now() + 1.seconds - while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0: - await sleepAsync(5.milliseconds) - check chn.pendingMessagingRequestsLenForTest == 0 + let finalised = await sentFut.withTimeout(1.seconds) + check finalised + if finalised: + check sentFut.read() == channelReqId await manager.stop() - asyncTest "two independent channelReqIds are pruned independently": + asyncTest "two independent channelReqIds are finalised independently": ## Two `send()` calls -> two independent `channelReqId`s, each with ## one segment under the current segmentation skeleton ## (`performSegmentation` always emits exactly one segment). The ## fake `SendHandler` returns distinct `messagingReqId`s; finalising - ## the first must prune only its entry, leaving the second tracked, - ## then finalising the second prunes the remainder. + ## the first emits `ChannelMessageSentEvent` for its `channelReqId`, + ## finalising the second as a failure emits `ChannelMessageErrorEvent` + ## for the other. const channelId = ChannelId("sm-multi-channel") contentTopic = ContentTopic("/reliable-channel/test/sm-multi") @@ -252,25 +261,46 @@ suite "Reliable Channel - send state machine": let chn = manager.getChannelForTest(channelId) doAssert not chn.isNil() - discard chn.send("first".toBytes()).expect("send 1") - discard chn.send("second".toBytes()).expect("send 2") + let sentFut = newFuture[RequestId]("channel-sent") + let erroredFut = newFuture[RequestId]("channel-errored") + discard ChannelMessageSentEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} = + if not sentFut.finished() and evt.channelId == channelId: + sentFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageSentEvent") + discard ChannelMessageErrorEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} = + if not erroredFut.finished() and evt.channelId == channelId: + erroredFut.complete(evt.requestId) + , + ) + .expect("listen ChannelMessageErrorEvent") + + let channelReqId1 = chn.send("first".toBytes()).expect("send 1") + let channelReqId2 = chn.send("second".toBytes()).expect("send 2") let dispatchDeadline = Moment.now() + 1.seconds while Moment.now() < dispatchDeadline and msgReqIds.len < 2: await sleepAsync(5.milliseconds) check msgReqIds.len == 2 - check chn.pendingMessagingRequestsLenForTest == 2 waku_message_events.MessageSentEvent.emit( brokerCtx, waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""), ) - let firstPruneDeadline = Moment.now() + 1.seconds - while Moment.now() < firstPruneDeadline and chn.pendingMessagingRequestsLenForTest > 1: - await sleepAsync(5.milliseconds) - ## Only the first `channelReqId` is fully accounted for; the second - ## one's segment is still `InFlight`, so exactly one entry remains. - check chn.pendingMessagingRequestsLenForTest == 1 + let sentArrived = await sentFut.withTimeout(1.seconds) + check sentArrived + if sentArrived: + check sentFut.read() == channelReqId1 + ## The second `channelReqId` must NOT have finalised yet — its + ## segment is still `InFlight`. + check not erroredFut.finished() waku_message_events.MessageErrorEvent.emit( brokerCtx, @@ -278,10 +308,10 @@ suite "Reliable Channel - send state machine": requestId: msgReqIds[1], messageHash: "", error: "synthetic" ), ) - let pruneDeadline = Moment.now() + 1.seconds - while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0: - await sleepAsync(5.milliseconds) - check chn.pendingMessagingRequestsLenForTest == 0 + let erroredArrived = await erroredFut.withTimeout(1.seconds) + check erroredArrived + if erroredArrived: + check erroredFut.read() == channelReqId2 await manager.stop()