introduce ChannelMessageSentEven and ChannelMessageErrorEvent

This commit is contained in:
Ivan FB 2026-05-29 15:05:20 +02:00
parent 4d90c1190a
commit 2f9d9336cc
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
3 changed files with 109 additions and 41 deletions

View File

@ -21,3 +21,19 @@ EventBroker:
channelId*: ChannelId
senderId*: SdsParticipantID
payload*: seq[byte]
EventBroker:
## Emitted when every segment of a channel-level `send()` reached
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
## `requestId` is the channel-layer parent returned by `send()`.
type ChannelMessageSentEvent* = object
channelId*: ChannelId
requestId*: RequestId
EventBroker:
## Emitted when a channel-level `send()` finalises with at least one
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
type ChannelMessageErrorEvent* = object
channelId*: ChannelId
requestId*: RequestId
error*: string

View File

@ -113,13 +113,6 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
self.senderId
func pendingMessagingRequestsLenForTest*(self: ReliableChannel): int {.inline.} =
## Test-only: returns how many segments are still tracked in the
## state machine. The internal segment lifecycle is not part of the
## spec'd API; production callers must not observe it. Read-only — to
## inject state, drive `send()` with a fake `SendHandler` instead.
return self.pendingMessagingRequests.len
func isFinal(state: SegmentSendState): bool {.inline.} =
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
@ -128,12 +121,41 @@ proc pruneCompletedChannelReqs(self: ReliableChannel) =
## has all of its segments in a final state. A single failing
## segment doesn't trigger a drop on its own — we wait until siblings
## are also accounted for, so the channel-level outcome is decided
## from a complete picture.
var channelsWithPending = initHashSet[RequestId]()
## from a complete picture. For each fully-final `channelReqId`, emit
## the channel-level final event before the entries are dropped:
## `ChannelMessageSentEvent` if every sibling Confirmed,
## `ChannelMessageErrorEvent` if any sibling Failed.
var hasPending = initHashSet[RequestId]()
var anyFailed = initHashSet[RequestId]()
for entry in self.pendingMessagingRequests:
if not entry.segmentSendState.isFinal():
channelsWithPending.incl(entry.channelReqId)
self.pendingMessagingRequests.keepItIf(it.channelReqId in channelsWithPending)
hasPending.incl(entry.channelReqId)
elif entry.segmentSendState == SegmentSendState.Failed:
anyFailed.incl(entry.channelReqId)
var emitted = initHashSet[RequestId]()
for entry in self.pendingMessagingRequests:
if entry.channelReqId in hasPending or entry.channelReqId in emitted:
continue
emitted.incl(entry.channelReqId)
if entry.channelReqId in anyFailed:
ChannelMessageErrorEvent.emit(
self.brokerCtx,
ChannelMessageErrorEvent(
channelId: self.channelId,
requestId: entry.channelReqId,
error: "one or more segments failed",
),
)
else:
ChannelMessageSentEvent.emit(
self.brokerCtx,
ChannelMessageSentEvent(
channelId: self.channelId, requestId: entry.channelReqId
),
)
self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending)
proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) =
## Invoked from this channel's `MessageSentEvent` listener. Flips

View File

@ -149,14 +149,14 @@ suite "Reliable Channel - ingress":
await manager.stop()
suite "Reliable Channel - send state machine":
asyncTest "MessageSentEvent flips InFlight -> Confirmed and prunes":
asyncTest "MessageSentEvent finalises the channelReqId as Sent":
## Drives the real send pipeline (`send` -> segmentation -> SDS ->
## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that
## returns canned `RequestId`s instead of hitting the network. Once
## the segment reaches `InFlight`, the delivery-layer
## `MessageSentEvent` is emitted and the entry must transition to
## `Confirmed` and be pruned (it's the only segment for that
## `channelReqId`).
## returns a canned `RequestId` instead of hitting the network.
## Emitting the delivery-layer `MessageSentEvent` must drive the
## channel-level state machine through `Confirmed` and produce a
## `ChannelMessageSentEvent` (channel-level terminal event) for the
## `channelReqId` returned by `send()`.
const
channelId = ChannelId("sm-success-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
@ -189,36 +189,45 @@ suite "Reliable Channel - send state machine":
let chn = manager.getChannelForTest(channelId)
doAssert not chn.isNil()
check chn.pendingMessagingRequestsLenForTest == 0
## Small payload -> one segment -> exactly one `SendHandler` call.
discard chn.send("hello".toBytes()).expect("send")
let sentFut = newFuture[RequestId]("channel-sent")
discard ChannelMessageSentEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
if not sentFut.finished() and evt.channelId == channelId:
sentFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageSentEvent")
let channelReqId = chn.send("hello".toBytes()).expect("send")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and sendCalls == 0:
await sleepAsync(5.milliseconds)
check sendCalls == 1
check chn.pendingMessagingRequestsLenForTest == 1
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
)
let pruneDeadline = Moment.now() + 1.seconds
while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0:
await sleepAsync(5.milliseconds)
check chn.pendingMessagingRequestsLenForTest == 0
let finalised = await sentFut.withTimeout(1.seconds)
check finalised
if finalised:
check sentFut.read() == channelReqId
await manager.stop()
asyncTest "two independent channelReqIds are pruned independently":
asyncTest "two independent channelReqIds are finalised independently":
## Two `send()` calls -> two independent `channelReqId`s, each with
## one segment under the current segmentation skeleton
## (`performSegmentation` always emits exactly one segment). The
## fake `SendHandler` returns distinct `messagingReqId`s; finalising
## the first must prune only its entry, leaving the second tracked,
## then finalising the second prunes the remainder.
## the first emits `ChannelMessageSentEvent` for its `channelReqId`,
## finalising the second as a failure emits `ChannelMessageErrorEvent`
## for the other.
const
channelId = ChannelId("sm-multi-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
@ -252,25 +261,46 @@ suite "Reliable Channel - send state machine":
let chn = manager.getChannelForTest(channelId)
doAssert not chn.isNil()
discard chn.send("first".toBytes()).expect("send 1")
discard chn.send("second".toBytes()).expect("send 2")
let sentFut = newFuture[RequestId]("channel-sent")
let erroredFut = newFuture[RequestId]("channel-errored")
discard ChannelMessageSentEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
if not sentFut.finished() and evt.channelId == channelId:
sentFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageSentEvent")
discard ChannelMessageErrorEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} =
if not erroredFut.finished() and evt.channelId == channelId:
erroredFut.complete(evt.requestId)
,
)
.expect("listen ChannelMessageErrorEvent")
let channelReqId1 = chn.send("first".toBytes()).expect("send 1")
let channelReqId2 = chn.send("second".toBytes()).expect("send 2")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
await sleepAsync(5.milliseconds)
check msgReqIds.len == 2
check chn.pendingMessagingRequestsLenForTest == 2
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
)
let firstPruneDeadline = Moment.now() + 1.seconds
while Moment.now() < firstPruneDeadline and chn.pendingMessagingRequestsLenForTest > 1:
await sleepAsync(5.milliseconds)
## Only the first `channelReqId` is fully accounted for; the second
## one's segment is still `InFlight`, so exactly one entry remains.
check chn.pendingMessagingRequestsLenForTest == 1
let sentArrived = await sentFut.withTimeout(1.seconds)
check sentArrived
if sentArrived:
check sentFut.read() == channelReqId1
## The second `channelReqId` must NOT have finalised yet — its
## segment is still `InFlight`.
check not erroredFut.finished()
waku_message_events.MessageErrorEvent.emit(
brokerCtx,
@ -278,10 +308,10 @@ suite "Reliable Channel - send state machine":
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
),
)
let pruneDeadline = Moment.now() + 1.seconds
while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0:
await sleepAsync(5.milliseconds)
check chn.pendingMessagingRequestsLenForTest == 0
let erroredArrived = await erroredFut.withTimeout(1.seconds)
check erroredArrived
if erroredArrived:
check erroredFut.read() == channelReqId2
await manager.stop()