From 74057c66224f43b4aa27b42033d4ed52eed5c7a7 Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Wed, 27 May 2026 23:05:20 +0200 Subject: [PATCH] start basic reliable channel folder (#3886) --- .github/workflows/ci.yml | 1 + channels/encryption/encryption.nim | 25 ++ channels/encryption/noop_encryption.nim | 18 ++ channels/events.nim | 23 ++ .../rate_limit_manager/rate_limit_manager.nim | 80 ++++++ channels/reliable_channel.nim | 264 ++++++++++++++++++ channels/reliable_channel_manager.nim | 138 +++++++++ .../scalable_data_sync/scalable_data_sync.nim | 62 ++++ .../scalable_data_sync/sds_persistence.nim | 25 ++ .../segmentation/segment_message_proto.nim | 34 +++ channels/segmentation/segmentation.nim | 70 +++++ .../segmentation/segmentation_persistence.nim | 20 ++ channels/types.nim | 15 + tests/all_tests_waku.nim | 3 + tests/channels/test_all.nim | 3 + .../test_reliable_channel_send_receive.nim | 149 ++++++++++ 16 files changed, 930 insertions(+) create mode 100644 channels/encryption/encryption.nim create mode 100644 channels/encryption/noop_encryption.nim create mode 100644 channels/events.nim create mode 100644 channels/rate_limit_manager/rate_limit_manager.nim create mode 100644 channels/reliable_channel.nim create mode 100644 channels/reliable_channel_manager.nim create mode 100644 channels/scalable_data_sync/scalable_data_sync.nim create mode 100644 channels/scalable_data_sync/sds_persistence.nim create mode 100644 channels/segmentation/segment_message_proto.nim create mode 100644 channels/segmentation/segmentation.nim create mode 100644 channels/segmentation/segmentation_persistence.nim create mode 100644 channels/types.nim create mode 100644 tests/channels/test_all.nim create mode 100644 tests/channels/test_reliable_channel_send_receive.nim diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 52d20157a..c54d828ae 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -43,6 +43,7 @@ jobs: - 'tools/**' - 'tests/all_tests_v2.nim' - 'tests/**' + - 'channels/**' docker: - 'docker/**' diff --git a/channels/encryption/encryption.nim b/channels/encryption/encryption.nim new file mode 100644 index 000000000..5cb53be2f --- /dev/null +++ b/channels/encryption/encryption.nim @@ -0,0 +1,25 @@ +## Optional encryption hooks for the Reliable Channel API. +## +## Modelled as `RequestBroker`s: the broker pattern lets the channel +## delegate work to a provider that may live in any module without +## introducing a direct dependency. If no provider is registered the +## broker returns an error, so installing the noop providers from +## `noop_encryption` is required when the application does not want +## actual encryption. +## +## Applied per-segment after SDS processing on outgoing, and before +## SDS processing on incoming. No specific scheme is mandated. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import brokers/request_broker + +export request_broker + +RequestBroker: + type Encrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} + +RequestBroker: + type Decrypt* = seq[byte] + proc signature*(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} diff --git a/channels/encryption/noop_encryption.nim b/channels/encryption/noop_encryption.nim new file mode 100644 index 000000000..f09ed9cf4 --- /dev/null +++ b/channels/encryption/noop_encryption.nim @@ -0,0 +1,18 @@ +## No-op encryption providers. Install these when the application does +## not want actual encryption so the `Encrypt` / `Decrypt` brokers have +## something to dispatch to. + +import results +import chronos +import ./encryption + +proc setNoopEncryption*() = + discard Encrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Encrypt, string]] {.async.} = + return ok(Encrypt(payload)) + ) + + discard Decrypt.setProvider( + proc(payload: seq[byte]): Future[Result[Decrypt, string]] {.async.} = + return ok(Decrypt(payload)) + ) diff --git a/channels/events.nim b/channels/events.nim new file mode 100644 index 000000000..5a17c99d2 --- /dev/null +++ b/channels/events.nim @@ -0,0 +1,23 @@ +## Reliable Channel event types emitted to API consumers. +## +## Lifecycle events for individual segments (sent / propagated / errored) +## are the same as the network-level ones the DeliveryService already +## emits — `requestId` is shared across layers — so we just re-export +## `waku/events/message_events` and avoid declaring duplicates. +## +## Only the channel-level `MessageReceivedEvent` carries data that has +## no analogue in the lower layer (reassembled application payload, +## senderId, channelId), so it lives here. + +import waku/events/message_events as waku_message_events +import brokers/event_broker + +import ./types as channel_types + +export waku_message_events, channel_types, event_broker + +EventBroker: + type ChannelMessageReceivedEvent* = object + channelId*: ChannelId + senderId*: SdsParticipantID + payload*: seq[byte] diff --git a/channels/rate_limit_manager/rate_limit_manager.nim b/channels/rate_limit_manager/rate_limit_manager.nim new file mode 100644 index 000000000..5ea6486a5 --- /dev/null +++ b/channels/rate_limit_manager/rate_limit_manager.nim @@ -0,0 +1,80 @@ +## Rate Limit Manager for the Reliable Channel API. +## +## Tracks messages sent per RLN epoch and delays dispatch when the +## limit is approached, ensuring RLN compliance on enforcing relays. +## +## For the skeleton this is a pass-through: messages are immediately +## released as ready-to-send. Real epoch budgeting will be added later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/times +import message +import brokers/event_broker +import brokers/broker_context + +export event_broker, broker_context +export message.SdsChannelID + +const + DefaultEpochPeriodSec* = 600 + DefaultMessagesPerEpoch* = 1 + +EventBroker: + ## Emitted by `enqueueToSend` carrying the batch of opaque message + ## blobs that may now leave the rate limiter and continue down the + ## outgoing pipeline (encryption -> dispatch). Bytes only: the rate + ## limiter is intentionally agnostic of SDS, so anything serialisable + ## can flow through it. + ## + ## `channelId` lets listeners filter to their own channel, since all + ## reliable channels share the underlying Waku node's broker context. + type ReadyToSendEvent* = object + channelId*: SdsChannelID + msgs*: seq[seq[byte]] + +type + RateLimitConfig* = object + enabled*: bool ## spec: rate limiting opt-in; SHOULD be true when RLN active + epochPeriodSec*: int + messagesPerEpoch*: int + + RateLimitManager* = ref object + config*: RateLimitConfig + queue*: seq[seq[byte]] + currentEpochStart*: Time + sentInCurrentEpoch*: int + channelId*: SdsChannelID ## tag for the emitted `ReadyToSendEvent` + brokerCtx: BrokerContext + +proc new*( + T: type RateLimitManager, + config: RateLimitConfig, + channelId: SdsChannelID, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + return T( + config: config, + queue: @[], + currentEpochStart: getTime(), + sentInCurrentEpoch: 0, + channelId: channelId, + brokerCtx: brokerCtx, + ) + +proc enqueueToSend*(self: RateLimitManager, msg: seq[byte]) = + ## Skeleton behaviour: enqueue and immediately release as a single + ## ready batch. Real per-epoch budgeting will park messages on + ## `self.queue` and emit only when the budget allows. + ReadyToSendEvent.emit( + self.brokerCtx, ReadyToSendEvent(channelId: self.channelId, msgs: @[msg]) + ) + +proc dequeueReady*(self: RateLimitManager): seq[seq[byte]] = + ## Returns the set of queued messages that may be dispatched now + ## without exceeding the configured rate limit. + discard + +proc resetEpoch*(self: RateLimitManager) = + self.currentEpochStart = getTime() + self.sentInCurrentEpoch = 0 diff --git a/channels/reliable_channel.nim b/channels/reliable_channel.nim new file mode 100644 index 000000000..2a7d01d35 --- /dev/null +++ b/channels/reliable_channel.nim @@ -0,0 +1,264 @@ +## Reliable Channel type. +## +## A `ReliableChannel` orchestrates segmentation, SDS (end-to-end +## reliability), optional encryption, and rate-limited dispatch on top +## of the Messaging API for a single channel. +## +## Outgoing pipeline: Segment -> SDS -> Rate Limit -> Encrypt -> Dispatch +## Incoming pipeline: Decrypt -> SDS -> Reassemble -> Emit event +## +## Channels are owned by a `ReliableChannelManager`. Lifecycle and send +## operations are addressed by `ChannelId`, so callers only need to keep +## an opaque handle around. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/[options, tables] +import results +import chronos +import bearssl/rand +import stew/byteutils +import libp2p/crypto/crypto as libp2p_crypto + +import waku/node/delivery_service/delivery_service +import waku/node/delivery_service/send_service +import waku/waku_core/topics + +import ./events +import ./segmentation/segmentation +import ./scalable_data_sync/scalable_data_sync +import ./rate_limit_manager/rate_limit_manager +import ./encryption/encryption + +export + delivery_service, send_service, events, segmentation, scalable_data_sync, + rate_limit_manager, encryption + +const LipWireReliableChannelVersion* = "RELIABLE-CHANNEL-API/1" + ## Wire-format spec marker for the Reliable Channel layer, as defined + ## in the reliable-channel-api LIP (`Wire Format / Spec Marker`). + ## A `WakuMessage` whose `meta` field does not equal these bytes is + ## not addressed to this layer and is silently dropped on ingress. + ## The trailing `/N` is the wire-format version and is bumped only + ## on breaking on-the-wire changes; implementations pin one version. + +type ReliableChannel* = ref object + ## Spec-defined public type. Fields are private so callers cannot + ## mutate internals and break invariants. Getters are added below + ## for the few values consumers may need. + deliveryService: DeliveryService + channelId: ChannelId + contentTopic: ContentTopic + senderId: SdsParticipantID + rng: ref HmacDrbgContext + segmentation: SegmentationHandler + sdsHandler: SdsHandler + rateLimit: RateLimitManager + + requestIds: Table[RequestId, seq[RequestId]] + pendingRequests: seq[tuple[parent: RequestId, ephemeral: bool]] + brokerCtx: BrokerContext + ## Captured here so the channel emits `ChannelMessageReceivedEvent` + ## on the same broker context the owning manager registered its + ## listeners on. Without this, an emit via `globalBrokerContext()` + ## would land on whatever context happens to be thread-local at + ## emit time, which is not necessarily the manager's. + +func getChannelId*(self: ReliableChannel): ChannelId {.inline.} = + self.channelId + +func getContentTopic*(self: ReliableChannel): ContentTopic {.inline.} = + self.contentTopic + +func getSenderId*(self: ReliableChannel): SdsParticipantID {.inline.} = + self.senderId + +proc onReadyToSend( + self: ReliableChannel, msgs: seq[seq[byte]] +) {.async: (raises: []).} = + ## Tail of the outgoing pipeline. Invoked from the `ReadyToSendEvent` + ## listener once `rate_limit_manager` releases a batch of opaque + ## blobs (already-encoded SDS messages): + ## + ## ... -> rate_limit_manager -> [encryption] -> dispatch + for m in msgs: + ## Each `m` was preceded by exactly one push onto `pendingRequests` + ## in `send`, so this pop is always safe in the current skeleton. + let pending = self.pendingRequests[0] + self.pendingRequests.delete(0) + + ## TODO: revisit which fields of the SDS message must be encrypted. + ## Encrypting the whole encoded blob forces every receiver to attempt + ## decryption before it can route, which breaks selective dispatch. + ## Leave routing metadata (channelId, causal-history references) in + ## clear and encrypt only the application payload. + let encRes = await Encrypt.request(m) + let encrypted = encRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: pending.parent, + messageHash: "", + error: "encryption failed: " & error, + ), + ) + continue + let wireBytes = seq[byte](encrypted) + + let envelope = MessageEnvelope( + contentTopic: self.contentTopic, payload: wireBytes, ephemeral: pending.ephemeral + ) + + let deliveryReqId = RequestId.new(self.rng) + let deliveryTask = DeliveryTask.new(deliveryReqId, envelope, globalBrokerContext()).valueOr: + ## TODO: emit waku `MessageErrorEvent` for the parent request id. + continue + + ## Stamp the Reliable Channel wire-format spec marker so the ingress + ## side of any peer can route this WakuMessage to its Reliable + ## Channel layer. Done on the constructed WakuMessage rather than + ## via the envelope because `MessageEnvelope` does not expose a + ## `meta` field. + deliveryTask.msg.meta = LipWireReliableChannelVersion.toBytes() + + asyncSpawn self.deliveryService.sendService.send(deliveryTask) + self.requestIds.mgetOrPut(pending.parent, @[]).add(deliveryReqId) + +proc send*( + self: ReliableChannel, payload: seq[byte], ephemeral: bool = false +): Result[RequestId, string] = + ## Single application-level send. The first three stages of the + ## outgoing pipeline are chained explicitly so the flow is visible + ## at a glance: + ## + ## segmentation -> sds -> rate_limit_manager + ## + ## `rate_limit_manager.enqueueToSend` emits a `ReadyToSendEvent` with + ## the SDS messages cleared for transmission; the channel's listener + ## then runs the final stage (encryption -> dispatch). The `ephemeral` + ## flag is carried alongside each segment in `pendingRequests` and + ## stamped onto the eventual `MessageEnvelope`. + ## + ## The returned `RequestId` is the parent of one-or-more + ## delivery-service `RequestId`s; the mapping is recorded in + ## `self.requestIds`. + if payload.len == 0: + return err("empty payload") + + let parentReqId = RequestId.new(self.rng) + self.requestIds[parentReqId] = @[] + + for segmentBytes in self.segmentation.performSegmentation(payload): + ## Segments arrive already encoded; the segmentation module owns + ## the wire format so SDS only ever sees opaque bytes. + let sdsBytes = self.sdsHandler.wrapOutgoing( + self.channelId, self.senderId, segmentBytes + ).valueOr: + return err("SDS wrap failed: " & error) + self.pendingRequests.add((parent: parentReqId, ephemeral: ephemeral)) + self.rateLimit.enqueueToSend(sdsBytes) + + return ok(parentReqId) + +proc onMessageReceived( + self: ReliableChannel, messageHash: string, payload: seq[byte] +) {.async: (raises: []).} = + ## Ingress pipeline made visible: + ## + ## payload -> decrypt -> sds -> reassemble -> emit + ## + ## Invoked from this channel's `MessageReceivedEvent` listener, which + ## already filtered on the spec marker and on `contentTopic`. The + ## channel only sees the raw payload bytes for itself. + + ## Notice that the following "request" is implemented implicitly as a broker call to + ## the `Decrypt` request broker. + let decRes = await Decrypt.request(payload) + let plaintext = decRes.valueOr: + MessageErrorEvent.emit( + self.brokerCtx, + MessageErrorEvent( + requestId: RequestId(""), + messageHash: messageHash, + error: "decryption failed: " & error, + ), + ) + return + let plaintextBytes = seq[byte](plaintext) + + let unwrapped = self.sdsHandler.handleIncoming(plaintextBytes) + if unwrapped.isErr(): + return + + let reassembled = self.segmentation.handleIncomingSegment(unwrapped.get().content) + if reassembled.isSome(): + ## Emit on the captured `brokerCtx` (the manager's), so the + ## application listener that the manager has set up on that same + ## context picks the event up. + ChannelMessageReceivedEvent.emit( + self.brokerCtx, + ChannelMessageReceivedEvent( + channelId: self.channelId, + senderId: self.senderId, + payload: reassembled.get().payload, + ), + ) + +proc new*( + T: type ReliableChannel, + deliveryService: DeliveryService, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, + segConfig: SegmentationConfig, + sdsConfig: SdsConfig, + rateConfig: RateLimitConfig, + brokerCtx: BrokerContext = globalBrokerContext(), +): T = + ## Pipeline handlers (segmentation/SDS/rate-limit) are constructed + ## inside the channel rather than handed in by the caller — they are + ## implementation details of the channel, not knobs the API consumer + ## should be wiring up. Encryption is delegated to the `Encrypt`/ + ## `Decrypt` request brokers, so the channel keeps no per-instance + ## encryption state either. + let chn = T( + deliveryService: deliveryService, + channelId: channelId, + contentTopic: contentTopic, + senderId: senderId, + rng: libp2p_crypto.newRng(), + segmentation: SegmentationHandler.new(segConfig), + sdsHandler: SdsHandler.new(sdsConfig, senderId), + rateLimit: RateLimitManager.new(rateConfig, channelId, brokerCtx), + requestIds: initTable[RequestId, seq[RequestId]](), + pendingRequests: @[], + brokerCtx: brokerCtx, + ) + + ## Each channel owns its own egress + ingress listeners on + ## `chn.brokerCtx`, filtered to traffic addressed to this channel. + ## Keeping the listeners (and the procs they call) inside the + ## channel lets `onReadyToSend` and `onMessageReceived` stay private + ## — the manager doesn't need to know about them. + discard ReadyToSendEvent.listen( + chn.brokerCtx, + proc(evt: ReadyToSendEvent): Future[void] {.async: (raises: []).} = + if evt.channelId == chn.channelId: + await chn.onReadyToSend(evt.msgs) + , + ) + + discard MessageReceivedEvent.listen( + chn.brokerCtx, + proc(evt: MessageReceivedEvent): Future[void] {.async: (raises: []).} = + ## Drop foreign traffic (non-Reliable-Channel `meta`) and traffic + ## for other channels before doing any decode work. + if string.fromBytes(evt.message.meta) != LipWireReliableChannelVersion: + return + if evt.message.contentTopic != chn.contentTopic: + return + await chn.onMessageReceived(evt.messageHash, evt.message.payload) + , + ) + + return chn diff --git a/channels/reliable_channel_manager.nim b/channels/reliable_channel_manager.nim new file mode 100644 index 000000000..ddbdb37a6 --- /dev/null +++ b/channels/reliable_channel_manager.nim @@ -0,0 +1,138 @@ +## Reliable Channel API entry point. +## +## Owns the set of `ReliableChannel` instances and exposes lifecycle and +## send/receive operations addressed by `ChannelId`. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/tables +import results +import chronos +import stew/byteutils + +import waku/api/api +import waku/api/api_conf +import waku/events/message_events as waku_message_events +import waku/factory/waku as waku_factory +import waku/waku_core/topics + +import ./reliable_channel +import ./encryption/noop_encryption + +export reliable_channel + +type ReliableChannelManager* = ref object + channels: Table[ChannelId, ReliableChannel] + deliveryService: DeliveryService + ## Owned by the manager. The ownership chain is + ## ReliableChannelManager -> DeliveryService -> Waku -> WakuNode. + ## Hidden so callers can't substitute their own and bypass the + ## manager's pipeline. + brokerCtx: BrokerContext + +proc new*( + T: type ReliableChannelManager, + conf: WakuNodeConf, + brokerCtx: BrokerContext = globalBrokerContext(), +): Future[Result[T, string]] {.async.} = + ## TODO !! The proper ownership chain is: + ## ReliableChannelManager -> DeliveryService (MessagingClient) -> Waku (Kernel/Protocols) -> WakuNode, + ## and this will be implemented in the future. For now, `createNode` + ## is called here to get a DeliveryService instance, and the WakuNode is immediately discarded. + ## This is a temporary workaround to get the API + + let waku = ?(await createNode(conf)) + + let manager = T( + channels: initTable[ChannelId, ReliableChannel](), + deliveryService: waku.deliveryService, + brokerCtx: brokerCtx, + ) + + return ok(manager) + +proc start*(self: ReliableChannelManager): Result[void, string] = + ## Bring the owned DeliveryService up. Separated from `new` so callers + ## can register encryption providers / create channels before traffic + ## starts flowing. + self.deliveryService.startDeliveryService() + +proc stop*(self: ReliableChannelManager) {.async.} = + if not self.deliveryService.isNil(): + await self.deliveryService.stopDeliveryService() + +proc createReliableChannel*( + self: ReliableChannelManager, + channelId: ChannelId, + contentTopic: ContentTopic, + senderId: SdsParticipantID, +): Result[ChannelId, string] = + ## Spec entry point. The `DeliveryService` and `rng` the channel needs + ## are sourced from the owning `ReliableChannelManager` rather than + ## passed per call. Encryption is wired up through the `Encrypt`/ + ## `Decrypt` request brokers — the application installs its own + ## providers (or `setNoopEncryption()`) before traffic flows. + ## + ## Segmentation, SDS and rate-limit configs will eventually be read + ## from the node's `NodeConfig`. Defaults for now. + if self.channels.hasKey(channelId): + return err("channel already exists: " & channelId) + + let segConfig = SegmentationConfig( + segmentSizeBytes: DefaultSegmentSizeBytes, + enableReedSolomon: false, + persistence: nil, + ) + let sdsConfig = SdsConfig( + acknowledgementTimeoutMs: DefaultAcknowledgementTimeoutMs, + maxRetransmissions: DefaultMaxRetransmissions, + causalHistorySize: DefaultCausalHistorySize, + persistence: nil, + ) + let rateConfig = RateLimitConfig( + epochPeriodSec: DefaultEpochPeriodSec, messagesPerEpoch: DefaultMessagesPerEpoch + ) + + let chn = ReliableChannel.new( + deliveryService = self.deliveryService, + channelId = channelId, + contentTopic = contentTopic, + senderId = senderId, + segConfig = segConfig, + sdsConfig = sdsConfig, + rateConfig = rateConfig, + brokerCtx = self.brokerCtx, + ) + + self.channels[channelId] = chn + return ok(channelId) + +proc closeChannel*( + self: ReliableChannelManager, channelId: ChannelId +): Result[void, string] = + ## Flush state, persist outstanding SDS buffers, release resources. + if not self.channels.hasKey(channelId): + return err("unknown channel: " & channelId) + self.channels.del(channelId) + return ok() + +proc send*( + self: ReliableChannelManager, + channelId: ChannelId, + appPayload: seq[byte], + ephemeral: bool = false, +): Result[RequestId, string] = + ## Spec-level entry point. Looks the channel up by id and delegates + ## to `ReliableChannel.send`, which exposes the visible pipeline + ## segmentation -> sds -> rate_limit_manager -> encryption. + let chn = self.channels.getOrDefault(channelId) + if chn.isNil(): + return err("unknown channel: " & channelId) + return chn.send(appPayload, ephemeral) + +## Inbound messages are not handed to the manager by direct call. Each +## `ReliableChannel` installs its own `MessageReceivedEvent` listener +## in `ReliableChannel.new`, filters by spec marker and `contentTopic`, +## and routes to its private `onMessageReceived`. This keeps the lower +## layer (MessagingAPI/Waku) unaware of the existence of ReliableChannel +## and keeps the manager out of per-channel event dispatch. diff --git a/channels/scalable_data_sync/scalable_data_sync.nim b/channels/scalable_data_sync/scalable_data_sync.nim new file mode 100644 index 000000000..30ad0e02b --- /dev/null +++ b/channels/scalable_data_sync/scalable_data_sync.nim @@ -0,0 +1,62 @@ +## Scalable Data Sync (SDS) component for the Reliable Channel API. +## +## Provides end-to-end delivery guarantees via causal history tracking, +## acknowledgements, and retransmission of unacknowledged segments. +## +## Skeleton: `wrapOutgoing` and `handleIncoming` are pass-throughs so +## the send/receive circuit can exercise the surrounding pipeline. +## Real SDS wrapping will plug in via `nim-sds` later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import results +import message as sds_message + +import ./sds_persistence + +export sds_message, sds_persistence + +const + DefaultAcknowledgementTimeoutMs* = 5_000 + DefaultMaxRetransmissions* = 5 + DefaultCausalHistorySize* = 2 + +type + SdsConfig* = object + acknowledgementTimeoutMs*: int + maxRetransmissions*: int + causalHistorySize*: int + persistence*: SdsPersistence + + SdsHandler* = ref object + config*: SdsConfig + participantId*: SdsParticipantID + +proc new*( + T: type SdsHandler, + config: SdsConfig, + participantId: SdsParticipantID = SdsParticipantID(""), +): T = + return T(config: config, participantId: participantId) + +proc wrapOutgoing*( + self: SdsHandler, + channelId: SdsChannelID, + senderId: SdsParticipantID, + payload: seq[byte], +): Result[seq[byte], string] = + ## Stage 2 of the outgoing pipeline (segmentation -> sds -> rate_limit_manager -> encryption). + ## Skeleton: pass the encoded segment through unchanged. Real causal + ## history / lamport / bloom-filter population will replace this. + return ok(payload) + +proc handleIncoming*( + self: SdsHandler, msg: seq[byte] +): Result[tuple[content: seq[byte], channelId: SdsChannelID], string] = + ## Skeleton: pass the bytes through; channel id is left empty until + ## the real wire format provides it. + return ok((content: msg, channelId: SdsChannelID(""))) + +proc tickRetransmissions*(self: SdsHandler) = + ## Drives retransmissions of unacknowledged messages. + discard diff --git a/channels/scalable_data_sync/sds_persistence.nim b/channels/scalable_data_sync/sds_persistence.nim new file mode 100644 index 000000000..8089595ea --- /dev/null +++ b/channels/scalable_data_sync/sds_persistence.nim @@ -0,0 +1,25 @@ +## Persistence backend for SDS outgoing buffer and causal history. +## +## TODO (raised in PR review): this surface is duplicating concerns that +## should come from the SDS module itself. Once the SDS module exposes a +## complete persistence contract, drop this file and import that surface +## instead of re-declaring it here. + +import message + +type + SdsPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SdsPersistence* = ref object of RootObj + kind*: SdsPersistenceKind + +method storeOutgoing*(self: SdsPersistence, msg: SdsMessage) {.base.} = + discard + +method markAcknowledged*(self: SdsPersistence, messageId: SdsMessageID) {.base.} = + discard + +method unackedOlderThan*(self: SdsPersistence, ageMs: int): seq[SdsMessage] {.base.} = + discard diff --git a/channels/segmentation/segment_message_proto.nim b/channels/segmentation/segment_message_proto.nim new file mode 100644 index 000000000..f19cdc27f --- /dev/null +++ b/channels/segmentation/segment_message_proto.nim @@ -0,0 +1,34 @@ +## Wire format for a single segment, per the Reliable Channel API spec. +## +## Skeleton: encode/decode treat the segment as just its payload bytes, +## since for now we only ever produce a single segment per send. + +type SegmentMessageProto* = object + entireMessageHash*: seq[byte] ## Keccak256(original payload), 32 bytes + dataSegmentIndex*: uint32 ## zero-indexed sequence number for data segments + dataSegmentCount*: uint32 ## number of data segments (>= 1) + payload*: seq[byte] ## segment payload (data or parity shard) + paritySegmentIndex*: uint32 ## zero-based sequence number for parity segments + paritySegmentCount*: uint32 ## number of parity segments + isParity*: bool ## true for parity segments, false (default) for data segments + +proc isParityMessage*(self: SegmentMessageProto): bool = + self.isParity + +proc isValid*(self: SegmentMessageProto): bool = + ## Validates hash length (32 bytes), segment indices and counts. + discard + +proc encode*(self: SegmentMessageProto): seq[byte] = + self.payload + +proc decode*(T: type SegmentMessageProto, buf: seq[byte]): T = + T( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: buf, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) diff --git a/channels/segmentation/segmentation.nim b/channels/segmentation/segmentation.nim new file mode 100644 index 000000000..9fc7964c0 --- /dev/null +++ b/channels/segmentation/segmentation.nim @@ -0,0 +1,70 @@ +## Segmentation component for the Reliable Channel API. +## +## Splits large application payloads into transmittable segments and +## reassembles them on reception. Supports optional Reed-Solomon parity +## segments for loss recovery, as per the Reliable Channel API spec. +## +## For the skeleton everything fits in a single segment: real chunking +## and Reed-Solomon parity will be plugged in later. +## +## See: https://lip.logos.co/messaging/raw/reliable-channel-api.html + +import std/options +import ./segment_message_proto +import ./segmentation_persistence + +export segment_message_proto, segmentation_persistence + +const + DefaultSegmentSizeBytes* = 102_400 + SegmentsParityRate* = 0.125 + SegmentsReedSolomonMaxCount* = 256 + +type + SegmentationConfig* = object + segmentSizeBytes*: int + enableReedSolomon*: bool + persistence*: SegmentationPersistence + + SegmentationHandler* = ref object + config*: SegmentationConfig + + ReassemblyResult* = object + payload*: seq[byte] + entireMessageHash*: seq[byte] + +proc new*(T: type SegmentationHandler, config: SegmentationConfig): T = + return T(config: config) + +proc performSegmentation*( + self: SegmentationHandler, payload: seq[byte] +): seq[seq[byte]] = + ## Skeleton behaviour: emit exactly one segment carrying the whole + ## payload. Real chunking and Reed-Solomon parity will replace this. + let segment = SegmentMessageProto( + entireMessageHash: @[], + dataSegmentIndex: 0, + dataSegmentCount: 1, + payload: payload, + paritySegmentIndex: 0, + paritySegmentCount: 0, + isParity: false, + ) + return @[segment.encode()] + +proc handleIncomingSegment*( + self: SegmentationHandler, segmentBytes: seq[byte] +): Option[ReassemblyResult] = + ## Skeleton behaviour: every segment is already a complete message + ## (since `performSegmentation` always emits one), so just hand the + ## payload straight back. + let segment = SegmentMessageProto.decode(segmentBytes) + return some( + ReassemblyResult( + payload: segment.payload, entireMessageHash: segment.entireMessageHash + ) + ) + +proc cleanupSegments*(self: SegmentationHandler) = + ## Drop expired partial-reassembly state. + discard diff --git a/channels/segmentation/segmentation_persistence.nim b/channels/segmentation/segmentation_persistence.nim new file mode 100644 index 000000000..cc34c36d2 --- /dev/null +++ b/channels/segmentation/segmentation_persistence.nim @@ -0,0 +1,20 @@ +## Persistence backend interface for segmentation reassembly state. +## +## Allows partial reassembly state to survive process restarts. + +type + SegmentationPersistenceKind* {.pure.} = enum + InMemory + Sqlite + + SegmentationPersistence* = ref object of RootObj + kind*: SegmentationPersistenceKind + +method put*(self: SegmentationPersistence, key: seq[byte], value: seq[byte]) {.base.} = + discard + +method get*(self: SegmentationPersistence, key: seq[byte]): seq[byte] {.base.} = + discard + +method delete*(self: SegmentationPersistence, key: seq[byte]) {.base.} = + discard diff --git a/channels/types.nim b/channels/types.nim new file mode 100644 index 000000000..4070ed620 --- /dev/null +++ b/channels/types.nim @@ -0,0 +1,15 @@ +## Core identifier types for the Reliable Channel API. + +import std/hashes +import waku/api/types as api_types + +import ./scalable_data_sync/scalable_data_sync + +export scalable_data_sync +export api_types + +type ChannelId* = SdsChannelID + +proc hash*(r: RequestId): Hash = + ## Allows `RequestId` to be used as a `Table` key. + hash(string(r)) diff --git a/tests/all_tests_waku.nim b/tests/all_tests_waku.nim index e64922f4c..963a948a3 100644 --- a/tests/all_tests_waku.nim +++ b/tests/all_tests_waku.nim @@ -88,3 +88,6 @@ import ./tools/test_all # Persistency library tests import ./persistency/test_all + +# Reliable Channel API tests +import ./channels/test_all diff --git a/tests/channels/test_all.nim b/tests/channels/test_all.nim new file mode 100644 index 000000000..04b448707 --- /dev/null +++ b/tests/channels/test_all.nim @@ -0,0 +1,3 @@ +{.used.} + +import ./test_reliable_channel_send_receive diff --git a/tests/channels/test_reliable_channel_send_receive.nim b/tests/channels/test_reliable_channel_send_receive.nim new file mode 100644 index 000000000..052cd35c9 --- /dev/null +++ b/tests/channels/test_reliable_channel_send_receive.nim @@ -0,0 +1,149 @@ +{.used.} + +import std/[net] +import chronos, testutils/unittests, stew/byteutils +import brokers/broker_context + +import ../testlib/[common, wakucore, wakunode, testasync] + +import waku +import waku/[waku_node, waku_core] +import waku/factory/waku_conf +import waku/events/message_events as waku_message_events +import tools/confutils/cli_args + +import channels/reliable_channel_manager +import channels/encryption/noop_encryption + +const TestTimeout = chronos.seconds(15) + +proc createApiNodeConf(): WakuNodeConf = + var conf = defaultWakuNodeConf().valueOr: + raiseAssert error + conf.mode = cli_args.WakuMode.Core + conf.listenAddress = parseIpAddress("0.0.0.0") + conf.tcpPort = Port(0) + conf.discv5UdpPort = Port(0) + conf.clusterId = 3'u16 + conf.numShardsInNetwork = 1 + conf.reliabilityEnabled = true + conf.rest = false + return conf + +suite "Reliable Channel - ingress": + asyncTest "manager dispatches marked WakuMessage to the right channel": + ## Unit test for the receive side of the API: instead of standing + ## up two libp2p nodes and a relay mesh, we drive the manager + ## directly by emitting a `MessageReceivedEvent` (the exact event + ## the DeliveryService emits when a `WakuMessage` arrives off the + ## wire). The manager must: + ## - drop traffic missing the Reliable Channel spec marker + ## - dispatch the matching channel's `onMessageReceived` + ## - emit `ChannelMessageReceivedEvent` with the payload + const + channelId = ChannelId("test-channel") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "hello reliable channel".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + ## Noop encryption providers so the Encrypt/Decrypt brokers have + ## something to dispatch to; without this the channel falls back to + ## plaintext anyway, but installing them is the documented setup. + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + let received = newFuture[seq[byte]]("channel-message-received") + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if not received.finished() and evt.channelId == channelId: + received.complete(evt.payload) + , + ) + .expect("listen ChannelMessageReceivedEvent") + + ## Build a `WakuMessage` that looks like one that came in off the + ## wire from a peer: the spec marker on `meta` plus the right content + ## topic. The manager's ingress listener should pick it up, + ## decrypt (noop), unwrap SDS (pass-through), reassemble (one + ## segment), and finally emit `ChannelMessageReceivedEvent`. + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: LipWireReliableChannelVersion.toBytes(), + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + let arrived = await received.withTimeout(TestTimeout) + check arrived + if arrived: + check received.read() == appPayload + + await manager.stop() + + asyncTest "manager drops unmarked WakuMessage": + ## Mirror of the above: same content topic, but `meta` is empty + ## (i.e. foreign traffic). The channel-level event must NOT fire. + const + channelId = ChannelId("test-channel-2") + contentTopic = ContentTopic("/reliable-channel/test/proto") + let appPayload = "foreign payload".toBytes() + + var manager: ReliableChannelManager + var brokerCtx: BrokerContext + lockNewGlobalBrokerContext: + brokerCtx = globalBrokerContext() + manager = (await ReliableChannelManager.new(createApiNodeConf())).expect( + "Failed to create manager" + ) + + setNoopEncryption() + + discard manager + .createReliableChannel(channelId, contentTopic, SdsParticipantID("local")) + .expect("createReliableChannel") + + var fired = false + discard ChannelMessageReceivedEvent + .listen( + brokerCtx, + proc(evt: ChannelMessageReceivedEvent) {.async: (raises: []).} = + if evt.channelId == channelId: + fired = true + , + ) + .expect("listen ChannelMessageReceivedEvent") + + let inboundMsg = WakuMessage( + payload: appPayload, + contentTopic: contentTopic, + version: 0, + meta: @[], ## no Reliable Channel spec marker + ) + + waku_message_events.MessageReceivedEvent.emit( + brokerCtx, + waku_message_events.MessageReceivedEvent(messageHash: "", message: inboundMsg), + ) + + ## Give the event broker a chance to fan out. + await sleepAsync(100.milliseconds) + check not fired + + await manager.stop()