feat: wire ReliabilityManager into the Reliable Channel (#3942)

This commit is contained in:
Darshan 2026-06-16 10:40:59 +05:30 committed by GitHub
parent 6837ae0c1f
commit 7e98489a24
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
9 changed files with 1076 additions and 106 deletions

View File

@ -116,6 +116,7 @@ if defined(android):
switch("passC", "--sysroot=" & sysRoot)
switch("passL", "--sysroot=" & sysRoot)
switch("cincludes", sysRoot & "/usr/include/")
# begin Nimble config (version 2)
--noNimblePath
when withDir(thisDir(), system.fileExists("nimble.paths")):

View File

@ -63,7 +63,7 @@ requires "nim >= 2.2.4",
requires "https://github.com/logos-messaging/nim-ffi#v0.1.3"
requires "https://github.com/logos-messaging/nim-sds.git#abdd40cc645f1b024c3ee99cced7e287c4e4c441"
requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5"
requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1"

View File

@ -125,6 +125,10 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} =
func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} =
self.senderId
proc stop*(self: ReliableChannel) {.async: (raises: []).} =
## Stops the SDS background loops. Persisted SDS state survives.
await self.sdsHandler.stop()
proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) =
## Tries to finalize the channel-level request identified by `channelReqId` if
## certain conditions are met, i.e., no segments are still awaiting dispatch or in flight,
@ -290,7 +294,7 @@ proc onReadyToSend(
proc send*(
self: ReliableChannel, payload: seq[byte], ephemeral: bool = false
): Result[RequestId, string] =
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Single application-level send. The first three stages of the
## outgoing pipeline are chained explicitly so the flow is visible
## at a glance:
@ -316,9 +320,7 @@ proc send*(
for segmentBytes in self.segmentation.performSegmentation(payload):
## Segments arrive already encoded; the segmentation module owns
## the wire format so SDS only ever sees opaque bytes.
let sdsBytes = self.sdsHandler.wrapOutgoing(
self.channelId, self.senderId, segmentBytes
).valueOr:
let sdsBytes = (await self.sdsHandler.wrapOutgoing(segmentBytes)).valueOr:
return err("SDS wrap failed: " & error)
enqueued.add(sdsBytes)
segmentCount.inc()
@ -331,6 +333,48 @@ proc send*(
return ok(channelReqId)
proc reportReceived(self: ReliableChannel, content: seq[byte]) =
## Tail of the ingress pipeline (reassemble -> emit).
let reassembled = self.segmentation.handleIncomingSegment(content)
if reassembled.isSome():
## Emit on the captured `brokerCtx` (the manager's), so the
## application listener that the manager has set up on that same
## context picks the event up.
ChannelMessageReceivedEvent.emit(
self.brokerCtx,
ChannelMessageReceivedEvent(
channelId: self.channelId,
senderId: self.senderId,
payload: reassembled.get().payload,
),
)
proc dispatchRepair(self: ReliableChannel, wire: seq[byte]) {.async: (raises: []).} =
## Repair rebroadcasts skip the rate-limit queue — its emissions are
## claimed FIFO by pending sends. Pacing is done by SDS itself.
let encRes = await Encrypt.request(wire)
let encrypted = encRes.valueOr:
debug "SDS repair rebroadcast dropped: encryption failed",
channelId = self.channelId, error = error
return
## Ephemeral: the original message is already store-persisted.
let envelope = MessageEnvelope(
contentTopic: self.contentTopic,
payload: seq[byte](encrypted),
ephemeral: true,
meta: LipWireReliableChannelVersion.toBytes(),
)
let sendRes =
try:
await self.sendHandler(envelope)
except CatchableError as e:
Result[RequestId, string].err("messaging send raised: " & e.msg)
if sendRes.isErr():
debug "SDS repair rebroadcast dropped: dispatch failed",
channelId = self.channelId, error = sendRes.error
proc onMessageReceived(
self: ReliableChannel, messageHash: string, payload: seq[byte]
) {.async: (raises: []).} =
@ -357,23 +401,13 @@ proc onMessageReceived(
return
let plaintextBytes = seq[byte](plaintext)
let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes)
if unwrapped.isErr():
## SDS returns every payload deliverable now, in causal order — the
## message itself plus any parked segments it released. Empty = consumed
## by SDS; `err` = not a decodable SDS envelope. Both drop here.
let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr:
return
let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content)
if reassembled.isSome():
## Emit on the captured `brokerCtx` (the manager's), so the
## application listener that the manager has set up on that same
## context picks the event up.
ChannelMessageReceivedEvent.emit(
self.brokerCtx,
ChannelMessageReceivedEvent(
channelId: self.channelId,
senderId: self.senderId,
payload: reassembled.get().payload,
),
)
for content in deliverable:
self.reportReceived(content)
proc new*(
T: type ReliableChannel,
@ -403,12 +437,17 @@ proc new*(
senderId: senderId,
rng: libp2p_crypto.newRng(),
segmentation: SegmentationHandler.new(segConfig),
sdsHandler: SdsHandler.new(sdsConfig, senderId),
sdsHandler: SdsHandler.new(sdsConfig, channelId, senderId),
rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx),
channelReqs: initOrderedTable[RequestId, ChannelReqState](),
brokerCtx: brokerCtx,
)
## SDS-R repair rebroadcasts go straight to the dispatch tail.
chn.sdsHandler.onRebroadcast = proc(wire: seq[byte]) {.gcsafe, raises: [].} =
asyncSpawn chn.dispatchRepair(wire)
chn.sdsHandler.start()
## Each channel owns its own egress + ingress + send-completion
## listeners on `chn.brokerCtx`, filtered to traffic addressed to
## this channel. Keeping the listeners (and the handler procs they

View File

@ -5,9 +5,10 @@
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import std/tables
import std/[options, tables]
import results
import chronos
import chronicles
import stew/byteutils
import brokers/broker_context
@ -15,12 +16,17 @@ import brokers/broker_context
import logos_delivery/waku/events/message_events as waku_message_events
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku_core/topics
import logos_delivery/waku/persistency/sds_persistency
import ./reliable_channel
import ./encryption/noop_encryption
export reliable_channel
const SdsJobId = "sds"
## One persistency job shared by every channel's SDS state; rows are
## keyed by channelId.
type ReliableChannelManager* = ref object
channels: Table[ChannelId, ReliableChannel]
messagingClient: MessagingClient ## Borrowed from the owning `Waku`.
@ -57,8 +63,22 @@ proc start*(self: ReliableChannelManager): Result[void, string] =
ok()
proc stop*(self: ReliableChannelManager) {.async.} =
## Placeholder mirror of `start`.
discard
## Stops every channel's SDS background loops. Persisted state survives.
for chn in self.channels.values:
await chn.stop()
self.channels.clear()
proc sdsPersistence(): Option[Persistence] =
## SDS backend from the Persistency singleton; memory-only fallback when
## it is unavailable (e.g. unit tests).
let p = Persistency.instance().valueOr:
info "SDS persistence disabled, running memory-only", reason = $error
return none(Persistence)
let job = p.openJob(SdsJobId).valueOr:
warn "SDS persistence disabled, could not open persistency job",
jobId = SdsJobId, reason = $error
return none(Persistence)
return some(newSdsPersistence(job))
proc createReliableChannel*(
self: ReliableChannelManager,
@ -90,7 +110,7 @@ proc createReliableChannel*(
acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs,
maxRetransmissions: DefaultMaxRetransmissions,
causalHistorySize: DefaultCausalHistorySize,
persistence: nil,
persistence: sdsPersistence(),
)
let rateConfig = RateLimitConfig(
epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch
@ -114,11 +134,14 @@ proc createReliableChannel*(
proc closeChannel*(
self: ReliableChannelManager, channelId: ChannelId
): Result[void, string] =
## Flush state, persist outstanding SDS buffers, release resources.
if not self.channels.hasKey(channelId):
): Future[Result[void, string]] {.async: (raises: []).} =
## Stops the channel's SDS loops and releases the channel. Persisted SDS
## state survives, so re-creating the channel restores it.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
self.channels.del(channelId)
await chn.stop()
return ok()
proc send*(
@ -126,14 +149,14 @@ proc send*(
channelId: ChannelId,
appPayload: seq[byte],
ephemeral: bool = false,
): Result[RequestId, string] =
): Future[Result[RequestId, string]] {.async: (raises: []).} =
## Spec-level entry point. Looks the channel up by id and delegates
## to `ReliableChannel.send`, which exposes the visible pipeline
## segmentation -> sds -> rate_limit_manager -> encryption.
let chn = self.channels.getOrDefault(channelId)
if chn.isNil():
return err("unknown channel: " & channelId)
return chn.send(appPayload, ephemeral)
return await chn.send(appPayload, ephemeral)
## Inbound messages are not handed to the manager by direct call. Each
## `ReliableChannel` installs its own `MessageReceivedEvent` listener

View File

@ -1,62 +1,216 @@
## Scalable Data Sync (SDS) component for the Reliable Channel API.
##
## Provides end-to-end delivery guarantees via causal history tracking,
## acknowledgements, and retransmission of unacknowledged segments.
##
## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so
## the send/receive circuit can exercise the surrounding pipeline.
## Real SDS wrapping will plug in via `nim-sds` later.
## `SdsHandler` adapts one nim-sds `ReliabilityManager` to a single channel:
## `wrapOutgoing` adds reliability metadata to outgoing segments,
## `handleIncoming` unwraps incoming ones and enforces causal-order delivery.
##
## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html
import results
{.push raises: [].}
import std/[options, tables]
import results, chronos, chronicles
import nimcrypto/keccak
import stew/byteutils
from std/times import initDuration, getTime, toUnix, nanosecond
import sds
import message as sds_message
import types/persistence as sds_persistence_types
import ./sds_persistence
export sds_message, sds_persistence_types
export sds_message, sds_persistence
logScope:
topics = "sds-handler"
const
DefaultAcknowledgementTimeoutMs* = 5_000
DefaultMaxRetransmissions* = 5
DefaultCausalHistorySize* = 2
MaxPendingContent = 1024
## Bound on segments parked while their causal dependencies are missing.
type
SdsConfig* = object
acknowledgementTimeoutMs*: int
maxRetransmissions*: int
causalHistorySize*: int
persistence*: SdsPersistence
persistence*: Option[Persistence]
## Durability backend. `none` runs memory-only: reliability still
## works, state does not survive a restart.
RebroadcastHandler* = proc(wire: seq[byte]) {.gcsafe, raises: [].}
## Invoked with a full SDS envelope to rebroadcast (SDS-R repair).
SdsHandler* = ref object
config*: SdsConfig
participantId*: SdsParticipantID
rm: ReliabilityManager
channelId: SdsChannelID
pendingContent: OrderedTable[SdsMessageID, seq[byte]]
## Segments parked until their causal dependencies arrive.
released: seq[seq[byte]]
## Parked segments released by the unwrap currently in flight;
## filled via `onMessageReady`, drained by `handleIncoming`.
ingressLock: AsyncLock
## Serializes `handleIncoming` so `released` belongs to exactly one
## in-flight unwrap and delivery order stays causal.
participantId: SdsParticipantID
onRebroadcast*: RebroadcastHandler
## Set by the owning `ReliableChannel` after construction — the closure
## captures the channel to run its dispatch tail, so it cannot be
## passed to `new`. The other callbacks need no channel and are wired
## internally in `installCallbacks`.
proc computeMessageId(self: SdsHandler, payload: seq[byte]): SdsMessageID =
## keccak-256(senderId + wrap-time nanoseconds + content): unique per
## segment, so identical content is not collapsed by the SDS dedup.
let now = getTime()
var ctx: keccak256
ctx.init()
ctx.update(string(self.participantId))
ctx.update($(now.toUnix() * 1_000_000_000 + now.nanosecond()))
ctx.update(payload)
SdsMessageID(byteutils.toHex(ctx.finish().data))
proc installCallbacks(self: SdsHandler) =
## Direct field assignment is race-free here: no periodic task or protocol
## op has started yet.
self.rm.onMessageReady = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
## Fires during unwrap, under the manager lock — must stay synchronous.
## Collect only; `handleIncoming` delivers after the direct content.
## The manager owns a single channel, so `channelId` is always ours; the
## check documents that invariant and guards against future misuse.
if channelId == self.channelId and messageId in self.pendingContent:
debug "SDS releasing buffered message, dependencies met", channelId, messageId
self.released.add(self.pendingContent.getOrDefault(messageId))
self.pendingContent.del(messageId)
self.rm.onMessageSent = proc(
messageId: SdsMessageID, channelId: SdsChannelID
) {.gcsafe.} =
debug "SDS message acknowledged", channelId, messageId
self.rm.onMissingDependencies = proc(
messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID
) {.gcsafe.} =
## Recovery via SDS sync / SDS-R for now; targeted store fetch by
## retrieval hint is a planned follow-up.
debug "SDS message has missing dependencies",
channelId, messageId, missing = missingDeps.len
self.rm.onRepairReady = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} =
if not self.onRebroadcast.isNil():
self.onRebroadcast(message)
proc new*(
T: type SdsHandler,
config: SdsConfig,
participantId: SdsParticipantID = SdsParticipantID(""),
channelId: SdsChannelID,
participantId: SdsParticipantID,
): T =
return T(config: config, participantId: participantId)
## One `ReliabilityManager` per channel. `participantId` feeds SDS-R
## response groups; an empty id disables repair participation.
let reliabilityConfig = ReliabilityConfig.init(
maxCausalHistory = config.causalHistorySize,
resendInterval = initDuration(milliseconds = config.acknowledgementTimeoutMs),
maxResendAttempts = config.maxRetransmissions,
)
let rm = ReliabilityManager.new(
participantId, reliabilityConfig, config.persistence.get(noOpPersistence())
)
let handler = T(
rm: rm,
channelId: channelId,
pendingContent: initOrderedTable[SdsMessageID, seq[byte]](),
released: @[],
ingressLock: newAsyncLock(),
participantId: participantId,
)
handler.installCallbacks()
return handler
proc start*(self: SdsHandler) =
## Starts the SDS background loops. Persisted channel state is restored
## lazily on first use: `wrapOutgoing` and `handleIncoming` both ensure
## the channel, and `handleIncoming` loads before its duplicate check so a
## replay right after a restart is still caught.
self.rm.startPeriodicTasks()
proc stop*(self: SdsHandler) {.async: (raises: []).} =
## Cancels the background loops. Persisted state is left intact.
await self.rm.cleanup()
proc wrapOutgoing*(
self: SdsHandler,
channelId: SdsChannelID,
senderId: SdsParticipantID,
payload: seq[byte],
): Result[seq[byte], string] =
## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption).
## Skeleton: pass the encoded segment through unchanged. Real causal
## history / lamport / bloom-filter population will replace this.
return ok(payload)
self: SdsHandler, payload: seq[byte]
): Future[Result[seq[byte], string]] {.async: (raises: []).} =
## Wraps a segment with reliability metadata and registers it in the SDS
## outgoing buffer awaiting end-to-end acknowledgement.
let wrapped = (
await self.rm.wrapOutgoingMessage(
payload, self.computeMessageId(payload), self.channelId
)
).valueOr:
return err("SDS wrap failed: " & $error)
return ok(wrapped)
proc handleIncoming*(
self: SdsHandler, msg: seq[byte]
): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] =
## Skeleton: pass the bytes through; channel id is left empty until
## the real wire format provides it.
return ok((content: msg, channelId: SdsChannelID("")))
self: SdsHandler, wire: seq[byte]
): Future[Result[seq[seq[byte]], string]] {.async: (raises: []).} =
## Returns the payloads deliverable now, in causal order. Empty when SDS
## consumed the message; `err` when the bytes are not an SDS envelope.
let msg = deserializeMessage(wire).valueOr:
return err("SDS deserialization failed")
proc tickRetransmissions*(self: SdsHandler) =
## Drives retransmissions of unacknowledged messages.
discard
## Pre-filter: `unwrapReceivedMessage` auto-creates the channel it sees on
## the wire, so foreign traffic must not reach it.
if msg.channelId != self.channelId:
debug "dropping SDS message for foreign channel",
channelId = self.channelId, wireChannelId = msg.channelId
return ok(newSeq[seq[byte]]())
try:
await self.ingressLock.acquire()
try:
## Load persisted state before the duplicate check, so a replay right
## after a restart is not re-delivered. Idempotent, cheap once loaded.
(await self.rm.ensureChannel(self.channelId)).isOkOr:
return err("SDS ensureChannel failed: " & $error)
## The unwrap result does not distinguish first delivery from
## duplicate, so capture delivered-before up front.
let ctx = self.rm.channels.getOrDefault(self.channelId)
let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory
self.released.setLen(0)
let unwrapped = (await self.rm.unwrapReceivedMessage(wire)).valueOr:
return err("SDS unwrap failed: " & $error)
if isDuplicate:
return ok(newSeq[seq[byte]]())
if unwrapped.missingDeps.len > 0:
if self.pendingContent.len >= MaxPendingContent:
var oldest: SdsMessageID
for k in self.pendingContent.keys:
oldest = k
break
self.pendingContent.del(oldest)
warn "SDS pending-content stash full, dropping oldest entry",
channelId = self.channelId, dropped = oldest
self.pendingContent[msg.messageId] = unwrapped.message
return ok(newSeq[seq[byte]]())
var deliverable = newSeq[seq[byte]]()
if unwrapped.message.len > 0:
## Empty content is sync traffic: causal metadata only.
deliverable.add(unwrapped.message)
deliverable.add(self.released)
self.released.setLen(0)
return ok(deliverable)
finally:
self.ingressLock.release()
except CatchableError:
return err("SDS handleIncoming failed: " & getCurrentExceptionMsg())
{.pop.}

View File

@ -1,25 +0,0 @@
## Persistence backend for SDS outgoing buffer and causal history.
##
## TODO (raised in PR review): this surface is duplicating concerns that
## should come from the SDS module itself. Once the SDS module exposes a
## complete persistence contract, drop this file and import that surface
## instead of re-declaring it here.
import message
type
SdsPersistenceKind* {.pure.} = enum
InMemory
Sqlite
SdsPersistence* = ref object of RootObj
kind*: SdsPersistenceKind
method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} =
discard
method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} =
discard
method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} =
discard

View File

@ -623,8 +623,8 @@
}
},
"sds": {
"version": "#abdd40cc645f1b024c3ee99cced7e287c4e4c441",
"vcsRevision": "abdd40cc645f1b024c3ee99cced7e287c4e4c441",
"version": "#b12f5ee07c5b764303b51fb948b32a4ade1de3b5",
"vcsRevision": "b12f5ee07c5b764303b51fb948b32a4ade1de3b5",
"url": "https://github.com/logos-messaging/nim-sds.git",
"downloadMethod": "git",
"dependencies": [
@ -636,10 +636,10 @@
"stint",
"metrics",
"results",
"taskpools"
"ffi"
],
"checksums": {
"sha1": "61c4ae13c6896bfa70e662520e8660a78c7f438c"
"sha1": "175f65038b9877cdf974b07c5f83081f810d5fbe"
}
},
"ffi": {
@ -720,4 +720,4 @@
}
},
"tasks": {}
}
}

View File

@ -277,9 +277,10 @@
};
sds = pkgs.fetchgit {
# Keep in sync with the nim-sds pin in nimble.lock.
url = "https://github.com/logos-messaging/nim-sds.git";
rev = "abdd40cc645f1b024c3ee99cced7e287c4e4c441";
sha256 = "01k49sljxnzjy82jljcffwqkaqvhpj1aiz605gv429sbzgyfr8mm";
rev = "b12f5ee07c5b764303b51fb948b32a4ade1de3b5";
sha256 = "1z8f0v1ww7y6zssdacjxfs6s4862dwckw25df3yn1v0qnz40rpc8";
fetchSubmodules = true;
};

View File

@ -1,6 +1,7 @@
{.used.}
import std/[net, options]
import std/[net, options, os]
from std/times import epochTime
import chronos, testutils/unittests, stew/byteutils
import brokers/broker_context
@ -14,6 +15,13 @@ import tools/confutils/cli_args
import logos_delivery/channels/reliable_channel_manager
import logos_delivery/channels/encryption/noop_encryption
import logos_delivery/waku/persistency/keys
import logos_delivery/waku/persistency/sds_persistency
## Full nim-sds API: ingress tests act as the remote peer producing real
## SDS envelopes; protocol-semantics tests decode wires and meta snapshots.
import sds
import snapshot_codec
const TestTimeout = chronos.seconds(15)
@ -75,13 +83,19 @@ suite "Reliable Channel - ingress":
)
.expect("listen ChannelMessageReceivedEvent")
## Build a `WakuMessage` that looks like one that came in off the
## wire from a peer: the spec marker on `meta` plus the right content
## topic. The manager's ingress listener should pick it up,
## decrypt (noop), unwrap SDS (pass-through), reassemble (one
## segment), and finally emit `ChannelMessageReceivedEvent`.
## Build a `WakuMessage` as it would arrive off the wire: spec marker
## on `meta`, right content topic, payload wrapped in a real SDS
## envelope by a stand-in remote peer.
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let sdsWire = (
await remotePeer.wrapOutgoingMessage(
appPayload, "ingress-test-msg-1", SdsChannelID(channelId)
)
).expect("wrapOutgoingMessage")
let inboundMsg = WakuMessage(
payload: appPayload,
payload: sdsWire,
contentTopic: contentTopic,
version: 0,
meta: LipWireReliableChannelVersion.toBytes(),
@ -202,7 +216,7 @@ suite "Reliable Channel - send state machine":
)
.expect("listen ChannelMessageSentEvent")
let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send")
let channelReqId = (await manager.send(channelId, "hello".toBytes())).expect("send")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and sendCalls == 0:
@ -280,8 +294,10 @@ suite "Reliable Channel - send state machine":
)
.expect("listen ChannelMessageErrorEvent")
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
let channelReqId1 =
(await manager.send(channelId, "first".toBytes())).expect("send 1")
let channelReqId2 =
(await manager.send(channelId, "second".toBytes())).expect("send 2")
let dispatchDeadline = Moment.now() + 1.seconds
while Moment.now() < dispatchDeadline and msgReqIds.len < 2:
@ -386,7 +402,8 @@ suite "Reliable Channel - send state machine":
)
.expect("listen ChannelMessageSentEvent")
let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1")
let channelReqId1 =
(await manager.send(channelId, "first".toBytes())).expect("send 1")
## Drain the first segment fully before queueing the second, so
## the rate-limit FIFO between sibling sends isn't itself under
@ -396,7 +413,8 @@ suite "Reliable Channel - send state machine":
await sleepAsync(5.milliseconds)
check msgReqIds.len == 1
let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2")
let channelReqId2 =
(await manager.send(channelId, "second".toBytes())).expect("send 2")
## Wait until `fakeSend(m2)` has fully returned and yield once
## more so `onReadyToSend`'s post-await continuation gets a chance
@ -424,3 +442,762 @@ suite "Reliable Channel - send state machine":
check channelReqId2 in finalisedReqIds
(await waku.stop()).expect("stop")
suite "Reliable Channel - SDS persistence":
asyncTest "send persists SDS channel state through the persistency job":
## A send must flush `sds.meta` + `sds.log` through the shared "sds"
## job. Writes resolve on enqueue, so reads poll.
const
channelId = ChannelId("sds-persist-channel")
contentTopic = ContentTopic("/reliable-channel/test/persist")
Persistency.reset()
let root = getTempDir() / ("reliable_channel_sds_" & $epochTime().int)
removeDir(root)
let persistency = Persistency.instance(root).expect("persistency init")
defer:
Persistency.reset()
removeDir(root)
var waku: Waku
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
return ok(RequestId("persist-msg-req-1"))
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
discard (await manager.send(channelId, "persist me".toBytes())).expect("send")
## Same handle the channel layer writes through (`openJob` is idempotent).
let job = persistency.openJob("sds").expect("openJob sds")
let chanKey = toKey(SdsChannelID(channelId))
proc pollMetaExists(): Future[bool] {.async.} =
let deadline = Moment.now() + 2.seconds
while Moment.now() < deadline:
let r = await job.exists(CatMeta, chanKey)
if r.isOk() and r.get():
return true
await sleepAsync(5.milliseconds)
return false
proc pollLogRow(): Future[bool] {.async.} =
## `sds.log` keys rows by (channelId, messageId) — scan the prefix.
let deadline = Moment.now() + 2.seconds
while Moment.now() < deadline:
let r = await job.scanPrefix(CatLog, chanKey)
if r.isOk() and r.get().len > 0:
return true
await sleepAsync(5.milliseconds)
return false
check await pollMetaExists()
check await pollLogRow()
(await waku.stop()).expect("stop")
## A marked WakuMessage carrying an SDS envelope, as it arrives off the wire.
proc sdsWakuMessage(contentTopic: ContentTopic, sdsWire: seq[byte]): WakuMessage =
WakuMessage(
payload: sdsWire,
contentTopic: contentTopic,
version: 0,
meta: LipWireReliableChannelVersion.toBytes(),
)
suite "Reliable Channel - SDS lifecycle":
asyncTest "out-of-order segments are parked and delivered in causal order":
## m2 depends on m1: m2 alone delivers nothing; m1 then delivers both,
## in causal order.
const
channelId = ChannelId("sds-causal-channel")
contentTopic = ContentTopic("/reliable-channel/test/causal")
let payload1 = "first message".toBytes()
let payload2 = "second message".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveries: seq[seq[byte]]
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveries.add(evt.payload)
,
)
.expect("listen ChannelMessageReceivedEvent")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire1 = (
await remotePeer.wrapOutgoingMessage(
payload1, "causal-m1", SdsChannelID(channelId)
)
).expect("wrap m1")
let wire2 = (
await remotePeer.wrapOutgoingMessage(
payload2, "causal-m2", SdsChannelID(channelId)
)
).expect("wrap m2")
## m2 first: missing dependency m1 -> parked, nothing delivered.
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "hash-m2", message: sdsWakuMessage(contentTopic, wire2)
),
)
await sleepAsync(100.milliseconds)
check deliveries.len == 0
## m1 arrives: m1 delivered, then the parked m2 released after it.
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "hash-m1", message: sdsWakuMessage(contentTopic, wire1)
),
)
let deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and deliveries.len < 2:
await sleepAsync(5.milliseconds)
check deliveries.len == 2
if deliveries.len == 2:
check deliveries[0] == payload1
check deliveries[1] == payload2
(await waku.stop()).expect("stop")
asyncTest "duplicate SDS envelope is delivered to the app only once":
const
channelId = ChannelId("sds-dup-channel")
contentTopic = ContentTopic("/reliable-channel/test/dup")
let appPayload = "deliver once".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveryCount = 0
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveryCount.inc()
,
)
.expect("listen ChannelMessageReceivedEvent")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire = (
await remotePeer.wrapOutgoingMessage(
appPayload, "dup-m1", SdsChannelID(channelId)
)
).expect("wrap")
## Same envelope twice (different hashes) — the second must be suppressed.
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "dup-hash-1", message: sdsWakuMessage(contentTopic, wire)
),
)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "dup-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
await sleepAsync(200.milliseconds)
check deliveryCount == 1
(await waku.stop()).expect("stop")
asyncTest "SDS envelope for a foreign channel is dropped":
## Same content topic, different SDS channel id — dropped before unwrap.
const
channelId = ChannelId("sds-foreign-channel")
contentTopic = ContentTopic("/reliable-channel/test/foreign")
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var fired = false
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
fired = true
,
)
.expect("listen ChannelMessageReceivedEvent")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire = (
await remotePeer.wrapOutgoingMessage(
"not for you".toBytes(), "foreign-m1", SdsChannelID("some-other-channel")
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "foreign-hash", message: sdsWakuMessage(contentTopic, wire)
),
)
await sleepAsync(200.milliseconds)
check not fired
(await waku.stop()).expect("stop")
asyncTest "received history survives channel close and re-create":
## Receive m1, close, re-create, replay m1: the duplicate is only
## suppressed if the history was actually restored from SQLite.
const
channelId = ChannelId("sds-restore-channel")
contentTopic = ContentTopic("/reliable-channel/test/restore")
let appPayload = "survive restart".toBytes()
Persistency.reset()
let root = getTempDir() / ("reliable_channel_sds_restore_" & $epochTime().int)
removeDir(root)
let persistency = Persistency.instance(root).expect("persistency init")
defer:
Persistency.reset()
removeDir(root)
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveryCount = 0
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveryCount.inc()
,
)
.expect("listen ChannelMessageReceivedEvent")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire = (
await remotePeer.wrapOutgoingMessage(
appPayload, "restore-m1", SdsChannelID(channelId)
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "restore-hash-1", message: sdsWakuMessage(contentTopic, wire)
),
)
var deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and deliveryCount < 1:
await sleepAsync(5.milliseconds)
check deliveryCount == 1
## Writes resolve on enqueue — wait until the row is applied before closing.
let job = persistency.openJob("sds").expect("openJob sds")
let chanKey = toKey(SdsChannelID(channelId))
deadline = Moment.now() + 2.seconds
var logVisible = false
while Moment.now() < deadline and not logVisible:
let r = await job.scanPrefix(CatLog, chanKey)
logVisible = r.isOk() and r.get().len > 0
if not logVisible:
await sleepAsync(5.milliseconds)
check logVisible
(await manager.closeChannel(channelId)).expect("closeChannel")
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("re-createReliableChannel")
## Replay the same envelope. Only a restored history suppresses it.
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "restore-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
await sleepAsync(300.milliseconds)
check deliveryCount == 1
(await waku.stop()).expect("stop")
suite "Reliable Channel - SDS protocol semantics":
asyncTest "a reply references the received message and advances the lamport clock":
## After receiving m1, our outgoing wire must reference m1 in its causal
## history (that reference IS the ack) with a higher lamport.
const
channelId = ChannelId("sds-semantics-channel")
contentTopic = ContentTopic("/reliable-channel/test/semantics")
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
## Noop encryption is identity, so the envelope payload IS the SDS wire.
capturedWires.add(env.payload)
return ok(RequestId("semantics-req-" & $capturedWires.len))
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire1 = (
await remotePeer.wrapOutgoingMessage(
"from remote".toBytes(), "semantics-m1", SdsChannelID(channelId)
)
).expect("wrap m1")
let m1 = deserializeMessage(wire1).expect("deserialize m1")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "semantics-hash-1", message: sdsWakuMessage(contentTopic, wire1)
),
)
await sleepAsync(100.milliseconds)
discard (await manager.send(channelId, "reply".toBytes())).expect("send")
var deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and capturedWires.len < 1:
await sleepAsync(5.milliseconds)
check capturedWires.len == 1
let reply = deserializeMessage(capturedWires[0]).expect("deserialize reply")
check SdsMessageID("semantics-m1") in reply.causalHistory.getMessageIds()
check reply.lamportTimestamp > m1.lamportTimestamp
(await waku.stop()).expect("stop")
asyncTest "an unacknowledged send is acked by a later remote message":
## Our send sits in the outgoing buffer (visible in the persisted meta)
## until any later remote message references it — then the buffer drains.
const
channelId = ChannelId("sds-ack-channel")
contentTopic = ContentTopic("/reliable-channel/test/ack")
Persistency.reset()
let root = getTempDir() / ("reliable_channel_sds_ack_" & $epochTime().int)
removeDir(root)
let persistency = Persistency.instance(root).expect("persistency init")
defer:
Persistency.reset()
removeDir(root)
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
capturedWires.add(env.payload)
return ok(RequestId("ack-req-" & $capturedWires.len))
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
discard (await manager.send(channelId, "needs ack".toBytes())).expect("send")
var deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and capturedWires.len < 1:
await sleepAsync(5.milliseconds)
check capturedWires.len == 1
let job = persistency.openJob("sds").expect("openJob sds")
let chanKey = toKey(SdsChannelID(channelId))
proc outgoingBufferLen(): Future[int] {.async.} =
## Decode the persisted meta snapshot; -1 while not yet readable.
let r = await job.get(CatMeta, chanKey)
if r.isErr() or r.get().isNone():
return -1
let meta = ChannelMeta.decode(r.get().get()).valueOr:
return -1
return meta.outgoingBuffer.len
## After the send the message must sit unacknowledged in the buffer.
deadline = Moment.now() + 2.seconds
var bufLen = -1
while Moment.now() < deadline and bufLen != 1:
bufLen = await outgoingBufferLen()
if bufLen != 1:
await sleepAsync(5.milliseconds)
check bufLen == 1
## The remote received our wire; its next message references it.
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
discard
(await remotePeer.unwrapReceivedMessage(capturedWires[0])).expect("remote unwrap")
let ackCarrier = (
await remotePeer.wrapOutgoingMessage(
"any later message".toBytes(), "ack-carrier-1", SdsChannelID(channelId)
)
).expect("wrap ack carrier")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "ack-hash-1", message: sdsWakuMessage(contentTopic, ackCarrier)
),
)
## Receiving it must drain the outgoing buffer (op-end meta flush).
deadline = Moment.now() + 2.seconds
bufLen = -1
while Moment.now() < deadline and bufLen != 0:
bufLen = await outgoingBufferLen()
if bufLen != 0:
await sleepAsync(5.milliseconds)
check bufLen == 0
(await waku.stop()).expect("stop")
asyncTest "three-deep dependency chain is released in causal order":
## m1 <- m2 <- m3 arriving as m3, m2, m1: all held until m1 lands,
## then released as m1, m2, m3.
const
channelId = ChannelId("sds-chain-channel")
contentTopic = ContentTopic("/reliable-channel/test/chain")
let payloads =
@["chain first".toBytes(), "chain second".toBytes(), "chain third".toBytes()]
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveries: seq[seq[byte]]
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveries.add(evt.payload)
,
)
.expect("listen ChannelMessageReceivedEvent")
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
var wires: seq[seq[byte]]
for i in 0 .. 2:
wires.add(
(
await remotePeer.wrapOutgoingMessage(
payloads[i], "chain-m" & $(i + 1), SdsChannelID(channelId)
)
).expect("wrap chain-m" & $(i + 1))
)
## Deepest first: m3, then m2 — both must be parked.
for i in [2, 1]:
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "chain-hash-" & $(i + 1),
message: sdsWakuMessage(contentTopic, wires[i]),
),
)
await sleepAsync(150.milliseconds)
check deliveries.len == 0
## The root arrives: everything drains in causal order.
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "chain-hash-1", message: sdsWakuMessage(contentTopic, wires[0])
),
)
let deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and deliveries.len < 3:
await sleepAsync(5.milliseconds)
check deliveries.len == 3
if deliveries.len == 3:
check deliveries[0] == payloads[0]
check deliveries[1] == payloads[1]
check deliveries[2] == payloads[2]
(await waku.stop()).expect("stop")
asyncTest "sync envelope without app payload is consumed silently":
## Sync traffic has no app payload: no event, and normal traffic
## keeps flowing afterwards.
const
channelId = ChannelId("sds-sync-channel")
contentTopic = ContentTopic("/reliable-channel/test/sync")
let appPayload = "real message".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
discard manager
.createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
.expect("createReliableChannel")
var deliveryCount = 0
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveryCount.inc()
,
)
.expect("listen ChannelMessageReceivedEvent")
## Hand-built sync envelope: valid SDS message, empty content.
let syncMsg = SdsMessage.init(
messageId = SdsMessageID("sync-1"),
lamportTimestamp = 42,
causalHistory = @[],
channelId = SdsChannelID(channelId),
content = @[],
bloomFilter = @[],
)
let syncWire = serializeMessage(syncMsg).expect("serialize sync")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "sync-hash-1", message: sdsWakuMessage(contentTopic, syncWire)
),
)
await sleepAsync(150.milliseconds)
check deliveryCount == 0
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
let wire = (
await remotePeer.wrapOutgoingMessage(
appPayload, "sync-m1", SdsChannelID(channelId)
)
).expect("wrap")
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "sync-hash-2", message: sdsWakuMessage(contentTopic, wire)
),
)
let deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and deliveryCount < 1:
await sleepAsync(5.milliseconds)
check deliveryCount == 1
(await waku.stop()).expect("stop")
asyncTest "identical payloads get distinct message ids and both deliver":
## Identical content sent twice must get distinct message ids and
## reach the app twice — not collapse via the SDS duplicate check.
const
channelId = ChannelId("sds-unique-id-channel")
contentTopic = ContentTopic("/reliable-channel/test/unique-id")
let appPayload = "ok".toBytes()
var waku: Waku
var manager: ReliableChannelManager
var brokerCtx: BrokerContext
lockNewGlobalBrokerContext:
brokerCtx = globalBrokerContext()
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
setNoopEncryption()
var capturedWires: seq[seq[byte]]
let fakeSend: SendHandler = proc(
env: MessageEnvelope
): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
capturedWires.add(env.payload)
return ok(RequestId("unique-req-" & $capturedWires.len))
discard manager
.createReliableChannel(
channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
)
.expect("createReliableChannel")
var deliveries: seq[seq[byte]]
discard ChannelMessageReceivedEvent
.listen(
brokerCtx,
proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
if evt.channelId == channelId:
deliveries.add(evt.payload)
,
)
.expect("listen ChannelMessageReceivedEvent")
## Send side: the same payload twice must produce two distinct ids.
discard (await manager.send(channelId, appPayload)).expect("send 1")
discard (await manager.send(channelId, appPayload)).expect("send 2")
var deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and capturedWires.len < 2:
await sleepAsync(5.milliseconds)
check capturedWires.len == 2
let id1 = deserializeMessage(capturedWires[0]).expect("wire 1").messageId
let id2 = deserializeMessage(capturedWires[1]).expect("wire 2").messageId
check id1 != id2
## Receive side: identical content under distinct ids delivers twice.
let remotePeer =
ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
for i in 1 .. 2:
let wire = (
await remotePeer.wrapOutgoingMessage(
appPayload, "unique-m" & $i, SdsChannelID(channelId)
)
).expect("wrap " & $i)
waku_message_events.MessageReceivedEvent.emit(
brokerCtx,
waku_message_events.MessageReceivedEvent(
messageHash: "unique-hash-" & $i, message: sdsWakuMessage(contentTopic, wire)
),
)
deadline = Moment.now() + 2.seconds
while Moment.now() < deadline and deliveries.len < 2:
await sleepAsync(5.milliseconds)
check deliveries.len == 2
(await waku.stop()).expect("stop")
asyncTest "manager rejects operations on unknown channels":
var waku: Waku
var manager: ReliableChannelManager
lockNewGlobalBrokerContext:
waku = (await createNode(createApiNodeConf())).expect("createNode")
waku.mountMessagingClient().expect("mountMessagingClient")
waku.mountReliableChannelManager().expect("mountReliableChannelManager")
manager = waku.reliableChannelManager
check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()
check (await manager.closeChannel(ChannelId("no-such-channel"))).isErr()
(await waku.stop()).expect("stop")