From 0590b2bf68b211ad5d6ffb29f5908df3e9051205 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 13:40:40 +0200 Subject: [PATCH] messaging: route api send/subscription through the Waku kernel The messaging api layer reached around the Waku kernel into the libp2p node (`waku.node.subscriptionManager`, `waku.node.rng`, `waku.node.brokerCtx`). That breaks the declared layering: messaging depends on the Waku *kernel* abstraction, not the raw node. Add a content-topic subscription api on the Waku layer (`waku/api/subscriptions.nim`) and switch the messaging send/subscription paths onto it, plus the kernel's own `rng`/`brokerCtx` fields, so the layer no longer touches `node` internals. Co-Authored-By: Claude Opus 4.8 --- logos_delivery/logos_delivery.nim | 7 +++--- logos_delivery/messaging/api/send.nim | 12 ++++------ logos_delivery/messaging/api/subscription.nim | 6 ++--- logos_delivery/waku/api/subscriptions.nim | 23 +++++++++++++++++++ 4 files changed, 35 insertions(+), 13 deletions(-) create mode 100644 logos_delivery/waku/api/subscriptions.nim diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index e23766dba..d0fb33b90 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -25,11 +25,12 @@ import logos_delivery/waku/waku export waku import logos_delivery/waku/api/[ - topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, - ping, + topics, relay, subscriptions, filter, lightpush, store, peer_manager, discovery, + debug, health, ping, ] export - topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping + topics, relay, subscriptions, filter, lightpush, store, peer_manager, discovery, + debug, health, ping # `MessageSeenEvent` is surfaced via `export waku` (Kernel interface); the # remaining waku health events live here. import logos_delivery/waku/api/events/health_events diff --git a/logos_delivery/messaging/api/send.nim b/logos_delivery/messaging/api/send.nim index ca7256b3e..27c86d0ab 100644 --- a/logos_delivery/messaging/api/send.nim +++ b/logos_delivery/messaging/api/send.nim @@ -4,7 +4,7 @@ import results, chronos, chronicles import logos_delivery/api/types import logos_delivery/messaging/messaging_client import logos_delivery/waku/waku -import logos_delivery/waku/node/[waku_node, subscription_manager] +import logos_delivery/waku/api/subscriptions import logos_delivery/messaging/delivery_service/send_service import logos_delivery/messaging/delivery_service/send_service/delivery_task @@ -17,18 +17,16 @@ proc send*( ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. ?self.checkApiAvailability() - let isSubbed = self.waku.node.subscriptionManager - .isSubscribed(envelope.contentTopic) - .valueOr(false) + let isSubbed = self.waku.isSubscribed(envelope.contentTopic).valueOr(false) if not isSubbed: info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - self.waku.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + self.waku.subscribe(envelope.contentTopic).isOkOr: warn "Failed to auto-subscribe", error = error return err("Failed to auto-subscribe before sending: " & error) - let requestId = RequestId.new(self.waku.node.rng) + let requestId = RequestId.new(self.waku.rng) - let deliveryTask = DeliveryTask.new(requestId, envelope, self.waku.node.brokerCtx).valueOr: + let deliveryTask = DeliveryTask.new(requestId, envelope, self.waku.brokerCtx).valueOr: return err("MessagingClient.send: Failed to create delivery task: " & error) asyncSpawn self.sendService.send(deliveryTask) diff --git a/logos_delivery/messaging/api/subscription.nim b/logos_delivery/messaging/api/subscription.nim index f8fe0bfe6..8893af7f1 100644 --- a/logos_delivery/messaging/api/subscription.nim +++ b/logos_delivery/messaging/api/subscription.nim @@ -4,16 +4,16 @@ import results, chronos import logos_delivery/api/types import logos_delivery/messaging/messaging_client import logos_delivery/waku/waku -import logos_delivery/waku/node/[waku_node, subscription_manager] +import logos_delivery/waku/api/subscriptions proc subscribe*( self: MessagingClient, contentTopic: ContentTopic ): Future[Result[void, string]] {.async.} = ?self.checkApiAvailability() - return self.waku.node.subscriptionManager.subscribe(contentTopic) + return self.waku.subscribe(contentTopic) proc unsubscribe*( self: MessagingClient, contentTopic: ContentTopic ): Result[void, string] = ?self.checkApiAvailability() - return self.waku.node.subscriptionManager.unsubscribe(contentTopic) + return self.waku.unsubscribe(contentTopic) diff --git a/logos_delivery/waku/api/subscriptions.nim b/logos_delivery/waku/api/subscriptions.nim new file mode 100644 index 000000000..ec73bd9bf --- /dev/null +++ b/logos_delivery/waku/api/subscriptions.nim @@ -0,0 +1,23 @@ +## Waku layer API — content-topic subscription operations. +## +## These wrap the node's `SubscriptionManager`, which resolves each content +## topic to its autosharding shard. They give the layers above (messaging) a +## kernel-level entry point so they never reach into `waku.node` internals. +{.push raises: [].} + +import results + +import logos_delivery/waku/waku +import logos_delivery/waku/[waku_core, node/waku_node, node/subscription_manager] + +proc subscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string] = + ## Subscribes to `contentTopic`, resolving its shard via autosharding. + return self.node.subscriptionManager.subscribe(contentTopic) + +proc unsubscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string] = + ## Unsubscribes from `contentTopic`, resolving its shard via autosharding. + return self.node.subscriptionManager.unsubscribe(contentTopic) + +proc isSubscribed*(self: Waku, contentTopic: ContentTopic): Result[bool, string] = + ## True if the node already subscribes to `contentTopic`. + return self.node.subscriptionManager.isSubscribed(contentTopic)