mirror of
https://github.com/logos-messaging/logos-delivery.git
synced 2026-05-30 02:29:53 +00:00
better pending segments management (#3914)
Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
This commit is contained in:
parent
bb23ee64af
commit
c5b24e21da
7
.github/workflows/version-check.yml
vendored
7
.github/workflows/version-check.yml
vendored
@ -28,8 +28,11 @@ jobs:
|
||||
set -euo pipefail
|
||||
NIMBLE_VERSION=$(grep -m1 '^version = ' waku.nimble | sed -E 's/version = "([^"]+)"/\1/')
|
||||
# Nearest tag reachable from HEAD; --abbrev=0 drops the -<n>-g<sha>
|
||||
# suffix so we get the bare tag (e.g. v0.38.0).
|
||||
BASE_TAG=$(git describe --tags --abbrev=0 2>/dev/null || echo "")
|
||||
# suffix so we get the bare tag (e.g. v0.38.0). `--match 'v*'` skips
|
||||
# the moving `nightly` tag (auto-updated by the daily CI to point at
|
||||
# master HEAD), which would otherwise be picked as the nearest tag
|
||||
# and break the version-sort comparison below.
|
||||
BASE_TAG=$(git describe --tags --abbrev=0 --match 'v*' 2>/dev/null || echo "")
|
||||
BASE_TAG=${BASE_TAG#v}
|
||||
# Compare on the base version, ignoring any -rc.N prerelease suffix.
|
||||
BASE_TAG=${BASE_TAG%%-*}
|
||||
|
||||
@ -21,3 +21,19 @@ EventBroker:
|
||||
channelId*: ChannelId
|
||||
senderId*: SdsParticipantID
|
||||
payload*: seq[byte]
|
||||
|
||||
EventBroker:
|
||||
## Emitted when every segment of a channel-level `send()` reached
|
||||
## `Confirmed`. Channel-level analogue of `MessageSentEvent`; the
|
||||
## `requestId` is the channel-layer parent returned by `send()`.
|
||||
type ChannelMessageSentEvent* = object
|
||||
channelId*: ChannelId
|
||||
requestId*: RequestId
|
||||
|
||||
EventBroker:
|
||||
## Emitted when a channel-level `send()` finalises with at least one
|
||||
## segment in `Failed`. Channel-level analogue of `MessageErrorEvent`.
|
||||
type ChannelMessageErrorEvent* = object
|
||||
channelId*: ChannelId
|
||||
requestId*: RequestId
|
||||
error*: string
|
||||
|
||||
@ -29,7 +29,7 @@ EventBroker:
|
||||
##
|
||||
## `channelId` lets listeners filter to their own channel, since all
|
||||
## reliable channels share the underlying Waku node's broker context.
|
||||
type ReadyToSendEvent* = object
|
||||
type ReadyToSendEvent* = ref object
|
||||
channelId*: SdsChannelID
|
||||
msgs*: seq[seq[byte]]
|
||||
|
||||
|
||||
@ -13,14 +13,15 @@
|
||||
##
|
||||
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
|
||||
|
||||
import std/[options, tables]
|
||||
import std/[options, sets, tables]
|
||||
import results
|
||||
import chronos
|
||||
import bearssl/rand
|
||||
import stew/byteutils
|
||||
import libp2p/crypto/crypto as libp2p_crypto
|
||||
|
||||
import waku/node/delivery_service/delivery_service
|
||||
import waku/api/api
|
||||
import waku/factory/waku as waku_factory
|
||||
import waku/node/delivery_service/send_service
|
||||
import waku/waku_core/topics
|
||||
|
||||
@ -31,8 +32,8 @@ import ./rate_limit_manager/rate_limit_manager
|
||||
import ./encryption/encryption
|
||||
|
||||
export
|
||||
delivery_service, send_service, events, segmentation, scalable_data_sync,
|
||||
rate_limit_manager, encryption
|
||||
api, waku_factory, events, segmentation, scalable_data_sync, rate_limit_manager,
|
||||
encryption
|
||||
|
||||
const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
|
||||
## Wire-format spec marker for the Reliable Channel layer, as defined
|
||||
@ -42,27 +43,64 @@ const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1"
|
||||
## The trailing `/N` is the wire-format version and is bumped only
|
||||
## on breaking on-the-wire changes; implementations pin one version.
|
||||
|
||||
type ReliableChannel* = ref object
|
||||
## Spec-defined public type. Fields are private so callers cannot
|
||||
## mutate internals and break invariants. Getters are added below
|
||||
## for the few values consumers may need.
|
||||
deliveryService: DeliveryService
|
||||
channelId: ChannelId
|
||||
contentTopic: ContentTopic
|
||||
senderId: SdsParticipantID
|
||||
rng: ref HmacDrbgContext
|
||||
segmentation: SegmentationHandler
|
||||
sdsHandler: SdsHandler
|
||||
rateLimit: RateLimitManager
|
||||
type
|
||||
SendHandler* = proc(envelope: MessageEnvelope): Future[Result[RequestId, string]] {.
|
||||
async: (raises: [CatchableError]), gcsafe
|
||||
.}
|
||||
## Egress dispatch boundary. Defaults to `waku.send`; tests inject a
|
||||
## fake that records calls and returns canned `RequestId`s so the
|
||||
## send state machine can be exercised end-to-end without a network.
|
||||
|
||||
requestIds: Table[RequestId, seq[RequestId]]
|
||||
pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]]
|
||||
brokerCtx: BrokerContext
|
||||
## Captured here so the channel emits `ChannelMessageReceivedEvent`
|
||||
## on the same broker context the owning manager registered its
|
||||
## listeners on. Without this, an emit via `globalBrokerContext()`
|
||||
## would land on whatever context happens to be thread-local at
|
||||
## emit time, which is not necessarily the manager's.
|
||||
MessagePersistence {.pure.} = enum
|
||||
Persistent
|
||||
Ephemeral
|
||||
|
||||
SegmentSendState {.pure.} = enum
|
||||
## Lifecycle of a single segment as tracked by the channel. The
|
||||
## messaging layer has its own richer `DeliveryState` (retries,
|
||||
## propagated-vs-validated); here we only model what's needed to
|
||||
## decide when a `channelReqId` is fully accounted for.
|
||||
AwaitingRateLimit ## Pushed by `send`; not yet released by rate_limit_manager.
|
||||
InFlight
|
||||
## Released by rate_limit_manager and handed to delivery_service;
|
||||
## `messagingReqId` is now set.
|
||||
Confirmed ## `MessageSentEvent` arrived for `messagingReqId`.
|
||||
Failed
|
||||
## `MessageErrorEvent` arrived for `messagingReqId`, or the local
|
||||
## delivery-task construction failed before any id was reachable.
|
||||
|
||||
PendingMessagingRequest = object
|
||||
## One entry per segment (i.e. per messaging-layer request). The
|
||||
## relative order of `AwaitingRateLimit` entries must match the
|
||||
## order in which `rate_limit_manager` re-emits messages, which is
|
||||
## FIFO with `send()`.
|
||||
channelReqId*: RequestId
|
||||
## The channel-layer parent id returned to the caller of `send()` in channel layer.
|
||||
## One channel request maps to N pending messaging requests.
|
||||
messagingReqId*: Option[RequestId]
|
||||
## Per-segment messaging layer id. `none` until `onReadyToSend` assigns it.
|
||||
persistenceReqType: MessagePersistence
|
||||
segmentSendState*: SegmentSendState
|
||||
|
||||
ReliableChannel* = ref object
|
||||
## Spec-defined public type. Fields are private so callers cannot
|
||||
## mutate internals and break invariants. Getters are added below
|
||||
## for the few values consumers may need.
|
||||
sendHandler: SendHandler
|
||||
channelId: ChannelId
|
||||
contentTopic: ContentTopic
|
||||
senderId: SdsParticipantID
|
||||
rng: ref HmacDrbgContext
|
||||
segmentation: SegmentationHandler
|
||||
sdsHandler: SdsHandler
|
||||
rateLimit: RateLimitManager
|
||||
|
||||
requestIds: Table[RequestId, seq[RequestId]]
|
||||
pendingMessagingRequests: seq[PendingMessagingRequest]
|
||||
## Entries are kept until the matching segment reaches a final
|
||||
## state (`Confirmed` or `Failed`); a whole channel request is
|
||||
## then pruned in one pass once all its segments are final.
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
func getChannelId*(self: ReliableChannel): ChannelId {.inline.} =
|
||||
self.channelId
|
||||
@ -73,19 +111,103 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
|
||||
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
|
||||
self.senderId
|
||||
|
||||
func isFinal(state: SegmentSendState): bool {.inline.} =
|
||||
return state in {SegmentSendState.Confirmed, SegmentSendState.Failed}
|
||||
|
||||
proc pruneCompletedChannelReqs(self: ReliableChannel) =
|
||||
## Drop every `pendingMessagingRequests` entry whose `channelReqId`
|
||||
## has all of its segments in a final state. A single failing
|
||||
## segment doesn't trigger a drop on its own — we wait until siblings
|
||||
## are also accounted for, so the channel-level outcome is decided
|
||||
## from a complete picture. For each fully-final `channelReqId`, emit
|
||||
## the channel-level final event before the entries are dropped:
|
||||
## `ChannelMessageSentEvent` if every sibling Confirmed,
|
||||
## `ChannelMessageErrorEvent` if any sibling Failed.
|
||||
var hasPending = initHashSet[RequestId]()
|
||||
var anyFailed = initHashSet[RequestId]()
|
||||
for entry in self.pendingMessagingRequests:
|
||||
if not entry.segmentSendState.isFinal():
|
||||
hasPending.incl(entry.channelReqId)
|
||||
elif entry.segmentSendState == SegmentSendState.Failed:
|
||||
anyFailed.incl(entry.channelReqId)
|
||||
|
||||
var emitted = initHashSet[RequestId]()
|
||||
for entry in self.pendingMessagingRequests:
|
||||
if entry.channelReqId in hasPending or entry.channelReqId in emitted:
|
||||
continue
|
||||
emitted.incl(entry.channelReqId)
|
||||
if entry.channelReqId in anyFailed:
|
||||
ChannelMessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageErrorEvent(
|
||||
channelId: self.channelId,
|
||||
requestId: entry.channelReqId,
|
||||
error: "one or more segments failed",
|
||||
),
|
||||
)
|
||||
else:
|
||||
ChannelMessageSentEvent.emit(
|
||||
self.brokerCtx,
|
||||
ChannelMessageSentEvent(
|
||||
channelId: self.channelId, requestId: entry.channelReqId
|
||||
),
|
||||
)
|
||||
|
||||
self.pendingMessagingRequests.keepItIf(it.channelReqId in hasPending)
|
||||
|
||||
proc onMessageSent(self: ReliableChannel, messagingReqId: RequestId) =
|
||||
## Invoked from this channel's `MessageSentEvent` listener. Flips
|
||||
## the matching `InFlight` segment to `Confirmed` and prunes. The
|
||||
## listener routes every event through here; entries that don't
|
||||
## belong to this channel simply don't match and are no-ops.
|
||||
self.pendingMessagingRequests.applyItIf(
|
||||
it.segmentSendState == SegmentSendState.InFlight and
|
||||
it.messagingReqId == some(messagingReqId)
|
||||
):
|
||||
it.segmentSendState = SegmentSendState.Confirmed
|
||||
self.pruneCompletedChannelReqs()
|
||||
|
||||
proc onMessageError(self: ReliableChannel, messagingReqId: RequestId) =
|
||||
## Symmetric to `onMessageSent` but for `MessageErrorEvent`.
|
||||
self.pendingMessagingRequests.applyItIf(
|
||||
it.segmentSendState == SegmentSendState.InFlight and
|
||||
it.messagingReqId == some(messagingReqId)
|
||||
):
|
||||
it.segmentSendState = SegmentSendState.Failed
|
||||
self.pruneCompletedChannelReqs()
|
||||
|
||||
proc onReadyToSend(
|
||||
self: ReliableChannel, msgs: seq[seq[byte]]
|
||||
self: ReliableChannel, readyToSendEvent: ReadyToSendEvent
|
||||
) {.async: (raises: []).} =
|
||||
## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent`
|
||||
## listener once `rate_limit_manager` releases a batch of opaque
|
||||
## blobs (already-encoded SDS messages):
|
||||
##
|
||||
## ... -> rate_limit_manager -> [encryption] -> dispatch
|
||||
for m in msgs:
|
||||
## Each `m` was preceded by exactly one push onto `pendingRequests`
|
||||
## in `send`, so this pop is always safe in the current skeleton.
|
||||
let pending = self.pendingRequests[0]
|
||||
self.pendingRequests.delete(0)
|
||||
var idx = 0
|
||||
for m in readyToSendEvent.msgs:
|
||||
## The first `AwaitingRateLimit` entry in push order is the one
|
||||
## this `m` belongs to: `send()` adds one entry per segment, and
|
||||
## `rate_limit_manager` re-emits them in the same FIFO order, so
|
||||
## the two sequences advance in lockstep. Earlier entries may
|
||||
## already be `InFlight` / `Confirmed` / `Failed` because they
|
||||
## live on until every sibling of their `channelReqId` is final,
|
||||
## so we walk past those to find the next one that was awaiting for this batch.
|
||||
while idx < self.pendingMessagingRequests.len and
|
||||
self.pendingMessagingRequests[idx].segmentSendState !=
|
||||
SegmentSendState.AwaitingRateLimit
|
||||
:
|
||||
idx.inc()
|
||||
if idx >= self.pendingMessagingRequests.len:
|
||||
## rate_limit_manager emitted more messages than we have pending —
|
||||
## should not happen given `send` pushes one entry per enqueued
|
||||
## SDS payload. Drop silently rather than corrupt state.
|
||||
break
|
||||
|
||||
let channelReqId = self.pendingMessagingRequests[idx].channelReqId
|
||||
let isEphemeral =
|
||||
self.pendingMessagingRequests[idx].persistenceReqType ==
|
||||
MessagePersistence.Ephemeral
|
||||
|
||||
## TODO: revisit which fields of the SDS message must be encrypted.
|
||||
## Encrypting the whole encoded blob forces every receiver to attempt
|
||||
@ -97,32 +219,58 @@ proc onReadyToSend(
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: pending.parent,
|
||||
messageHash: "",
|
||||
error: "encryption failed: " & error,
|
||||
requestId: channelReqId, messageHash: "", error: "encryption failed: " & error
|
||||
),
|
||||
)
|
||||
## Encryption failed *before* we could hand the segment to the
|
||||
## delivery layer — no `messagingReqId` was minted and no
|
||||
## `DeliveryTask` was queued on `sendService`. The delivery
|
||||
## layer will therefore never emit a `MessageSentEvent` /
|
||||
## `MessageErrorEvent` for this segment, so `onMessageError`
|
||||
## won't fire either. Advance the state machine inline so the
|
||||
## parent `channelReqId` can still be pruned once its siblings
|
||||
## are also final.
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||
idx.inc()
|
||||
continue
|
||||
let wireBytes = seq[byte](encrypted)
|
||||
|
||||
## The `meta` field carries the Reliable Channel wire-format spec
|
||||
## marker so the ingress side of any peer can route this WakuMessage
|
||||
## to its Reliable Channel layer.
|
||||
let envelope = MessageEnvelope(
|
||||
contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral
|
||||
contentTopic: self.contentTopic,
|
||||
payload: wireBytes,
|
||||
ephemeral: isEphemeral,
|
||||
meta: LipWireReliableChannelVersion.toBytes(),
|
||||
)
|
||||
|
||||
let deliveryReqId = RequestId.new(self.rng)
|
||||
let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr:
|
||||
## TODO: emit waku `MessageErrorEvent` for the parent request id.
|
||||
## `waku.send` is not annotated `(raises: [])`, but this listener is.
|
||||
## Convert any raise to a Result error so the state machine handles
|
||||
## both failure modes (Result.err and exception) through one path.
|
||||
let sendRes =
|
||||
try:
|
||||
await self.sendHandler(envelope)
|
||||
except CatchableError as e:
|
||||
Result[RequestId, string].err("waku send raised: " & e.msg)
|
||||
|
||||
let messagingReqId = sendRes.valueOr:
|
||||
MessageErrorEvent.emit(
|
||||
self.brokerCtx,
|
||||
MessageErrorEvent(
|
||||
requestId: channelReqId, messageHash: "", error: "waku send failed: " & error
|
||||
),
|
||||
)
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.Failed
|
||||
idx.inc()
|
||||
continue
|
||||
|
||||
## Stamp the Reliable Channel wire-format spec marker so the ingress
|
||||
## side of any peer can route this WakuMessage to its Reliable
|
||||
## Channel layer. Done on the constructed WakuMessage rather than
|
||||
## via the envelope because `MessageEnvelope` does not expose a
|
||||
## `meta` field.
|
||||
deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes()
|
||||
self.pendingMessagingRequests[idx].messagingReqId = some(messagingReqId)
|
||||
self.pendingMessagingRequests[idx].segmentSendState = SegmentSendState.InFlight
|
||||
self.requestIds.mgetOrPut(channelReqId, @[]).add(messagingReqId)
|
||||
idx.inc()
|
||||
|
||||
asyncSpawn self.deliveryService.sendService.send(deliveryTask)
|
||||
self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId)
|
||||
self.pruneCompletedChannelReqs()
|
||||
|
||||
proc send*(
|
||||
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
|
||||
@ -135,18 +283,22 @@ proc send*(
|
||||
##
|
||||
## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with
|
||||
## the SDS messages cleared for transmission; the channel's listener
|
||||
## then runs the final stage (encryption -> dispatch). The `ephemeral`
|
||||
## flag is carried alongside each segment in `pendingRequests` and
|
||||
## stamped onto the eventual `MessageEnvelope`.
|
||||
## then runs the final stage (encryption -> dispatch). The
|
||||
## `persistenceReqType` is carried alongside each segment in
|
||||
## `pendingMessagingRequests` and stamped onto the eventual
|
||||
## `MessageEnvelope`.
|
||||
##
|
||||
## The returned `RequestId` is the parent of one-or-more
|
||||
## delivery-service `RequestId`s; the mapping is recorded in
|
||||
## The returned `RequestId` is the channel-level parent of one-or-more
|
||||
## messaging-layer `RequestId`s; the mapping is recorded in
|
||||
## `self.requestIds`.
|
||||
if payload.len == 0:
|
||||
return err("empty payload")
|
||||
|
||||
let parentReqId = RequestId.new(self.rng)
|
||||
self.requestIds[parentReqId] = @[]
|
||||
let channelReqId = RequestId.new(self.rng)
|
||||
self.requestIds[channelReqId] = @[]
|
||||
|
||||
let persistenceReqType =
|
||||
if ephemeral: MessagePersistence.Ephemeral else: MessagePersistence.Persistent
|
||||
|
||||
for segmentBytes in self.segmentation.performSegmentation(payload):
|
||||
## Segments arrive already encoded; the segmentation module owns
|
||||
@ -155,10 +307,17 @@ proc send*(
|
||||
self.channelId, self.senderId, segmentBytes
|
||||
).valueOr:
|
||||
return err("SDS wrap failed: " & error)
|
||||
self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral))
|
||||
self.pendingMessagingRequests.add(
|
||||
PendingMessagingRequest(
|
||||
channelReqId: channelReqId,
|
||||
messagingReqId: none(RequestId),
|
||||
persistenceReqType: persistenceReqType,
|
||||
segmentSendState: SegmentSendState.AwaitingRateLimit,
|
||||
)
|
||||
)
|
||||
self.rateLimit.enqueueToSend(sdsBytes)
|
||||
|
||||
return ok(parentReqId)
|
||||
return ok(channelReqId)
|
||||
|
||||
proc onMessageReceived(
|
||||
self: ReliableChannel, messageHash: string, payload: seq[byte]
|
||||
@ -206,7 +365,7 @@ proc onMessageReceived(
|
||||
|
||||
proc new*(
|
||||
T: type ReliableChannel,
|
||||
deliveryService: DeliveryService,
|
||||
waku: Waku,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
@ -214,6 +373,7 @@ proc new*(
|
||||
sdsConfig: SdsConfig,
|
||||
rateConfig: RateLimitConfig,
|
||||
brokerCtx: BrokerContext = globalBrokerContext(),
|
||||
sendHandler: SendHandler = nil,
|
||||
): T =
|
||||
## Pipeline handlers (segmentation/SDS/rate-limit) are constructed
|
||||
## inside the channel rather than handed in by the caller — they are
|
||||
@ -221,8 +381,20 @@ proc new*(
|
||||
## should be wiring up. Encryption is delegated to the `Encrypt`/
|
||||
## `Decrypt` request brokers, so the channel keeps no per-instance
|
||||
## encryption state either.
|
||||
##
|
||||
## `sendHandler` defaults to `waku.send`; tests pass a fake to drive
|
||||
## the send state machine without touching the network.
|
||||
let resolvedSendHandler =
|
||||
if sendHandler.isNil():
|
||||
proc(
|
||||
envelope: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||
return await waku.send(envelope)
|
||||
else:
|
||||
sendHandler
|
||||
|
||||
let chn = T(
|
||||
deliveryService: deliveryService,
|
||||
sendHandler: resolvedSendHandler,
|
||||
channelId: channelId,
|
||||
contentTopic: contentTopic,
|
||||
senderId: senderId,
|
||||
@ -231,20 +403,21 @@ proc new*(
|
||||
sdsHandler: SdsHandler.new(sdsConfig, senderId),
|
||||
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
|
||||
requestIds: initTable[RequestId, seq[RequestId]](),
|
||||
pendingRequests: @[],
|
||||
pendingMessagingRequests: @[],
|
||||
brokerCtx: brokerCtx,
|
||||
)
|
||||
|
||||
## Each channel owns its own egress + ingress listeners on
|
||||
## `chn.brokerCtx`, filtered to traffic addressed to this channel.
|
||||
## Keeping the listeners (and the procs they call) inside the
|
||||
## channel lets `onReadyToSend` and `onMessageReceived` stay private
|
||||
## — the manager doesn't need to know about them.
|
||||
## Each channel owns its own egress + ingress + send-completion
|
||||
## listeners on `chn.brokerCtx`, filtered to traffic addressed to
|
||||
## this channel. Keeping the listeners (and the handler procs they
|
||||
## call) inside the channel lets `onReadyToSend` /
|
||||
## `onMessageReceived` / `onMessageSent` / `onMessageError` stay
|
||||
## private — the manager doesn't need to know about them.
|
||||
discard ReadyToSendEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} =
|
||||
if evt.channelId == chn.channelId:
|
||||
await chn.onReadyToSend(evt.msgs)
|
||||
await chn.onReadyToSend(evt)
|
||||
,
|
||||
)
|
||||
|
||||
@ -261,4 +434,20 @@ proc new*(
|
||||
,
|
||||
)
|
||||
|
||||
## Send-completion events are tagged with the per-segment messaging
|
||||
## `requestId` — globally unique, so we don't need any channel filter
|
||||
## up front. The handler scans this channel's pending entries for a
|
||||
## match and is a no-op when the id belongs to a different channel.
|
||||
discard MessageSentEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageSentEvent): Future[void] {.async: (raises: []).} =
|
||||
chn.onMessageSent(evt.requestId),
|
||||
)
|
||||
|
||||
discard MessageErrorEvent.listen(
|
||||
chn.brokerCtx,
|
||||
proc(evt: MessageErrorEvent): Future[void] {.async: (raises: []).} =
|
||||
chn.onMessageError(evt.requestId),
|
||||
)
|
||||
|
||||
return chn
|
||||
|
||||
@ -14,6 +14,7 @@ import waku/api/api
|
||||
import waku/api/api_conf
|
||||
import waku/events/message_events as waku_message_events
|
||||
import waku/factory/waku as waku_factory
|
||||
import waku/node/delivery_service/delivery_service
|
||||
import waku/waku_core/topics
|
||||
|
||||
import ./reliable_channel
|
||||
@ -23,11 +24,10 @@ export reliable_channel
|
||||
|
||||
type ReliableChannelManager* = ref object
|
||||
channels: Table[ChannelId, ReliableChannel]
|
||||
deliveryService: DeliveryService
|
||||
## Owned by the manager. The ownership chain is
|
||||
## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode.
|
||||
## Hidden so callers can't substitute their own and bypass the
|
||||
## manager's pipeline.
|
||||
waku: Waku
|
||||
## Owned by the manager. The channel layer reaches the messaging
|
||||
## API through `waku.send(envelope)`; constructing DeliveryTasks
|
||||
## directly would breach the layer boundary.
|
||||
brokerCtx: BrokerContext
|
||||
|
||||
proc new*(
|
||||
@ -38,15 +38,13 @@ proc new*(
|
||||
## TODO !! The proper ownership chain is:
|
||||
## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode,
|
||||
## and this will be implemented in the future. For now, `createNode`
|
||||
## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded.
|
||||
## is called here to get a Waku instance, and the WakuNode is immediately discarded.
|
||||
## This is a temporary workaround to get the API
|
||||
|
||||
let waku = ?(await createNode(conf))
|
||||
|
||||
let manager = T(
|
||||
channels: initTable[ChannelId, ReliableChannel](),
|
||||
deliveryService: waku.deliveryService,
|
||||
brokerCtx: brokerCtx,
|
||||
channels: initTable[ChannelId, ReliableChannel](), waku: waku, brokerCtx: brokerCtx
|
||||
)
|
||||
|
||||
return ok(manager)
|
||||
@ -55,17 +53,18 @@ proc start*(self: ReliableChannelManager): Result[void, string] =
|
||||
## Bring the owned DeliveryService up. Separated from `new` so callers
|
||||
## can register encryption providers / create channels before traffic
|
||||
## starts flowing.
|
||||
self.deliveryService.startDeliveryService()
|
||||
self.waku.deliveryService.startDeliveryService()
|
||||
|
||||
proc stop*(self: ReliableChannelManager) {.async.} =
|
||||
if not self.deliveryService.isNil():
|
||||
await self.deliveryService.stopDeliveryService()
|
||||
if not self.waku.isNil():
|
||||
await self.waku.deliveryService.stopDeliveryService()
|
||||
|
||||
proc createReliableChannel*(
|
||||
self: ReliableChannelManager,
|
||||
channelId: ChannelId,
|
||||
contentTopic: ContentTopic,
|
||||
senderId: SdsParticipantID,
|
||||
sendHandler: SendHandler = nil,
|
||||
): Result[ChannelId, string] =
|
||||
## Spec entry point. The `DeliveryService` and `rng` the channel needs
|
||||
## are sourced from the owning `ReliableChannelManager` rather than
|
||||
@ -75,6 +74,9 @@ proc createReliableChannel*(
|
||||
##
|
||||
## Segmentation, SDS and rate-limit configs will eventually be read
|
||||
## from the node's `NodeConfig`. Defaults for now.
|
||||
##
|
||||
## `sendHandler` is left `nil` in production so the channel uses the
|
||||
## owned `waku.send`; tests pass a fake to bypass the network.
|
||||
if self.channels.hasKey(channelId):
|
||||
return err("channel already exists: " & channelId)
|
||||
|
||||
@ -94,7 +96,7 @@ proc createReliableChannel*(
|
||||
)
|
||||
|
||||
let chn = ReliableChannel.new(
|
||||
deliveryService = self.deliveryService,
|
||||
waku = self.waku,
|
||||
channelId = channelId,
|
||||
contentTopic = contentTopic,
|
||||
senderId = senderId,
|
||||
@ -102,6 +104,7 @@ proc createReliableChannel*(
|
||||
sdsConfig = sdsConfig,
|
||||
rateConfig = rateConfig,
|
||||
brokerCtx = self.brokerCtx,
|
||||
sendHandler = sendHandler,
|
||||
)
|
||||
|
||||
self.channels[channelId] = chn
|
||||
|
||||
@ -147,3 +147,171 @@ suite "Reliable Channel - ingress":
|
||||
check not fired
|
||||
|
||||
await manager.stop()
|
||||
|
||||
suite "Reliable Channel - send state machine":
|
||||
asyncTest "MessageSentEvent finalises the channelReqId as Sent":
|
||||
## Drives the real send pipeline (`send` -> segmentation -> SDS ->
|
||||
## rate_limit -> encrypt -> dispatch) via a fake `SendHandler` that
|
||||
## returns a canned `RequestId` instead of hitting the network.
|
||||
## Emitting the delivery-layer `MessageSentEvent` must drive the
|
||||
## channel-level state machine through `Confirmed` and produce a
|
||||
## `ChannelMessageSentEvent` (channel-level terminal event) for the
|
||||
## `channelReqId` returned by `send()`.
|
||||
const
|
||||
channelId = ChannelId("sm-success-channel")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/sm-success")
|
||||
fakeMsgReqId = RequestId("fake-msg-req-1")
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
setNoopEncryption()
|
||||
|
||||
var sendCalls = 0
|
||||
let fakeSend: SendHandler = proc(
|
||||
env: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||
sendCalls.inc
|
||||
return ok(fakeMsgReqId)
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(
|
||||
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
|
||||
)
|
||||
.expect("createReliableChannel")
|
||||
|
||||
let sentFut = newFuture[RequestId]("channel-sent")
|
||||
discard ChannelMessageSentEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||
if not sentFut.finished() and evt.channelId == channelId:
|
||||
sentFut.complete(evt.requestId)
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageSentEvent")
|
||||
|
||||
let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send")
|
||||
|
||||
let dispatchDeadline = Moment.now() + 1.seconds
|
||||
while Moment.now() < dispatchDeadline and sendCalls == 0:
|
||||
await sleepAsync(5.milliseconds)
|
||||
check sendCalls == 1
|
||||
|
||||
waku_message_events.MessageSentEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageSentEvent(requestId: fakeMsgReqId, messageHash: ""),
|
||||
)
|
||||
|
||||
let finalised = await sentFut.withTimeout(1.seconds)
|
||||
check finalised
|
||||
if finalised:
|
||||
check sentFut.read() == channelReqId
|
||||
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "two independent channelReqIds are finalised independently":
|
||||
## Two `send()` calls -> two independent `channelReqId`s, each with
|
||||
## one segment under the current segmentation skeleton
|
||||
## (`performSegmentation` always emits exactly one segment). The
|
||||
## fake `SendHandler` returns distinct `messagingReqId`s; finalising
|
||||
## the first emits `ChannelMessageSentEvent` for its `channelReqId`,
|
||||
## finalising the second as a failure emits `ChannelMessageErrorEvent`
|
||||
## for the other.
|
||||
const
|
||||
channelId = ChannelId("sm-multi-channel")
|
||||
contentTopic = ContentTopic("/reliable-channel/test/sm-multi")
|
||||
|
||||
var manager: ReliableChannelManager
|
||||
var brokerCtx: BrokerContext
|
||||
lockNewGlobalBrokerContext:
|
||||
brokerCtx = globalBrokerContext()
|
||||
manager = (await ReliableChannelManager.new(createApiNodeConf())).expect(
|
||||
"Failed to create manager"
|
||||
)
|
||||
|
||||
setNoopEncryption()
|
||||
|
||||
var msgReqIds: seq[RequestId]
|
||||
let fakeSend: SendHandler = proc(
|
||||
env: MessageEnvelope
|
||||
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
|
||||
let id = RequestId("fake-msg-req-" & $(msgReqIds.len + 1))
|
||||
msgReqIds.add(id)
|
||||
return ok(id)
|
||||
|
||||
discard manager
|
||||
.createReliableChannel(
|
||||
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
|
||||
)
|
||||
.expect("createReliableChannel")
|
||||
|
||||
let sentFut = newFuture[RequestId]("channel-sent")
|
||||
let erroredFut = newFuture[RequestId]("channel-errored")
|
||||
discard ChannelMessageSentEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageSentEvent) {.async: (raises: []).} =
|
||||
if not sentFut.finished() and evt.channelId == channelId:
|
||||
sentFut.complete(evt.requestId)
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageSentEvent")
|
||||
discard ChannelMessageErrorEvent
|
||||
.listen(
|
||||
brokerCtx,
|
||||
proc(evt: ChannelMessageErrorEvent) {.async: (raises: []).} =
|
||||
if not erroredFut.finished() and evt.channelId == channelId:
|
||||
erroredFut.complete(evt.requestId)
|
||||
,
|
||||
)
|
||||
.expect("listen ChannelMessageErrorEvent")
|
||||
|
||||
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
|
||||
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
|
||||
|
||||
let dispatchDeadline = Moment.now() + 1.seconds
|
||||
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
|
||||
await sleepAsync(5.milliseconds)
|
||||
check msgReqIds.len == 2
|
||||
|
||||
waku_message_events.MessageSentEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageSentEvent(requestId: msgReqIds[0], messageHash: ""),
|
||||
)
|
||||
let sentArrived = await sentFut.withTimeout(1.seconds)
|
||||
check sentArrived
|
||||
if sentArrived:
|
||||
check sentFut.read() == channelReqId1
|
||||
## The second `channelReqId` must NOT have finalised yet — its
|
||||
## segment is still `InFlight`.
|
||||
check not erroredFut.finished()
|
||||
|
||||
waku_message_events.MessageErrorEvent.emit(
|
||||
brokerCtx,
|
||||
waku_message_events.MessageErrorEvent(
|
||||
requestId: msgReqIds[1], messageHash: "", error: "synthetic"
|
||||
),
|
||||
)
|
||||
let erroredArrived = await erroredFut.withTimeout(1.seconds)
|
||||
check erroredArrived
|
||||
if erroredArrived:
|
||||
check erroredFut.read() == channelReqId2
|
||||
|
||||
await manager.stop()
|
||||
|
||||
asyncTest "TODO: channelReqId not pruned until ALL its segments are final":
|
||||
## Placeholder for the multi-sibling prune rule. Today's
|
||||
## `performSegmentation` (segmentation skeleton) always emits
|
||||
## exactly one segment per `send()`, so multiple siblings under one
|
||||
## `channelReqId` cannot be produced through the real pipeline.
|
||||
## Implement once segmentation does real chunking: send a payload
|
||||
## larger than `DefaultSegmentSizeBytes`, capture the N
|
||||
## `messagingReqId`s from a fake `SendHandler`, finalise some, and
|
||||
## assert prune only fires once every sibling is final.
|
||||
skip()
|
||||
|
||||
@ -11,6 +11,10 @@ type
|
||||
contentTopic*: ContentTopic
|
||||
payload*: seq[byte]
|
||||
ephemeral*: bool
|
||||
meta*: seq[byte]
|
||||
## Opaque wire-format marker carried on the underlying WakuMessage.
|
||||
## Higher layers (e.g. Reliable Channel) stamp this so peers can route
|
||||
## ingress traffic to their corresponding layer. Empty by default.
|
||||
|
||||
RequestId* = distinct string
|
||||
|
||||
@ -34,12 +38,18 @@ proc init*(
|
||||
contentTopic: ContentTopic,
|
||||
payload: seq[byte] | string,
|
||||
ephemeral: bool = false,
|
||||
meta: seq[byte] = @[],
|
||||
): MessageEnvelope =
|
||||
when payload is seq[byte]:
|
||||
MessageEnvelope(contentTopic: contentTopic, payload: payload, ephemeral: ephemeral)
|
||||
MessageEnvelope(
|
||||
contentTopic: contentTopic, payload: payload, ephemeral: ephemeral, meta: meta
|
||||
)
|
||||
else:
|
||||
MessageEnvelope(
|
||||
contentTopic: contentTopic, payload: payload.toBytes(), ephemeral: ephemeral
|
||||
contentTopic: contentTopic,
|
||||
payload: payload.toBytes(),
|
||||
ephemeral: ephemeral,
|
||||
meta: meta,
|
||||
)
|
||||
|
||||
proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
|
||||
@ -48,6 +58,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
|
||||
contentTopic: envelope.contentTopic,
|
||||
payload: envelope.payload,
|
||||
ephemeral: envelope.ephemeral,
|
||||
meta: envelope.meta,
|
||||
timestamp: getNowInNanosecondTime(),
|
||||
)
|
||||
|
||||
|
||||
@ -26,7 +26,7 @@ logScope:
|
||||
# This useful util is missing from sequtils, this extends applyIt with predicate...
|
||||
template applyItIf*(varSeq, pred, op: untyped) =
|
||||
for i in low(varSeq) .. high(varSeq):
|
||||
let it {.inject.} = varSeq[i]
|
||||
var it {.inject.} = varSeq[i]
|
||||
if pred:
|
||||
op
|
||||
varSeq[i] = it
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user