diff --git a/config.nims b/config.nims index ebe501db8..efff8a552 100644 --- a/config.nims +++ b/config.nims @@ -116,6 +116,7 @@ if defined(android): switch("passC", "--sysroot=" & sysRoot) switch("passL", "--sysroot=" & sysRoot) switch("cincludes", sysRoot & "/usr/include/") + # begin Nimble config (version 2) --noNimblePath when withDir(thisDir(), system.fileExists("nimble.paths")): diff --git a/logos_delivery.nimble b/logos_delivery.nimble index 8f9bbd8d9..efe118595 100644 --- a/logos_delivery.nimble +++ b/logos_delivery.nimble @@ -63,7 +63,7 @@ requires "nim >= 2.2.4", requires "https://github.com/logos-messaging/nim-ffi#v0.1.3" -requires "https://github.com/logos-messaging/nim-sds.git#abdd40cc645f1b024c3ee99cced7e287c4e4c441" +requires "https://github.com/logos-messaging/nim-sds.git#b12f5ee07c5b764303b51fb948b32a4ade1de3b5" requires "https://github.com/NagyZoltanPeter/nim-brokers.git#v3.1.1" diff --git a/logos_delivery/channels/reliable_channel.nim b/logos_delivery/channels/reliable_channel.nim index 5edacc619..de4365a3d 100644 --- a/logos_delivery/channels/reliable_channel.nim +++ b/logos_delivery/channels/reliable_channel.nim @@ -125,6 +125,10 @@ func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = self.senderId +proc stop*(self: ReliableChannel) {.async: (raises: []).} = + ## Stops the SDS background loops. Persisted SDS state survives. + await self.sdsHandler.stop() + proc tryFinalizeChannelReq(self: ReliableChannel, channelReqId: RequestId) = ## Tries to finalize the channel-level request identified by `channelReqId` if ## certain conditions are met, i.e., no segments are still awaiting dispatch or in flight, @@ -290,7 +294,7 @@ proc onReadyToSend( proc send*( self: ReliableChannel, payload: seq[byte], ephemeral: bool = false -): Result[RequestId, string] = +): Future[Result[RequestId, string]] {.async: (raises: []).} = ## Single application-level send. 
The first three stages of the ## outgoing pipeline are chained explicitly so the flow is visible ## at a glance: @@ -316,9 +320,7 @@ proc send*( for segmentBytes in self.segmentation.performSegmentation(payload): ## Segments arrive already encoded; the segmentation module owns ## the wire format so SDS only ever sees opaque bytes. - let sdsBytes = self.sdsHandler.wrapOutgoing( - self.channelId, self.senderId, segmentBytes - ).valueOr: + let sdsBytes = (await self.sdsHandler.wrapOutgoing(segmentBytes)).valueOr: return err("SDS wrap failed: " & error) enqueued.add(sdsBytes) segmentCount.inc() @@ -331,6 +333,48 @@ proc send*( return ok(channelReqId) +proc reportReceived(self: ReliableChannel, content: seq[byte]) = + ## Tail of the ingress pipeline (reassemble -> emit). + let reassembled = self.segmentation.handleIncomingSegment(content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. + ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc dispatchRepair(self: ReliableChannel, wire: seq[byte]) {.async: (raises: []).} = + ## Repair rebroadcasts skip the rate-limit queue — its emissions are + ## claimed FIFO by pending sends. Pacing is done by SDS itself. + let encRes = await Encrypt.request(wire) + let encrypted = encRes.valueOr: + debug "SDS repair rebroadcast dropped: encryption failed", + channelId = self.channelId, error = error + return + + ## Ephemeral: the original message is already store-persisted. 
+ let envelope = MessageEnvelope( + contentTopic: self.contentTopic, + payload: seq[byte](encrypted), + ephemeral: true, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + let sendRes = + try: + await self.sendHandler(envelope) + except CatchableError as e: + Result[RequestId, string].err("messaging send raised: " & e.msg) + if sendRes.isErr(): + debug "SDS repair rebroadcast dropped: dispatch failed", + channelId = self.channelId, error = sendRes.error + proc onMessageReceived( self: ReliableChannel, messageHash: string, payload: seq[byte] ) {.async: (raises: []).} = @@ -357,23 +401,13 @@ proc onMessageReceived( return let plaintextBytes = seq[byte](plaintext) - let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) - if unwrapped.isErr(): + ## SDS returns every payload deliverable now, in causal order — the + ## message itself plus any parked segments it released. Empty = consumed + ## by SDS; `err` = not a decodable SDS envelope. Both drop here. + let deliverable = (await self.sdsHandler.handleIncoming(plaintextBytes)).valueOr: return - - let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) - if reassembled.isSome(): - ## Emit on the captured `brokerCtx` (the manager's), so the - ## application listener that the manager has set up on that same - ## context picks the event up. 
- ChannelMessageReceivedEvent.emit( - self.brokerCtx, - ChannelMessageReceivedEvent( - channelId: self.channelId, - senderId: self.senderId, - payload: reassembled.get().payload, - ), - ) + for content in deliverable: + self.reportReceived(content) proc new*( T: type ReliableChannel, @@ -403,12 +437,17 @@ proc new*( senderId: senderId, rng: libp2p_crypto.newRng(), segmentation: SegmentationHandler.new(segConfig), - sdsHandler: SdsHandler.new(sdsConfig, senderId), + sdsHandler: SdsHandler.new(sdsConfig, channelId, senderId), rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), channelReqs: initOrderedTable[RequestId, ChannelReqState](), brokerCtx: brokerCtx, ) + ## SDS-R repair rebroadcasts go straight to the dispatch tail. + chn.sdsHandler.onRebroadcast = proc(wire: seq[byte]) {.gcsafe, raises: [].} = + asyncSpawn chn.dispatchRepair(wire) + chn.sdsHandler.start() + ## Each channel owns its own egress + ingress + send-completion ## listeners on `chn.brokerCtx`, filtered to traffic addressed to ## this channel. 
Keeping the listeners (and the handler procs they diff --git a/logos_delivery/channels/reliable_channel_manager.nim b/logos_delivery/channels/reliable_channel_manager.nim index a21dfc54e..0717eb3f8 100644 --- a/logos_delivery/channels/reliable_channel_manager.nim +++ b/logos_delivery/channels/reliable_channel_manager.nim @@ -5,9 +5,10 @@ ## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import std/tables +import std/[options, tables] import results import chronos +import chronicles import stew/byteutils import brokers/broker_context @@ -15,12 +16,17 @@ import brokers/broker_context import logos_delivery/waku/events/message_events as waku_message_events import logos_delivery/messaging/messaging_client import logos_delivery/waku/waku_core/topics +import logos_delivery/waku/persistency/sds_persistency import ./reliable_channel import ./encryption/noop_encryption export reliable_channel +const SdsJobId = "sds" + ## One persistency job shared by every channel's SDS state; rows are + ## keyed by channelId. + type ReliableChannelManager* = ref object channels: Table[ChannelId, ReliableChannel] messagingClient: MessagingClient ## Borrowed from the owning `Waku`. @@ -57,8 +63,22 @@ proc start*(self: ReliableChannelManager): Result[void, string] = ok() proc stop*(self: ReliableChannelManager) {.async.} = - ## Placeholder mirror of `start`. - discard + ## Stops every channel's SDS background loops. Persisted state survives. + for chn in self.channels.values: + await chn.stop() + self.channels.clear() + +proc sdsPersistence(): Option[Persistence] = + ## SDS backend from the Persistency singleton; memory-only fallback when + ## it is unavailable (e.g. unit tests). 
+ let p = Persistency.instance().valueOr: + info "SDS persistence disabled, running memory-only", reason = $error + return none(Persistence) + let job = p.openJob(SdsJobId).valueOr: + warn "SDS persistence disabled, could not open persistency job", + jobId = SdsJobId, reason = $error + return none(Persistence) + return some(newSdsPersistence(job)) proc createReliableChannel*( self: ReliableChannelManager, @@ -90,7 +110,7 @@ proc createReliableChannel*( acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, maxRetransmissions: DefaultMaxRetransmissions, causalHistorySize: DefaultCausalHistorySize, - persistence: nil, + persistence: sdsPersistence(), ) let rateConfig = RateLimitConfig( epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch @@ -114,11 +134,14 @@ proc createReliableChannel*( proc closeChannel*( self: ReliableChannelManager, channelId: ChannelId -): Result[void, string] = - ## Flush state, persist outstanding SDS buffers, release resources. - if not self.channels.hasKey(channelId): +): Future[Result[void, string]] {.async: (raises: []).} = + ## Stops the channel's SDS loops and releases the channel. Persisted SDS + ## state survives, so re-creating the channel restores it. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): return err("unknown channel: " & channelId) self.channels.del(channelId) + await chn.stop() return ok() proc send*( @@ -126,14 +149,14 @@ proc send*( channelId: ChannelId, appPayload: seq[byte], ephemeral: bool = false, -): Result[RequestId, string] = +): Future[Result[RequestId, string]] {.async: (raises: []).} = ## Spec-level entry point. Looks the channel up by id and delegates ## to `ReliableChannel.send`, which exposes the visible pipeline ## segmentation -> sds -> rate_limit_manager -> encryption. 
let chn = self.channels.getOrDefault(channelId) if chn.isNil(): return err("unknown channel: " & channelId) - return chn.send(appPayload, ephemeral) + return await chn.send(appPayload, ephemeral) ## Inbound messages are not handed to the manager by direct call. Each ## `ReliableChannel` installs its own `MessageReceivedEvent` listener diff --git a/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim b/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim index 30ad0e02b..975e02461 100644 --- a/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim +++ b/logos_delivery/channels/scalable_data_sync/scalable_data_sync.nim @@ -1,62 +1,216 @@ ## Scalable Data Sync (SDS) component for the Reliable Channel API. ## -## Provides end-to-end delivery guarantees via causal history tracking, -## acknowledgements, and retransmission of unacknowledged segments. -## -## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so -## the send/receive circuit can exercise the surrounding pipeline. -## Real SDS wrapping will plug in via `nim-sds` later. +## `SdsHandler` adapts one nim-sds `ReliabilityManager` to a single channel: +## `wrapOutgoing` adds reliability metadata to outgoing segments, +## `handleIncoming` unwraps incoming ones and enforces causal-order delivery. 
## ## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html -import results +{.push raises: [].} + +import std/[options, tables] +import results, chronos, chronicles +import nimcrypto/keccak +import stew/byteutils +from std/times import initDuration, getTime, toUnix, nanosecond + +import sds import message as sds_message +import types/persistence as sds_persistence_types -import ./sds_persistence +export sds_message, sds_persistence_types -export sds_message, sds_persistence +logScope: + topics = "sds-handler" const DefaultAcknowledgementTimeoutMs* = 5_000 DefaultMaxRetransmissions* = 5 DefaultCausalHistorySize* = 2 + MaxPendingContent = 1024 + ## Bound on segments parked while their causal dependencies are missing. type SdsConfig* = object acknowledgementTimeoutMs*: int maxRetransmissions*: int causalHistorySize*: int - persistence*: SdsPersistence + persistence*: Option[Persistence] + ## Durability backend. `none` runs memory-only: reliability still + ## works, state does not survive a restart. + + RebroadcastHandler* = proc(wire: seq[byte]) {.gcsafe, raises: [].} + ## Invoked with a full SDS envelope to rebroadcast (SDS-R repair). SdsHandler* = ref object - config*: SdsConfig - participantId*: SdsParticipantID + rm: ReliabilityManager + channelId: SdsChannelID + pendingContent: OrderedTable[SdsMessageID, seq[byte]] + ## Segments parked until their causal dependencies arrive. + released: seq[seq[byte]] + ## Parked segments released by the unwrap currently in flight; + ## filled via `onMessageReady`, drained by `handleIncoming`. + ingressLock: AsyncLock + ## Serializes `handleIncoming` so `released` belongs to exactly one + ## in-flight unwrap and delivery order stays causal. + participantId: SdsParticipantID + onRebroadcast*: RebroadcastHandler + ## Set by the owning `ReliableChannel` after construction — the closure + ## captures the channel to run its dispatch tail, so it cannot be + ## passed to `new`. 
The other callbacks need no channel and are wired + ## internally in `installCallbacks`. + +proc computeMessageId(self: SdsHandler, payload: seq[byte]): SdsMessageID = + ## keccak-256(senderId + wrap-time nanoseconds + content): unique per + ## segment, so identical content is not collapsed by the SDS dedup. + let now = getTime() + var ctx: keccak256 + ctx.init() + ctx.update(string(self.participantId)) + ctx.update($(now.toUnix() * 1_000_000_000 + now.nanosecond())) + ctx.update(payload) + SdsMessageID(byteutils.toHex(ctx.finish().data)) + +proc installCallbacks(self: SdsHandler) = + ## Direct field assignment is race-free here: no periodic task or protocol + ## op has started yet. + self.rm.onMessageReady = proc( + messageId: SdsMessageID, channelId: SdsChannelID + ) {.gcsafe.} = + ## Fires during unwrap, under the manager lock — must stay synchronous. + ## Collect only; `handleIncoming` delivers after the direct content. + ## The manager owns a single channel, so `channelId` is always ours; the + ## check documents that invariant and guards against future misuse. + if channelId == self.channelId and messageId in self.pendingContent: + debug "SDS releasing buffered message, dependencies met", channelId, messageId + self.released.add(self.pendingContent.getOrDefault(messageId)) + self.pendingContent.del(messageId) + + self.rm.onMessageSent = proc( + messageId: SdsMessageID, channelId: SdsChannelID + ) {.gcsafe.} = + debug "SDS message acknowledged", channelId, messageId + + self.rm.onMissingDependencies = proc( + messageId: SdsMessageID, missingDeps: seq[HistoryEntry], channelId: SdsChannelID + ) {.gcsafe.} = + ## Recovery via SDS sync / SDS-R for now; targeted store fetch by + ## retrieval hint is a planned follow-up. 
+ debug "SDS message has missing dependencies", + channelId, messageId, missing = missingDeps.len + + self.rm.onRepairReady = proc(message: seq[byte], channelId: SdsChannelID) {.gcsafe.} = + if not self.onRebroadcast.isNil(): + self.onRebroadcast(message) proc new*( T: type SdsHandler, config: SdsConfig, - participantId: SdsParticipantID = SdsParticipantID(""), + channelId: SdsChannelID, + participantId: SdsParticipantID, ): T = - return T(config: config, participantId: participantId) + ## One `ReliabilityManager` per channel. `participantId` feeds SDS-R + ## response groups; an empty id disables repair participation. + let reliabilityConfig = ReliabilityConfig.init( + maxCausalHistory = config.causalHistorySize, + resendInterval = initDuration(milliseconds = config.acknowledgementTimeoutMs), + maxResendAttempts = config.maxRetransmissions, + ) + let rm = ReliabilityManager.new( + participantId, reliabilityConfig, config.persistence.get(noOpPersistence()) + ) + let handler = T( + rm: rm, + channelId: channelId, + pendingContent: initOrderedTable[SdsMessageID, seq[byte]](), + released: @[], + ingressLock: newAsyncLock(), + participantId: participantId, + ) + handler.installCallbacks() + return handler + +proc start*(self: SdsHandler) = + ## Starts the SDS background loops. Persisted channel state is restored + ## lazily on first use: `wrapOutgoing` and `handleIncoming` both ensure + ## the channel, and `handleIncoming` loads before its duplicate check so a + ## replay right after a restart is still caught. + self.rm.startPeriodicTasks() + +proc stop*(self: SdsHandler) {.async: (raises: []).} = + ## Cancels the background loops. Persisted state is left intact. + await self.rm.cleanup() proc wrapOutgoing*( - self: SdsHandler, - channelId: SdsChannelID, - senderId: SdsParticipantID, - payload: seq[byte], -): Result[seq[byte], string] = - ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). 
- ## Skeleton: pass the encoded segment through unchanged. Real causal - ## history / lamport / bloom-filter population will replace this. - return ok(payload) + self: SdsHandler, payload: seq[byte] +): Future[Result[seq[byte], string]] {.async: (raises: []).} = + ## Wraps a segment with reliability metadata and registers it in the SDS + ## outgoing buffer awaiting end-to-end acknowledgement. + let wrapped = ( + await self.rm.wrapOutgoingMessage( + payload, self.computeMessageId(payload), self.channelId + ) + ).valueOr: + return err("SDS wrap failed: " & $error) + return ok(wrapped) proc handleIncoming*( - self: SdsHandler, msg: seq[byte] -): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = - ## Skeleton: pass the bytes through; channel id is left empty until - ## the real wire format provides it. - return ok((content: msg, channelId: SdsChannelID(""))) + self: SdsHandler, wire: seq[byte] +): Future[Result[seq[seq[byte]], string]] {.async: (raises: []).} = + ## Returns the payloads deliverable now, in causal order. Empty when SDS + ## consumed the message; `err` when the bytes are not an SDS envelope. + let msg = deserializeMessage(wire).valueOr: + return err("SDS deserialization failed") -proc tickRetransmissions*(self: SdsHandler) = - ## Drives retransmissions of unacknowledged messages. - discard + ## Pre-filter: `unwrapReceivedMessage` auto-creates the channel it sees on + ## the wire, so foreign traffic must not reach it. + if msg.channelId != self.channelId: + debug "dropping SDS message for foreign channel", + channelId = self.channelId, wireChannelId = msg.channelId + return ok(newSeq[seq[byte]]()) + + try: + await self.ingressLock.acquire() + try: + ## Load persisted state before the duplicate check, so a replay right + ## after a restart is not re-delivered. Idempotent, cheap once loaded. 
+ (await self.rm.ensureChannel(self.channelId)).isOkOr: + return err("SDS ensureChannel failed: " & $error) + + ## The unwrap result does not distinguish first delivery from + ## duplicate, so capture delivered-before up front. + let ctx = self.rm.channels.getOrDefault(self.channelId) + let isDuplicate = not ctx.isNil() and msg.messageId in ctx.messageHistory + + self.released.setLen(0) + let unwrapped = (await self.rm.unwrapReceivedMessage(wire)).valueOr: + return err("SDS unwrap failed: " & $error) + + if isDuplicate: + return ok(newSeq[seq[byte]]()) + + if unwrapped.missingDeps.len > 0: + if self.pendingContent.len >= MaxPendingContent: + var oldest: SdsMessageID + for k in self.pendingContent.keys: + oldest = k + break + self.pendingContent.del(oldest) + warn "SDS pending-content stash full, dropping oldest entry", + channelId = self.channelId, dropped = oldest + self.pendingContent[msg.messageId] = unwrapped.message + return ok(newSeq[seq[byte]]()) + + var deliverable = newSeq[seq[byte]]() + if unwrapped.message.len > 0: + ## Empty content is sync traffic: causal metadata only. + deliverable.add(unwrapped.message) + deliverable.add(self.released) + self.released.setLen(0) + return ok(deliverable) + finally: + self.ingressLock.release() + except CatchableError: + return err("SDS handleIncoming failed: " & getCurrentExceptionMsg()) + +{.pop.} diff --git a/logos_delivery/channels/scalable_data_sync/sds_persistence.nim b/logos_delivery/channels/scalable_data_sync/sds_persistence.nim deleted file mode 100644 index 8089595ea..000000000 --- a/logos_delivery/channels/scalable_data_sync/sds_persistence.nim +++ /dev/null @@ -1,25 +0,0 @@ -## Persistence backend for SDS outgoing buffer and causal history. -## -## TODO (raised in PR review): this surface is duplicating concerns that -## should come from the SDS module itself. Once the SDS module exposes a -## complete persistence contract, drop this file and import that surface -## instead of re-declaring it here. 
- -import message - -type - SdsPersistenceKind* {.pure.} = enum - InMemory - Sqlite - - SdsPersistence* = ref object of RootObj - kind*: SdsPersistenceKind - -method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = - discard - -method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = - discard - -method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = - discard diff --git a/nimble.lock b/nimble.lock index 67e4296ff..18ebde258 100644 --- a/nimble.lock +++ b/nimble.lock @@ -623,8 +623,8 @@ } }, "sds": { - "version": "#abdd40cc645f1b024c3ee99cced7e287c4e4c441", - "vcsRevision": "abdd40cc645f1b024c3ee99cced7e287c4e4c441", + "version": "#b12f5ee07c5b764303b51fb948b32a4ade1de3b5", + "vcsRevision": "b12f5ee07c5b764303b51fb948b32a4ade1de3b5", "url": "https://github.com/logos-messaging/nim-sds.git", "downloadMethod": "git", "dependencies": [ @@ -636,10 +636,10 @@ "stint", "metrics", "results", - "taskpools" + "ffi" ], "checksums": { - "sha1": "61c4ae13c6896bfa70e662520e8660a78c7f438c" + "sha1": "175f65038b9877cdf974b07c5f83081f810d5fbe" } }, "ffi": { @@ -720,4 +720,4 @@ } }, "tasks": {} -} +} \ No newline at end of file diff --git a/nix/deps.nix b/nix/deps.nix index 1f2735457..00dea27c2 100644 --- a/nix/deps.nix +++ b/nix/deps.nix @@ -277,9 +277,10 @@ }; sds = pkgs.fetchgit { + # Keep in sync with the nim-sds pin in nimble.lock. 
url = "https://github.com/logos-messaging/nim-sds.git"; - rev = "abdd40cc645f1b024c3ee99cced7e287c4e4c441"; - sha256 = "01k49sljxnzjy82jljcffwqkaqvhpj1aiz605gv429sbzgyfr8mm"; + rev = "b12f5ee07c5b764303b51fb948b32a4ade1de3b5"; + sha256 = "1z8f0v1ww7y6zssdacjxfs6s4862dwckw25df3yn1v0qnz40rpc8"; fetchSubmodules = true; }; diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim index 5a80d104c..ca6177e9d 100644 --- a/tests/channels/test_reliable_channel_send_receive.nim +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -1,6 +1,7 @@ {.used.} -import std/[net, options] +import std/[net, options, os] +from std/times import epochTime import chronos, testutils/unittests, stew/byteutils import brokers/broker_context @@ -14,6 +15,13 @@ import tools/confutils/cli_args import logos_delivery/channels/reliable_channel_manager import logos_delivery/channels/encryption/noop_encryption +import logos_delivery/waku/persistency/keys +import logos_delivery/waku/persistency/sds_persistency + +## Full nim-sds API: ingress tests act as the remote peer producing real +## SDS envelopes; protocol-semantics tests decode wires and meta snapshots. +import sds +import snapshot_codec const TestTimeout = chronos.seconds(15) @@ -75,13 +83,19 @@ suite "Reliable Channel - ingress": ) .expect("listen ChannelMessageReceivedEvent") - ## Build a `WakuMessage` that looks like one that came in off the - ## wire from a peer: the spec marker on `meta` plus the right content - ## topic. The manager's ingress listener should pick it up, - ## decrypt (noop), unwrap SDS (pass-through), reassemble (one - ## segment), and finally emit `ChannelMessageReceivedEvent`. + ## Build a `WakuMessage` as it would arrive off the wire: spec marker + ## on `meta`, right content topic, payload wrapped in a real SDS + ## envelope by a stand-in remote peer. 
+ let remotePeer = + ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init()) + let sdsWire = ( + await remotePeer.wrapOutgoingMessage( + appPayload, "ingress-test-msg-1", SdsChannelID(channelId) + ) + ).expect("wrapOutgoingMessage") + let inboundMsg = WakuMessage( - payload: appPayload, + payload: sdsWire, contentTopic: contentTopic, version: 0, meta: LipWireReliableChannelVersion.toBytes(), @@ -202,7 +216,7 @@ suite "Reliable Channel - send state machine": ) .expect("listen ChannelMessageSentEvent") - let channelReqId = manager.send(channelId, "hello".toBytes()).expect("send") + let channelReqId = (await manager.send(channelId, "hello".toBytes())).expect("send") let dispatchDeadline = Moment.now() + 1.seconds while Moment.now() < dispatchDeadline and sendCalls == 0: @@ -280,8 +294,10 @@ suite "Reliable Channel - send state machine": ) .expect("listen ChannelMessageErrorEvent") - let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") - let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + let channelReqId1 = + (await manager.send(channelId, "first".toBytes())).expect("send 1") + let channelReqId2 = + (await manager.send(channelId, "second".toBytes())).expect("send 2") let dispatchDeadline = Moment.now() + 1.seconds while Moment.now() < dispatchDeadline and msgReqIds.len < 2: @@ -386,7 +402,8 @@ suite "Reliable Channel - send state machine": ) .expect("listen ChannelMessageSentEvent") - let channelReqId1 = manager.send(channelId, "first".toBytes()).expect("send 1") + let channelReqId1 = + (await manager.send(channelId, "first".toBytes())).expect("send 1") ## Drain the first segment fully before queueing the second, so ## the rate-limit FIFO between sibling sends isn't itself under @@ -396,7 +413,8 @@ suite "Reliable Channel - send state machine": await sleepAsync(5.milliseconds) check msgReqIds.len == 1 - let channelReqId2 = manager.send(channelId, "second".toBytes()).expect("send 2") + let 
channelReqId2 = + (await manager.send(channelId, "second".toBytes())).expect("send 2") ## Wait until `fakeSend(m2)` has fully returned and yield once ## more so `onReadyToSend`'s post-await continuation gets a chance @@ -424,3 +442,762 @@ suite "Reliable Channel - send state machine": check channelReqId2 in finalisedReqIds (await waku.stop()).expect("stop") + +suite "Reliable Channel - SDS persistence": + asyncTest "send persists SDS channel state through the persistency job": + ## A send must flush `sds.meta` + `sds.log` through the shared "sds" + ## job. Writes resolve on enqueue, so reads poll. + const + channelId = ChannelId("sds-persist-channel") + contentTopic = ContentTopic("/reliable-channel/test/persist") + + Persistency.reset() + let root = getTempDir() / ("reliable_channel_sds_" & $epochTime().int) + removeDir(root) + let persistency = Persistency.instance(root).expect("persistency init") + defer: + Persistency.reset() + removeDir(root) + + var waku: Waku + var manager: ReliableChannelManager + lockNewGlobalBrokerContext: + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + setNoopEncryption() + + let fakeSend: SendHandler = proc( + env: MessageEnvelope + ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} = + return ok(RequestId("persist-msg-req-1")) + + discard manager + .createReliableChannel( + channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend + ) + .expect("createReliableChannel") + + discard (await manager.send(channelId, "persist me".toBytes())).expect("send") + + ## Same handle the channel layer writes through (`openJob` is idempotent). 
+ let job = persistency.openJob("sds").expect("openJob sds") + let chanKey = toKey(SdsChannelID(channelId)) + + proc pollMetaExists(): Future[bool] {.async.} = + let deadline = Moment.now() + 2.seconds + while Moment.now() < deadline: + let r = await job.exists(CatMeta, chanKey) + if r.isOk() and r.get(): + return true + await sleepAsync(5.milliseconds) + return false + + proc pollLogRow(): Future[bool] {.async.} = + ## `sds.log` keys rows by (channelId, messageId) — scan the prefix. + let deadline = Moment.now() + 2.seconds + while Moment.now() < deadline: + let r = await job.scanPrefix(CatLog, chanKey) + if r.isOk() and r.get().len > 0: + return true + await sleepAsync(5.milliseconds) + return false + + check await pollMetaExists() + check await pollLogRow() + + (await waku.stop()).expect("stop") + +## A marked WakuMessage carrying an SDS envelope, as it arrives off the wire. +proc sdsWakuMessage(contentTopic: ContentTopic, sdsWire: seq[byte]): WakuMessage = + WakuMessage( + payload: sdsWire, + contentTopic: contentTopic, + version: 0, + meta: LipWireReliableChannelVersion.toBytes(), + ) + +suite "Reliable Channel - SDS lifecycle": + asyncTest "out-of-order segments are parked and delivered in causal order": + ## m2 depends on m1: m2 alone delivers nothing; m1 then delivers both, + ## in causal order. 
+ const + channelId = ChannelId("sds-causal-channel") + contentTopic = ContentTopic("/reliable-channel/test/causal") + let payload1 = "first message".toBytes() + let payload2 = "second message".toBytes() + + var waku: Waku + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + waku = (await createNode(createApiNodeConf())).expect("createNode") + waku.mountMessagingClient().expect("mountMessagingClient") + waku.mountReliableChannelManager().expect("mountReliableChannelManager") + manager = waku.reliableChannelManager + + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + var deliveries: seq[seq[byte]] + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if evt.channelId == channelId: + deliveries.add(evt.payload) + , + ) + .expect("listen ChannelMessageReceivedEvent") + + let remotePeer = + ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init()) + let wire1 = ( + await remotePeer.wrapOutgoingMessage( + payload1, "causal-m1", SdsChannelID(channelId) + ) + ).expect("wrap m1") + let wire2 = ( + await remotePeer.wrapOutgoingMessage( + payload2, "causal-m2", SdsChannelID(channelId) + ) + ).expect("wrap m2") + + ## m2 first: missing dependency m1 -> parked, nothing delivered. + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent( + messageHash: "hash-m2", message: sdsWakuMessage(contentTopic, wire2) + ), + ) + await sleepAsync(100.milliseconds) + check deliveries.len == 0 + + ## m1 arrives: m1 delivered, then the parked m2 released after it. 
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "hash-m1", message: sdsWakuMessage(contentTopic, wire1)
+      ),
+    )
+    let deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and deliveries.len < 2:
+      await sleepAsync(5.milliseconds)
+    check deliveries.len == 2
+    if deliveries.len == 2:
+      check deliveries[0] == payload1
+      check deliveries[1] == payload2
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "duplicate SDS envelope is delivered to the app only once":
+    ## Emits one wrapped SDS wire twice under two different message hashes
+    ## and expects exactly one ChannelMessageReceivedEvent for this channel.
+    const
+      channelId = ChannelId("sds-dup-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/dup")
+    let appPayload = "deliver once".toBytes()
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var deliveryCount = 0
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            deliveryCount.inc()
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    let wire = (
+      await remotePeer.wrapOutgoingMessage(
+        appPayload, "dup-m1", SdsChannelID(channelId)
+      )
+    ).expect("wrap")
+
+    ## Same envelope twice (different hashes) — the second must be suppressed.
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "dup-hash-1", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "dup-hash-2", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    await sleepAsync(200.milliseconds)
+    check deliveryCount == 1
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "SDS envelope for a foreign channel is dropped":
+    ## Same content topic, different SDS channel id — dropped before unwrap.
+    const
+      channelId = ChannelId("sds-foreign-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/foreign")
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var fired = false
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            fired = true
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    let wire = (
+      await remotePeer.wrapOutgoingMessage(
+        "not for you".toBytes(), "foreign-m1", SdsChannelID("some-other-channel")
+      )
+    ).expect("wrap")
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "foreign-hash", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    await sleepAsync(200.milliseconds)
+    check not fired
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "received history survives channel close and re-create":
+    ## Receive m1, close, re-create, replay m1: the duplicate is only
+    ## suppressed if the history was actually restored from SQLite.
+    const
+      channelId = ChannelId("sds-restore-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/restore")
+    let appPayload = "survive restart".toBytes()
+
+    Persistency.reset()
+    let root = getTempDir() / ("reliable_channel_sds_restore_" & $epochTime().int)
+    removeDir(root)
+    let persistency = Persistency.instance(root).expect("persistency init")
+    defer:
+      Persistency.reset()
+      removeDir(root)
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var deliveryCount = 0
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            deliveryCount.inc()
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    let wire = (
+      await remotePeer.wrapOutgoingMessage(
+        appPayload, "restore-m1", SdsChannelID(channelId)
+      )
+    ).expect("wrap")
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "restore-hash-1", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    var deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and deliveryCount < 1:
+      await sleepAsync(5.milliseconds)
+    check deliveryCount == 1
+
+    ## Writes resolve on enqueue — wait until the row is applied before closing.
+    let job = persistency.openJob("sds").expect("openJob sds")
+    let chanKey = toKey(SdsChannelID(channelId))
+    deadline = Moment.now() + 2.seconds
+    var logVisible = false
+    while Moment.now() < deadline and not logVisible:
+      let r = await job.scanPrefix(CatLog, chanKey)
+      logVisible = r.isOk() and r.get().len > 0
+      if not logVisible:
+        await sleepAsync(5.milliseconds)
+    check logVisible
+
+    (await manager.closeChannel(channelId)).expect("closeChannel")
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("re-createReliableChannel")
+
+    ## Replay the same envelope. Only a restored history suppresses it.
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "restore-hash-2", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    await sleepAsync(300.milliseconds)
+    check deliveryCount == 1
+
+    (await waku.stop()).expect("stop")
+
+suite "Reliable Channel - SDS protocol semantics":
+  asyncTest "a reply references the received message and advances the lamport clock":
+    ## After receiving m1, our outgoing wire must reference m1 in its causal
+    ## history (that reference IS the ack) with a higher lamport.
+    const
+      channelId = ChannelId("sds-semantics-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/semantics")
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    var capturedWires: seq[seq[byte]]
+    let fakeSend: SendHandler = proc(
+        env: MessageEnvelope
+    ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
+      ## Noop encryption is identity, so the envelope payload IS the SDS wire.
+      capturedWires.add(env.payload)
+      return ok(RequestId("semantics-req-" & $capturedWires.len))
+
+    discard manager
+      .createReliableChannel(
+        channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
+      )
+      .expect("createReliableChannel")
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    let wire1 = (
+      await remotePeer.wrapOutgoingMessage(
+        "from remote".toBytes(), "semantics-m1", SdsChannelID(channelId)
+      )
+    ).expect("wrap m1")
+    let m1 = deserializeMessage(wire1).expect("deserialize m1")
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "semantics-hash-1", message: sdsWakuMessage(contentTopic, wire1)
+      ),
+    )
+    await sleepAsync(100.milliseconds)
+
+    discard (await manager.send(channelId, "reply".toBytes())).expect("send")
+    var deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and capturedWires.len < 1:
+      await sleepAsync(5.milliseconds)
+    check capturedWires.len == 1
+    ## `check` reports and continues, so guard the index: an empty capture
+    ## must fail the test cleanly instead of raising IndexDefect.
+    if capturedWires.len == 1:
+      let reply = deserializeMessage(capturedWires[0]).expect("deserialize reply")
+      check SdsMessageID("semantics-m1") in
+        reply.causalHistory.getMessageIds()
+      check reply.lamportTimestamp > m1.lamportTimestamp
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "an unacknowledged send is acked by a later remote message":
+    ## Our send sits in the outgoing buffer (visible in the persisted meta)
+    ## until any later remote message references it — then the buffer drains.
+    const
+      channelId = ChannelId("sds-ack-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/ack")
+
+    Persistency.reset()
+    let root = getTempDir() / ("reliable_channel_sds_ack_" & $epochTime().int)
+    removeDir(root)
+    let persistency = Persistency.instance(root).expect("persistency init")
+    defer:
+      Persistency.reset()
+      removeDir(root)
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    var capturedWires: seq[seq[byte]]
+    let fakeSend: SendHandler = proc(
+        env: MessageEnvelope
+    ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
+      capturedWires.add(env.payload)
+      return ok(RequestId("ack-req-" & $capturedWires.len))
+
+    discard manager
+      .createReliableChannel(
+        channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
+      )
+      .expect("createReliableChannel")
+
+    discard (await manager.send(channelId, "needs ack".toBytes())).expect("send")
+    var deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and capturedWires.len < 1:
+      await sleepAsync(5.milliseconds)
+    check capturedWires.len == 1
+
+    let job = persistency.openJob("sds").expect("openJob sds")
+    let chanKey = toKey(SdsChannelID(channelId))
+
+    proc outgoingBufferLen(): Future[int] {.async.} =
+      ## Decode the persisted meta snapshot; -1 while not yet readable.
+      let r = await job.get(CatMeta, chanKey)
+      if r.isErr() or r.get().isNone():
+        return -1
+      let meta = ChannelMeta.decode(r.get().get()).valueOr:
+        return -1
+      return meta.outgoingBuffer.len
+
+    ## After the send the message must sit unacknowledged in the buffer.
+    deadline = Moment.now() + 2.seconds
+    var bufLen = -1
+    while Moment.now() < deadline and bufLen != 1:
+      bufLen = await outgoingBufferLen()
+      if bufLen != 1:
+        await sleepAsync(5.milliseconds)
+    check bufLen == 1
+
+    ## The ack round-trip needs the captured wire. `check` above does not
+    ## abort, so skip this part (the test has already failed) rather than
+    ## index an empty seq.
+    if capturedWires.len == 1:
+      ## The remote received our wire; its next message references it.
+      let remotePeer =
+        ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+      discard
+        (await remotePeer.unwrapReceivedMessage(capturedWires[0])).expect("remote unwrap")
+      let ackCarrier = (
+        await remotePeer.wrapOutgoingMessage(
+          "any later message".toBytes(), "ack-carrier-1", SdsChannelID(channelId)
+        )
+      ).expect("wrap ack carrier")
+
+      waku_message_events.MessageReceivedEvent.emit(
+        brokerCtx,
+        waku_message_events.MessageReceivedEvent(
+          messageHash: "ack-hash-1", message: sdsWakuMessage(contentTopic, ackCarrier)
+        ),
+      )
+
+      ## Receiving it must drain the outgoing buffer (op-end meta flush).
+      deadline = Moment.now() + 2.seconds
+      bufLen = -1
+      while Moment.now() < deadline and bufLen != 0:
+        bufLen = await outgoingBufferLen()
+        if bufLen != 0:
+          await sleepAsync(5.milliseconds)
+      check bufLen == 0
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "three-deep dependency chain is released in causal order":
+    ## m1 <- m2 <- m3 arriving as m3, m2, m1: all held until m1 lands,
+    ## then released as m1, m2, m3.
+    const
+      channelId = ChannelId("sds-chain-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/chain")
+    let payloads =
+      @["chain first".toBytes(), "chain second".toBytes(), "chain third".toBytes()]
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var deliveries: seq[seq[byte]]
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            deliveries.add(evt.payload)
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    var wires: seq[seq[byte]]
+    for i in 0 .. 2:
+      wires.add(
+        (
+          await remotePeer.wrapOutgoingMessage(
+            payloads[i], "chain-m" & $(i + 1), SdsChannelID(channelId)
+          )
+        ).expect("wrap chain-m" & $(i + 1))
+      )
+
+    ## Deepest first: m3, then m2 — both must be parked.
+    for i in [2, 1]:
+      waku_message_events.MessageReceivedEvent.emit(
+        brokerCtx,
+        waku_message_events.MessageReceivedEvent(
+          messageHash: "chain-hash-" & $(i + 1),
+          message: sdsWakuMessage(contentTopic, wires[i]),
+        ),
+      )
+    await sleepAsync(150.milliseconds)
+    check deliveries.len == 0
+
+    ## The root arrives: everything drains in causal order.
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "chain-hash-1", message: sdsWakuMessage(contentTopic, wires[0])
+      ),
+    )
+    let deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and deliveries.len < 3:
+      await sleepAsync(5.milliseconds)
+    check deliveries.len == 3
+    if deliveries.len == 3:
+      check deliveries[0] == payloads[0]
+      check deliveries[1] == payloads[1]
+      check deliveries[2] == payloads[2]
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "sync envelope without app payload is consumed silently":
+    ## Sync traffic has no app payload: no event, and normal traffic
+    ## keeps flowing afterwards.
+    const
+      channelId = ChannelId("sds-sync-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/sync")
+    let appPayload = "real message".toBytes()
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    discard manager
+      .createReliableChannel(channelId, contentTopic, SdsParticipantID("local"))
+      .expect("createReliableChannel")
+
+    var deliveryCount = 0
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            deliveryCount.inc()
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    ## Hand-built sync envelope: valid SDS message, empty content.
+    let syncMsg = SdsMessage.init(
+      messageId = SdsMessageID("sync-1"),
+      lamportTimestamp = 42,
+      causalHistory = @[],
+      channelId = SdsChannelID(channelId),
+      content = @[],
+      bloomFilter = @[],
+    )
+    let syncWire = serializeMessage(syncMsg).expect("serialize sync")
+
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "sync-hash-1", message: sdsWakuMessage(contentTopic, syncWire)
+      ),
+    )
+    await sleepAsync(150.milliseconds)
+    check deliveryCount == 0
+
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    let wire = (
+      await remotePeer.wrapOutgoingMessage(
+        appPayload, "sync-m1", SdsChannelID(channelId)
+      )
+    ).expect("wrap")
+    waku_message_events.MessageReceivedEvent.emit(
+      brokerCtx,
+      waku_message_events.MessageReceivedEvent(
+        messageHash: "sync-hash-2", message: sdsWakuMessage(contentTopic, wire)
+      ),
+    )
+    let deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and deliveryCount < 1:
+      await sleepAsync(5.milliseconds)
+    check deliveryCount == 1
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "identical payloads get distinct message ids and both deliver":
+    ## Identical content sent twice must get distinct message ids and
+    ## reach the app twice — not collapse via the SDS duplicate check.
+    const
+      channelId = ChannelId("sds-unique-id-channel")
+      contentTopic = ContentTopic("/reliable-channel/test/unique-id")
+    let appPayload = "ok".toBytes()
+
+    var waku: Waku
+    var manager: ReliableChannelManager
+    var brokerCtx: BrokerContext
+    lockNewGlobalBrokerContext:
+      brokerCtx = globalBrokerContext()
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    setNoopEncryption()
+
+    var capturedWires: seq[seq[byte]]
+    let fakeSend: SendHandler = proc(
+        env: MessageEnvelope
+    ): Future[Result[RequestId, string]] {.async: (raises: [CatchableError]), gcsafe.} =
+      capturedWires.add(env.payload)
+      return ok(RequestId("unique-req-" & $capturedWires.len))
+
+    discard manager
+      .createReliableChannel(
+        channelId, contentTopic, SdsParticipantID("local"), sendHandler = fakeSend
+      )
+      .expect("createReliableChannel")
+
+    var deliveries: seq[seq[byte]]
+    discard ChannelMessageReceivedEvent
+      .listen(
+        brokerCtx,
+        proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} =
+          if evt.channelId == channelId:
+            deliveries.add(evt.payload)
+        ,
+      )
+      .expect("listen ChannelMessageReceivedEvent")
+
+    ## Send side: the same payload twice must produce two distinct ids.
+    discard (await manager.send(channelId, appPayload)).expect("send 1")
+    discard (await manager.send(channelId, appPayload)).expect("send 2")
+    var deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and capturedWires.len < 2:
+      await sleepAsync(5.milliseconds)
+    check capturedWires.len == 2
+    ## Guard the indexing: `check` does not stop the test, and reading a
+    ## missing wire would raise IndexDefect instead of a plain failure.
+    if capturedWires.len == 2:
+      let id1 = deserializeMessage(capturedWires[0]).expect("wire 1").messageId
+      let id2 = deserializeMessage(capturedWires[1]).expect("wire 2").messageId
+      check id1 != id2
+
+    ## Receive side: identical content under distinct ids delivers twice.
+    let remotePeer =
+      ReliabilityManager.new(SdsParticipantID("remote"), ReliabilityConfig.init())
+    for i in 1 .. 2:
+      let wire = (
+        await remotePeer.wrapOutgoingMessage(
+          appPayload, "unique-m" & $i, SdsChannelID(channelId)
+        )
+      ).expect("wrap " & $i)
+      waku_message_events.MessageReceivedEvent.emit(
+        brokerCtx,
+        waku_message_events.MessageReceivedEvent(
+          messageHash: "unique-hash-" & $i, message: sdsWakuMessage(contentTopic, wire)
+        ),
+      )
+    deadline = Moment.now() + 2.seconds
+    while Moment.now() < deadline and deliveries.len < 2:
+      await sleepAsync(5.milliseconds)
+    check deliveries.len == 2
+
+    (await waku.stop()).expect("stop")
+
+  asyncTest "manager rejects operations on unknown channels":
+    ## No channel is created here: both `send` and `closeChannel` on an
+    ## id the manager has never seen must return an error result.
+    var waku: Waku
+    var manager: ReliableChannelManager
+    lockNewGlobalBrokerContext:
+      waku = (await createNode(createApiNodeConf())).expect("createNode")
+      waku.mountMessagingClient().expect("mountMessagingClient")
+      waku.mountReliableChannelManager().expect("mountReliableChannelManager")
+      manager = waku.reliableChannelManager
+
+    check (await manager.send(ChannelId("no-such-channel"), "x".toBytes())).isErr()
+    check (await manager.closeChannel(ChannelId("no-such-channel"))).isErr()
+
+    (await waku.stop()).expect("stop")