From b95a74c6002216ff09895d08eefbfcc26f84de45 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 11:56:47 +0200 Subject: [PATCH] messaging: depend on the Waku kernel, not the raw WakuNode MessagingClient sat one layer below where the documented layering (Waku <- MessagingClient <- ReliableChannelManager) places it: it took a raw WakuNode and reached around the Waku kernel to its internals. Make the messaging layer hold the Waku kernel and read `waku.node` from there, so the declared dependency matches the layering and the layer holds the kernel handle it will later route through. The health-monitor test hand-builds a WakuNode, so it now wraps it in a minimal Waku (conf/stateInfo only satisfy {.requiresInit.}; the messaging path reads neither). Co-Authored-By: Claude Opus 4.8 --- logos_delivery/logos_delivery.nim | 2 +- logos_delivery/messaging/api/send.nim | 14 +++++++++----- logos_delivery/messaging/api/subscription.nim | 5 +++-- logos_delivery/messaging/messaging_client.nim | 16 ++++++++-------- tests/node/test_wakunode_health_monitor.nim | 18 ++++++++++++++++-- 5 files changed, 37 insertions(+), 18 deletions(-) diff --git a/logos_delivery/logos_delivery.nim b/logos_delivery/logos_delivery.nim index 435bb968d..e23766dba 100644 --- a/logos_delivery/logos_delivery.nim +++ b/logos_delivery/logos_delivery.nim @@ -97,7 +97,7 @@ proc new*( let waku = (await Waku.new(layerConf.waku, appCallbacks)).valueOr: return err("failed to create Waku: " & error) - let messagingClient = MessagingClient.new(layerConf.messaging, waku.node).valueOr: + let messagingClient = MessagingClient.new(layerConf.messaging, waku).valueOr: return err("failed to create MessagingClient: " & error) let reliableChannelManager = ReliableChannelManager.new( diff --git a/logos_delivery/messaging/api/send.nim b/logos_delivery/messaging/api/send.nim index cc6312471..ed909b6d0 100644 --- a/logos_delivery/messaging/api/send.nim +++ b/logos_delivery/messaging/api/send.nim @@ -3,6 +3,7 @@ import results, chronos, chronicles import logos_delivery/api/types import logos_delivery/messaging/messaging_client +import logos_delivery/waku/waku import logos_delivery/waku/node/[waku_node, subscription_manager] import logos_delivery/messaging/delivery_service/send_service import logos_delivery/messaging/delivery_service/send_service/delivery_task @@ -16,17 +17,20 @@ proc send*( ## id the caller can correlate with `MessageSentEvent` / `MessageErrorEvent`. ?self.checkApiAvailability() - let isSubbed = - self.node.subscriptionManager.isSubscribed(envelope.contentTopic).valueOr(false) + let isSubbed = self.waku.node.subscriptionManager.isSubscribed( + envelope.contentTopic + ).valueOr(false) if not isSubbed: info "Auto-subscribing to topic on send", contentTopic = envelope.contentTopic - self.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: + self.waku.node.subscriptionManager.subscribe(envelope.contentTopic).isOkOr: warn "Failed to auto-subscribe", error = error return err("Failed to auto-subscribe before sending: " & error) - let requestId = RequestId.new(self.node.rng) + let requestId = RequestId.new(self.waku.node.rng) - let deliveryTask = DeliveryTask.new(requestId, envelope, self.node.brokerCtx).valueOr: + let deliveryTask = DeliveryTask.new( + requestId, envelope, self.waku.node.brokerCtx + ).valueOr: return err("MessagingClient.send: Failed to create delivery task: " & error) asyncSpawn self.sendService.send(deliveryTask) diff --git a/logos_delivery/messaging/api/subscription.nim b/logos_delivery/messaging/api/subscription.nim index 35b8c7e53..f8fe0bfe6 100644 --- a/logos_delivery/messaging/api/subscription.nim +++ b/logos_delivery/messaging/api/subscription.nim @@ -3,16 +3,17 @@ import results, chronos import logos_delivery/api/types import logos_delivery/messaging/messaging_client +import logos_delivery/waku/waku import logos_delivery/waku/node/[waku_node, subscription_manager] proc subscribe*( self: MessagingClient, contentTopic: ContentTopic ): Future[Result[void, string]] {.async.} = ?self.checkApiAvailability() - return self.node.subscriptionManager.subscribe(contentTopic) + return self.waku.node.subscriptionManager.subscribe(contentTopic) proc unsubscribe*( self: MessagingClient, contentTopic: ContentTopic ): Result[void, string] = ?self.checkApiAvailability() - return self.node.subscriptionManager.unsubscribe(contentTopic) + return self.waku.node.subscriptionManager.unsubscribe(contentTopic) diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index 69c6c520b..cf2db623b 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -4,7 +4,7 @@ import results, chronos import logos_delivery/api/messaging_client_api, - logos_delivery/waku/node/waku_node, + logos_delivery/waku/waku, logos_delivery/messaging/delivery_service/[recv_service, send_service] # Surfaces the messaging API interface (and its Message* events) to consumers. @@ -18,19 +18,19 @@ type useP2PReliability*: bool MessagingClient* = ref object of IMessagingClient - node*: WakuNode ## Waku core driven by this layer; read by `messaging/api.nim`. + waku*: Waku ## The Waku kernel this layer drives; read by `messaging/api/*`. sendService*: SendService recvService*: RecvService started: bool proc new*( - T: type MessagingClient, conf: MessagingClientConf, node: WakuNode + T: type MessagingClient, conf: MessagingClientConf, waku: Waku ): Result[T, string] = - ## The messaging layer chains onto Waku: it drives the underlying - ## `WakuNode` (Waku's core) for transport while exposing its own send/recv API. - let sendService = ?SendService.new(conf.useP2PReliability, node) - let recvService = RecvService.new(node) - ok(T(node: node, sendService: sendService, recvService: recvService)) + ## The messaging layer chains onto Waku: it drives the underlying Waku kernel + ## for transport while exposing its own send/recv API. + let sendService = ?SendService.new(conf.useP2PReliability, waku.node) + let recvService = RecvService.new(waku.node) + ok(T(waku: waku, sendService: sendService, recvService: recvService)) proc start*(self: MessagingClient): Result[void, string] = if self.started: diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim index acf0bc09a..148c93330 100644 --- a/tests/node/test_wakunode_health_monitor.nim +++ b/tests/node/test_wakunode_health_monitor.nim @@ -26,6 +26,8 @@ import import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils import logos_delivery/waku/node/subscription_manager +import logos_delivery/waku/waku +import logos_delivery/waku/factory/waku_state_info import logos_delivery/messaging/messaging_client const MockDLow = 4 # Mocked GossipSub DLow value @@ -228,8 +230,14 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() + # MessagingClient now depends on the Waku kernel, not the raw node. Only + # `waku.node` is read on the messaging path; `conf`/`stateInfo` are supplied + # solely to satisfy Waku's {.requiresInit.} fields. + let waku = Waku( + node: nodeA, conf: defaultTestWakuConf(), stateInfo: WakuStateInfo.init(nodeA) + ) let ds = MessagingClient - .new(MessagingClientConf(useP2PReliability: false), nodeA) + .new(MessagingClientConf(useP2PReliability: false), waku) .expect("Failed to create MessagingClient") ds.start().expect("Failed to start MessagingClient") @@ -333,8 +341,14 @@ suite "Health Monitor - events": nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata") await nodeA.start() + # MessagingClient now depends on the Waku kernel, not the raw node. Only + # `waku.node` is read on the messaging path; `conf`/`stateInfo` are supplied + # solely to satisfy Waku's {.requiresInit.} fields. + let waku = Waku( + node: nodeA, conf: defaultTestWakuConf(), stateInfo: WakuStateInfo.init(nodeA) + ) let ds = MessagingClient - .new(MessagingClientConf(useP2PReliability: false), nodeA) + .new(MessagingClientConf(useP2PReliability: false), waku) .expect("Failed to create MessagingClient") ds.start().expect("Failed to start MessagingClient") let subMgr = nodeA.subscriptionManager