From e4c89917279bf5b83e53b695fda0dc9a0b0fae95 Mon Sep 17 00:00:00 2001 From: Ivan FB Date: Fri, 26 Jun 2026 14:03:26 +0200 Subject: [PATCH] messaging: drive delivery services through the Waku kernel `SendService`/`RecvService` took a raw `WakuNode` and reached into its internals (`wakuStoreClient`, `subscriptionManager`, `peerManager`), which breaks the layering: the messaging layer should depend on the Waku kernel, not the node. Widen the Waku api surface with the operations these services need (`storeQueryToAny`, `isStoreMounted`, `hasStorePeer`, `isContentSubscribed`, `subscribedContentTopics`) and switch both services to hold `Waku` and call that surface instead. The send-processor chain still pulls raw publish handles (relay/lightpush/RLN/peer manager) from `waku.node`, since the kernel API does not expose publishing primitives yet; this is isolated to the constructor and flagged with a comment. Also make `MessagingClient.new` return explicitly. Co-Authored-By: Claude Opus 4.8 --- .../recv_service/recv_service.nim | 32 +++++++------------ .../send_service/send_service.nim | 31 ++++++++++-------- logos_delivery/messaging/messaging_client.nim | 6 ++-- logos_delivery/waku/api/store.nim | 27 +++++++++++++++- logos_delivery/waku/api/subscriptions.nim | 14 ++++++++ 5 files changed, 73 insertions(+), 37 deletions(-) diff --git a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim index 25b35222d..1090be223 100644 --- a/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim +++ b/logos_delivery/messaging/delivery_service/recv_service/recv_service.nim @@ -7,15 +7,9 @@ import std/[tables, sequtils, options, sets] import chronos, chronicles, libp2p/utility import brokers/broker_context import - logos_delivery/waku/[ - waku_core, - waku_core/topics, - waku_store/client, - waku_store/common, - waku_filter_v2/client, - waku_node, - node/subscription_manager, - ] + logos_delivery/waku/[waku_core, waku_core/topics, waku_store/common], + logos_delivery/waku/waku, + logos_delivery/waku/api/[store, subscriptions] import logos_delivery/api/kernel_api, # MessageSeenEvent logos_delivery/api/messaging_client_api, # MessageReceivedEvent @@ -38,7 +32,7 @@ type RecvMessage = object type RecvService* = ref object of RootObj brokerCtx: BrokerContext - node: WakuNode + waku: Waku seenMsgListener: MessageSeenEventListener connStatusListener: EventConnectionStatusChangeListener @@ -60,7 +54,7 @@ proc getMissingMsgsFromStore( self: RecvService, msgHashes: seq[WakuMessageHash] ): Future[Result[seq[TupleHashAndMsg], string]] {.async.} = let storeResp: StoreQueryResponse = ( - await self.node.wakuStoreClient.queryToAny( + await self.waku.storeQueryToAny( StoreQueryRequest(includeData: true, messageHashes: msgHashes) ) ).valueOr: @@ -85,9 +79,7 @@ proc processIncomingMessage( ## or if the message is a duplicate (recently-seen). Otherwise, save it as ## recently-seen, emit a MessageReceivedEvent, and return true. - if not self.node.subscriptionManager.isContentSubscribed( - pubsubTopic, message.contentTopic - ): + if not self.waku.isContentSubscribed(pubsubTopic, message.contentTopic): trace "skipping message as I am not subscribed", shard = pubsubTopic, contentTopic = message.contentTopic return false @@ -108,16 +100,16 @@ proc processIncomingMessage( proc checkStore*(self: RecvService) {.async.} = ## Checks the store for messages that were not received directly and ## delivers them via MessageReceivedEvent. - if self.node.wakuStoreClient.isNil(): + if not self.waku.isStoreMounted(): error "recv service has no store client mounted, skipping store check" return self.endTimeToCheck = getNowInNanosecondTime() ## query store and deliver new recovered messages per subscribed topic - for pubsubTopic, contentTopics in self.node.subscriptionManager.subscribedContentTopics: + for (pubsubTopic, contentTopics) in self.waku.subscribedContentTopics(): let storeResp: StoreQueryResponse = ( - await self.node.wakuStoreClient.queryToAny( + await self.waku.storeQueryToAny( StoreQueryRequest( includeData: false, pubsubTopic: some(pubsubTopic), @@ -171,14 +163,14 @@ proc onConnectionStatusChange(self: RecvService, status: ConnectionStatus) = info "recv service backfilling missed messages after coming back online" self.backfillHandler = self.checkStore() -proc new*(T: typedesc[RecvService], node: WakuNode): T = +proc new*(T: typedesc[RecvService], waku: Waku): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() var recvService = RecvService( - node: node, + waku: waku, startTimeToCheck: now, - brokerCtx: node.brokerCtx, + brokerCtx: waku.brokerCtx, recentReceivedMsgs: @[], ) diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index f77c6858c..026acee98 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -10,15 +10,15 @@ import logos_delivery/waku/[ waku_core, node/waku_node, - node/subscription_manager, node/peer_manager, - waku_store/client, waku_store/common, waku_relay/protocol, waku_rln_relay/rln_relay, waku_lightpush/client, waku_lightpush/callbacks, - ] + ], + logos_delivery/waku/waku, + logos_delivery/waku/api/[store, subscriptions] import logos_delivery/api/messaging_client_api logScope: @@ -57,7 +57,7 @@ type SendService* = ref object of RootObj serviceLoopHandle: Future[void] ## handle that allows to stop the async task sendProcessor: BaseSendProcessor - node: WakuNode + waku: Waku checkStoreForMessages: bool lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence @@ -97,26 +97,31 @@ proc setupSendProcessorChain( return ok(processors[0]) proc new*( - T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode + T: typedesc[SendService], preferP2PReliability: bool, waku: Waku ): Result[T, string] = - if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): + # The send-processor chain needs raw publish handles (relay, lightpush client, + # RLN, peer manager) that the kernel API does not expose yet, so it is built + # from `waku.node`. Everything else goes through the Waku api surface. + let node = waku.node + if node.wakuRelay.isNil() and node.wakuLightpushClient.isNil(): return err( "Could not create SendService. wakuRelay or wakuLightpushClient should be set" ) - let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil() + let checkStoreForMessages = preferP2PReliability and waku.isStoreMounted() let sendProcessorChain = setupSendProcessorChain( - w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx + node.peerManager, node.wakuLightPushClient, node.wakuRelay, node.wakuRlnRelay, + waku.brokerCtx, ).valueOr: return err("failed to setup SendProcessorChain: " & $error) let sendService = SendService( - brokerCtx: w.brokerCtx, + brokerCtx: waku.brokerCtx, taskCache: newSeq[DeliveryTask](), serviceLoopHandle: nil, sendProcessor: sendProcessorChain, - node: w, + waku: waku, checkStoreForMessages: checkStoreForMessages, lastStoreCheckTime: Moment.now(), ) @@ -127,7 +132,7 @@ proc addTask(self: SendService, task: DeliveryTask) = self.taskCache.addUnique(task) proc isStorePeerAvailable*(sendService: SendService): bool = - return sendService.node.peerManager.selectPeer(WakuStoreCodec).isSome() + return sendService.waku.hasStorePeer() proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.async.} = if tasksToValidate.len() == 0: @@ -142,7 +147,7 @@ proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.a # TODO: confirm hash format for store query!!! let storeResp: StoreQueryResponse = ( - await self.node.wakuStoreClient.queryToAny( + await self.waku.storeQueryToAny( StoreQueryRequest(includeData: false, messageHashes: hashesToValidate) ) ).valueOr: @@ -292,7 +297,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} = info "SendService.send: processing delivery task", requestId = task.requestId, msgHash = task.msgHash.to0xHex() - self.node.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr: + self.waku.subscribe(task.msg.contentTopic).isOkOr: error "SendService.send: failed to subscribe to content topic", contentTopic = task.msg.contentTopic, error = error diff --git a/logos_delivery/messaging/messaging_client.nim b/logos_delivery/messaging/messaging_client.nim index cf2db623b..4e02c7a82 100644 --- a/logos_delivery/messaging/messaging_client.nim +++ b/logos_delivery/messaging/messaging_client.nim @@ -28,9 +28,9 @@ proc new*( ): Result[T, string] = ## The messaging layer chains onto Waku: it drives the underlying Waku kernel ## for transport while exposing its own send/recv API. - let sendService = ?SendService.new(conf.useP2PReliability, waku.node) - let recvService = RecvService.new(waku.node) - ok(T(waku: waku, sendService: sendService, recvService: recvService)) + let sendService = ?SendService.new(conf.useP2PReliability, waku) + let recvService = RecvService.new(waku) + return ok(T(waku: waku, sendService: sendService, recvService: recvService)) proc start*(self: MessagingClient): Result[void, string] = if self.started: diff --git a/logos_delivery/waku/api/store.nim b/logos_delivery/waku/api/store.nim index 5095dbb0d..a54ebe1f3 100644 --- a/logos_delivery/waku/api/store.nim +++ b/logos_delivery/waku/api/store.nim @@ -1,11 +1,36 @@ ## Waku layer API — store (historical query) operations. {.push raises: [].} +import std/options import results, chronos, chronicles import logos_delivery/waku/waku import - logos_delivery/waku/[waku_core, node/waku_node, waku_store/common, waku_store/client] + logos_delivery/waku/ + [waku_core, node/waku_node, node/peer_manager, waku_store/common, waku_store/client] + +proc isStoreMounted*(self: Waku): bool = + ## True if a store client is mounted (the node can run store queries). + return not self.node.wakuStoreClient.isNil() + +proc hasStorePeer*(self: Waku): bool = + ## True if at least one store service peer is available to query. + return self.node.peerManager.selectPeer(WakuStoreCodec).isSome() + +proc storeQueryToAny*( + self: Waku, request: StoreQueryRequest +): Future[Result[StoreQueryResponse, string]] {.async.} = + ## Runs a store query against any available store peer (retries across peers). + try: + if self.node.wakuStoreClient.isNil(): + return err("wakuStoreClient is not mounted") + + let queryResponse = (await self.node.wakuStoreClient.queryToAny(request)).valueOr: + return err($error) + + return ok(queryResponse) + except CatchableError as e: + return err(e.msg) proc storeQuery*( self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int diff --git a/logos_delivery/waku/api/subscriptions.nim b/logos_delivery/waku/api/subscriptions.nim index ec73bd9bf..7638abf3e 100644 --- a/logos_delivery/waku/api/subscriptions.nim +++ b/logos_delivery/waku/api/subscriptions.nim @@ -5,6 +5,7 @@ ## kernel-level entry point so they never reach into `waku.node` internals. {.push raises: [].} +import std/sets import results import logos_delivery/waku/waku @@ -21,3 +22,16 @@ proc unsubscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string] proc isSubscribed*(self: Waku, contentTopic: ContentTopic): Result[bool, string] = ## True if the node already subscribes to `contentTopic`. return self.node.subscriptionManager.isSubscribed(contentTopic) + +proc isContentSubscribed*( + self: Waku, shard: PubsubTopic, contentTopic: ContentTopic +): bool = + ## True if `contentTopic` is subscribed on the given `shard` (pubsub topic). + return self.node.subscriptionManager.isContentSubscribed(shard, contentTopic) + +proc subscribedContentTopics*(self: Waku): seq[(PubsubTopic, HashSet[ContentTopic])] = + ## Snapshot of every shard with its non-empty content-topic set. + var res: seq[(PubsubTopic, HashSet[ContentTopic])] + for shard, contentTopics in self.node.subscriptionManager.subscribedContentTopics: + res.add((shard, contentTopics)) + return res