use sendHandler in reliable channel

This commit is contained in:
Ivan FB 2026-05-29 13:08:56 +02:00
parent 9976fe3d07
commit 4d90c1190a
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
3 changed files with 110 additions and 54 deletions

View File

@ -44,6 +44,13 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
## on breaking on-the-wire changes; implementations pin one version.
type
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.}
## Egress dispatch boundary. Defaults to `waku.send`; tests inject a
## fake that records calls and returns canned `RequestId`s so the
## send state machine can be exercised end-to-end without a network.
MessagePersistence {.pure.} = enum
Persistent
Ephemeral
@ -81,7 +88,7 @@ type
## Spec-defined public type. Fields are private so callers cannot
## mutate internals and break invariants. Getters are added below
## for the few values consumers may need.
waku: Waku
sendHandler: SendHandler
channelId: ChannelId
contentTopic: ContentTopic
senderId: SdsParticipantID
@ -109,26 +116,10 @@ func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
func pendingMessagingRequestsLenForTest*(self: ReliableChannel): int {.inline.} =
## Test-only: returns how many segments are still tracked in the
## state machine. The internal segment lifecycle is not part of the
## spec'd API; production callers must not observe it.
## spec'd API; production callers must not observe it. Read-only — to
## inject state, drive `send()` with a fake `SendHandler` instead.
return self.pendingMessagingRequests.len
proc forceInjectInFlightForTest*(
self: ReliableChannel, channelReqId: RequestId, messagingReqId: RequestId
) =
## Test-only: inject a pending entry already in `InFlight`. Bypasses
## `send` / `onReadyToSend` so unit tests can exercise final-state
## handling and the `pruneCompletedChannelReqs` rule (drop only when
## *all* siblings of a `channelReqId` are final) without having
## to drive — and race with — the real send pipeline.
self.pendingMessagingRequests.add(
PendingMessagingRequest(
channelReqId: channelReqId,
messagingReqId: some(messagingReqId),
persistenceReqType: MessagePersistence.Persistent,
segmentSendState: SegmentSendState.InFlight,
)
)
func isFinal(state: SegmentSendState): bool {.inline.} =
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
@ -239,7 +230,7 @@ proc onReadyToSend(
## both failure modes (Result.err and exception) through one path.
let sendRes =
try:
await self.waku.send(envelope)
await self.sendHandler(envelope)
except CatchableError as e:
Result[RequestId, string].err("waku send raised: " & e.msg)
@ -364,6 +355,7 @@ proc new*(
sdsConfig: SdsConfig,
rateConfig: RateLimitConfig,
brokerCtx: BrokerContext = globalBrokerContext(),
sendHandler: SendHandler = nil,
): T =
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
## inside the channel rather than handed in by the caller — they are
@ -371,8 +363,20 @@ proc new*(
## should be wiring up. Encryption is delegated to the `Encrypt`/
## `Decrypt` request brokers, so the channel keeps no per-instance
## encryption state either.
##
## `sendHandler` defaults to `waku.send`; tests pass a fake to drive
## the send state machine without touching the network.
let resolvedSendHandler =
if sendHandler.isNil():
proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.} =
return await waku.send(envelope)
else:
sendHandler
let chn = T(
waku: waku,
sendHandler: resolvedSendHandler,
channelId: channelId,
contentTopic: contentTopic,
senderId: senderId,

View File

@ -75,6 +75,7 @@ proc createReliableChannel*(
channelId: ChannelId,
contentTopic: ContentTopic,
senderId: SdsParticipantID,
sendHandler: SendHandler = nil,
): Result[ChannelId, string] =
## Spec entry point. The `DeliveryService` and `rng` the channel needs
## are sourced from the owning `ReliableChannelManager` rather than
@ -84,6 +85,9 @@ proc createReliableChannel*(
##
## Segmentation, SDS and rate-limit configs will eventually be read
## from the node's `NodeConfig`. Defaults for now.
##
## `sendHandler` is left `nil` in production so the channel uses the
## owned `waku.send`; tests pass a fake to bypass the network.
if self.channels.hasKey(channelId):
return err("channel already exists: " & channelId)
@ -111,6 +115,7 @@ proc createReliableChannel*(
sdsConfig = sdsConfig,
rateConfig = rateConfig,
brokerCtx = self.brokerCtx,
sendHandler = sendHandler,
)
self.channels[channelId] = chn

View File

@ -150,15 +150,17 @@ suite "Reliable Channel - ingress":
suite "Reliable Channel - send state machine":
asyncTest "MessageSentEvent flips InFlight -> Confirmed and prunes":
## Exercises the channel-side state machine in isolation. We
## inject a pending entry already in `InFlight` (so we don't have
## to drive — and race with — the real send pipeline), then emit
## the delivery-layer `MessageSentEvent` for its `messagingReqId`.
## The channel's own listener flips the entry to `Confirmed` and,
## since it's the only segment for that `channelReqId`, prunes it.
## Drives the real send pipeline (`send` -> segmentation -> SDS ->
## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that
## returns canned `RequestId`s instead of hitting the network. Once
## the segment reaches `InFlight`, the delivery-layer
## `MessageSentEvent` is emitted and the entry must transition to
## `Confirmed` and be pruned (it's the only segment for that
## `channelReqId`).
const
channelId = ChannelId("sm-success-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
fakeMsgReqId = RequestId("fake-msg-req-1")
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
@ -169,37 +171,54 @@ suite "Reliable Channel - send state machine":
)
setNoopEncryption()
var sendCalls = 0
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.} =
sendCalls.inc
return ok(fakeMsgReqId)
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
let chn = manager.getChannelForTest(channelId)
doAssert not chn.isNil()
check chn.pendingMessagingRequestsLenForTest == 0
let channelReqId = RequestId("test-channel-req")
let messagingReqId = RequestId("test-msg-req")
chn.forceInjectInFlightForTest(channelReqId, messagingReqId)
## Small payload -> one segment -> exactly one `SendHandler` call.
discard chn.send("hello".toBytes()).expect("send")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and sendCalls == 0:
await sleepAsync(5.milliseconds)
check sendCalls == 1
check chn.pendingMessagingRequestsLenForTest == 1
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: messagingReqId, messageHash: ""),
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
)
let deadline = Moment.now() + 1.seconds
while Moment.now() < deadline and chn.pendingMessagingRequestsLenForTest > 0:
let pruneDeadline = Moment.now() + 1.seconds
while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0:
await sleepAsync(5.milliseconds)
check chn.pendingMessagingRequestsLenForTest == 0
await manager.stop()
asyncTest "channelReqId not pruned until ALL its segments are final":
## Validates `pruneCompletedChannelReqs`'s "wait for siblings" rule:
## a channel request with multiple segments is only dropped once
## every segment is `Confirmed` or `Failed`. Confirm the first
## segment and assert both entries are still tracked; fail the
## second and assert both are pruned.
asyncTest "two independent channelReqIds are pruned independently":
## Two `send()` calls -> two independent `channelReqId`s, each with
## one segment under the current segmentation skeleton
## (`performSegmentation` always emits exactly one segment). The
## fake `SendHandler` returns distinct `messagingReqId`s; finalising
## the first must prune only its entry, leaving the second tracked,
## then finalising the second prunes the remainder.
const
channelId = ChannelId("sm-multi-channel")
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
@ -213,38 +232,66 @@ suite "Reliable Channel - send state machine":
)
setNoopEncryption()
var msgReqIds: seq[RequestId]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.
async: (raises: [CatchableError]), gcsafe
.} =
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
msgReqIds.add(id)
return ok(id)
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
let chn = manager.getChannelForTest(channelId)
doAssert not chn.isNil()
let channelReqId = RequestId("multi-channel-req")
let msgReqId1 = RequestId("multi-msg-req-1")
let msgReqId2 = RequestId("multi-msg-req-2")
chn.forceInjectInFlightForTest(channelReqId, msgReqId1)
chn.forceInjectInFlightForTest(channelReqId, msgReqId2)
discard chn.send("first".toBytes()).expect("send 1")
discard chn.send("second".toBytes()).expect("send 2")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
await sleepAsync(5.milliseconds)
check msgReqIds.len == 2
check chn.pendingMessagingRequestsLenForTest == 2
waku_message_events.MessageSentEvent.emit(
brokerCtx,
waku_message_events.MessageSentEvent(requestId: msgReqId1, messageHash: ""),
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
)
await sleepAsync(50.milliseconds)
## Sibling msgReqId2 is still `InFlight`, so prune must NOT fire
## yet — both entries remain tracked.
check chn.pendingMessagingRequestsLenForTest == 2
let firstPruneDeadline = Moment.now() + 1.seconds
while Moment.now() < firstPruneDeadline and chn.pendingMessagingRequestsLenForTest > 1:
await sleepAsync(5.milliseconds)
## Only the first `channelReqId` is fully accounted for; the second
## one's segment is still `InFlight`, so exactly one entry remains.
check chn.pendingMessagingRequestsLenForTest == 1
waku_message_events.MessageErrorEvent.emit(
brokerCtx,
waku_message_events.MessageErrorEvent(
requestId: msgReqId2, messageHash: "", error: "synthetic"
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
),
)
let deadline = Moment.now() + 1.seconds
while Moment.now() < deadline and chn.pendingMessagingRequestsLenForTest > 0:
let pruneDeadline = Moment.now() + 1.seconds
while Moment.now() < pruneDeadline and chn.pendingMessagingRequestsLenForTest > 0:
await sleepAsync(5.milliseconds)
check chn.pendingMessagingRequestsLenForTest == 0
await manager.stop()
asyncTest "TODO: channelReqId not pruned until ALL its segments are final":
## Placeholder for the multi-sibling prune rule. Today's
## `performSegmentation` (segmentation skeleton) always emits
## exactly one segment per `send()`, so multiple siblings under one
## `channelReqId` cannot be produced through the real pipeline.
## Implement once segmentation does real chunking: send a payload
## larger than `DefaultSegmentSizeBytes`, capture the N
## `messagingReqId`s from a fake `SendHandler`, finalise some, and
## assert prune only fires once every sibling is final.
skip()