messaging: route api send/subscription through the Waku kernel

The messaging api layer reached around the Waku kernel into the libp2p
node (`waku.node.subscriptionManager`, `waku.node.rng`,
`waku.node.brokerCtx`). That breaks the declared layering: messaging
depends on the Waku *kernel* abstraction, not the raw node.

Add a content-topic subscription api on the Waku layer
(`waku/api/subscriptions.nim`) and switch the messaging send/subscription
paths onto it, plus the kernel's own `rng`/`brokerCtx` fields, so the
layer no longer touches `node` internals.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-26 13:40:40 +02:00
parent dcdc958c8e
commit 0590b2bf68
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
4 changed files with 35 additions and 13 deletions

View File

@ -25,11 +25,12 @@ import logos_delivery/waku/waku
export waku
import
logos_delivery/waku/api/[
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health,
ping,
topics, relay, subscriptions, filter, lightpush, store, peer_manager, discovery,
debug, health, ping,
]
export
topics, relay, filter, lightpush, store, peer_manager, discovery, debug, health, ping
topics, relay, subscriptions, filter, lightpush, store, peer_manager, discovery,
debug, health, ping
# `MessageSeenEvent` is surfaced via `export waku` (Kernel interface); the
# remaining waku health events live here.
import logos_delivery/waku/api/events/health_events

View File

@ -4,7 +4,7 @@ import results, chronos, chronicles
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku
import logos_delivery/waku/node/[waku_node, subscription_manager]
import logos_delivery/waku/api/subscriptions
import logos_delivery/messaging/delivery_service/send_service
import logos_delivery/messaging/delivery_service/send_service/delivery_task
@ -17,18 +17,16 @@ proc send*(
## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`.
?self.checkApiAvailability()
let isSubbed = self.waku.node.subscriptionManager
.isSubscribed(envelope.contentTopic)
.valueOr(false)
let isSubbed = self.waku.isSubscribed(envelope.contentTopic).valueOr(false)
if not isSubbed:
info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic
self.waku.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr:
self.waku.subscribe(envelope.contentTopic).isOkOr:
warn "Failed to auto-subscribe", error = error
return err("Failed to auto-subscribe before sending: " & error)
let requestId = RequestId.new(self.waku.node.rng)
let requestId = RequestId.new(self.waku.rng)
let deliveryTask = DeliveryTask.new(requestId, envelope, self.waku.node.brokerCtx).valueOr:
let deliveryTask = DeliveryTask.new(requestId, envelope, self.waku.brokerCtx).valueOr:
return err("MessagingClient.send: Failed to create delivery task: " & error)
asyncSpawn self.sendService.send(deliveryTask)

View File

@ -4,16 +4,16 @@ import results, chronos
import logos_delivery/api/types
import logos_delivery/messaging/messaging_client
import logos_delivery/waku/waku
import logos_delivery/waku/node/[waku_node, subscription_manager]
import logos_delivery/waku/api/subscriptions
proc subscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Future[Result[void, string]] {.async.} =
?self.checkApiAvailability()
return self.waku.node.subscriptionManager.subscribe(contentTopic)
return self.waku.subscribe(contentTopic)
proc unsubscribe*(
self: MessagingClient, contentTopic: ContentTopic
): Result[void, string] =
?self.checkApiAvailability()
return self.waku.node.subscriptionManager.unsubscribe(contentTopic)
return self.waku.unsubscribe(contentTopic)

View File

@ -0,0 +1,23 @@
## Waku layer API — content-topic subscription operations.
##
## These wrap the node's `SubscriptionManager`, which resolves each content
## topic to its autosharding shard. They give the layers above (messaging) a
## kernel-level entry point so they never reach into `waku.node` internals.
{.push raises: [].}
import results
import logos_delivery/waku/waku
import logos_delivery/waku/[waku_core, node/waku_node, node/subscription_manager]
proc subscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string] =
## Subscribes to `contentTopic`, resolving its shard via autosharding.
return self.node.subscriptionManager.subscribe(contentTopic)
proc unsubscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string] =
## Unsubscribes from `contentTopic`, resolving its shard via autosharding.
return self.node.subscriptionManager.unsubscribe(contentTopic)
proc isSubscribed*(self: Waku, contentTopic: ContentTopic): Result[bool, string] =
## True if the node already subscribes to `contentTopic`.
return self.node.subscriptionManager.isSubscribed(contentTopic)