messaging: drive delivery services through the Waku kernel

`SendService`/`RecvService` took a raw `WakuNode` and reached into its
internals (`wakuStoreClient`, `subscriptionManager`, `peerManager`),
which breaks the layering: the messaging layer should depend on the Waku
kernel, not the node.

Widen the Waku api surface with the operations these services need
(`storeQueryToAny`, `isStoreMounted`, `hasStorePeer`, `isContentSubscribed`,
`subscribedContentTopics`) and switch both services to hold `Waku` and
call that surface instead. The send-processor chain still pulls raw
publish handles (relay/lightpush/RLN/peer manager) from `waku.node`,
since the kernel API does not expose publishing primitives yet; this is
isolated to the constructor and flagged with a comment.

Also make `MessagingClient.new` return explicitly.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-26 14:03:26 +02:00
parent 0590b2bf68
commit e4c8991727
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
5 changed files with 73 additions and 37 deletions

View File

@ -7,15 +7,9 @@ import std/[tables, sequtils, options, sets]
import chronos, chronicles, libp2p/utility
import brokers/broker_context
import
logos_delivery/waku/[
waku_core,
waku_core/topics,
waku_store/client,
waku_store/common,
waku_filter_v2/client,
waku_node,
node/subscription_manager,
]
logos_delivery/waku/[waku_core, waku_core/topics, waku_store/common],
logos_delivery/waku/waku,
logos_delivery/waku/api/[store, subscriptions]
import
logos_delivery/api/kernel_api, # MessageSeenEvent
logos_delivery/api/messaging_client_api, # MessageReceivedEvent
@ -38,7 +32,7 @@ type RecvMessage = object
type RecvService* = ref object of RootObj
brokerCtx: BrokerContext
node: WakuNode
waku: Waku
seenMsgListener: MessageSeenEventListener
connStatusListener: EventConnectionStatusChangeListener
@ -60,7 +54,7 @@ proc getMissingMsgsFromStore(
self: RecvService, msgHashes: seq[WakuMessageHash]
): Future[Result[seq[TupleHashAndMsg], string]] {.async.} =
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
await self.waku.storeQueryToAny(
StoreQueryRequest(includeData: true, messageHashes: msgHashes)
)
).valueOr:
@ -85,9 +79,7 @@ proc processIncomingMessage(
## or if the message is a duplicate (recently-seen). Otherwise, save it as
## recently-seen, emit a MessageReceivedEvent, and return true.
if not self.node.subscriptionManager.isContentSubscribed(
pubsubTopic, message.contentTopic
):
if not self.waku.isContentSubscribed(pubsubTopic, message.contentTopic):
trace "skipping message as I am not subscribed",
shard = pubsubTopic, contentTopic = message.contentTopic
return false
@ -108,16 +100,16 @@ proc processIncomingMessage(
proc checkStore*(self: RecvService) {.async.} =
## Checks the store for messages that were not received directly and
## delivers them via MessageReceivedEvent.
if self.node.wakuStoreClient.isNil():
if not self.waku.isStoreMounted():
error "recv service has no store client mounted, skipping store check"
return
self.endTimeToCheck = getNowInNanosecondTime()
## query store and deliver new recovered messages per subscribed topic
for pubsubTopic, contentTopics in self.node.subscriptionManager.subscribedContentTopics:
for (pubsubTopic, contentTopics) in self.waku.subscribedContentTopics():
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
await self.waku.storeQueryToAny(
StoreQueryRequest(
includeData: false,
pubsubTopic: some(pubsubTopic),
@ -171,14 +163,14 @@ proc onConnectionStatusChange(self: RecvService, status: ConnectionStatus) =
info "recv service backfilling missed messages after coming back online"
self.backfillHandler = self.checkStore()
proc new*(T: typedesc[RecvService], node: WakuNode): T =
proc new*(T: typedesc[RecvService], waku: Waku): T =
## The storeClient will help to acquire any possible missed messages
let now = getNowInNanosecondTime()
var recvService = RecvService(
node: node,
waku: waku,
startTimeToCheck: now,
brokerCtx: node.brokerCtx,
brokerCtx: waku.brokerCtx,
recentReceivedMsgs: @[],
)

View File

@ -10,15 +10,15 @@ import
logos_delivery/waku/[
waku_core,
node/waku_node,
node/subscription_manager,
node/peer_manager,
waku_store/client,
waku_store/common,
waku_relay/protocol,
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
]
],
logos_delivery/waku/waku,
logos_delivery/waku/api/[store, subscriptions]
import logos_delivery/api/messaging_client_api
logScope:
@ -57,7 +57,7 @@ type SendService* = ref object of RootObj
serviceLoopHandle: Future[void] ## handle that allows to stop the async task
sendProcessor: BaseSendProcessor
node: WakuNode
waku: Waku
checkStoreForMessages: bool
lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence
@ -97,26 +97,31 @@ proc setupSendProcessorChain(
return ok(processors[0])
proc new*(
T: typedesc[SendService], preferP2PReliability: bool, w: WakuNode
T: typedesc[SendService], preferP2PReliability: bool, waku: Waku
): Result[T, string] =
if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil():
# The send-processor chain needs raw publish handles (relay, lightpush client,
# RLN, peer manager) that the kernel API does not expose yet, so it is built
# from `waku.node`. Everything else goes through the Waku api surface.
let node = waku.node
if node.wakuRelay.isNil() and node.wakuLightpushClient.isNil():
return err(
"Could not create SendService. wakuRelay or wakuLightpushClient should be set"
)
let checkStoreForMessages = preferP2PReliability and not w.wakuStoreClient.isNil()
let checkStoreForMessages = preferP2PReliability and waku.isStoreMounted()
let sendProcessorChain = setupSendProcessorChain(
w.peerManager, w.wakuLightPushClient, w.wakuRelay, w.wakuRlnRelay, w.brokerCtx
node.peerManager, node.wakuLightPushClient, node.wakuRelay, node.wakuRlnRelay,
waku.brokerCtx,
).valueOr:
return err("failed to setup SendProcessorChain: " & $error)
let sendService = SendService(
brokerCtx: w.brokerCtx,
brokerCtx: waku.brokerCtx,
taskCache: newSeq[DeliveryTask](),
serviceLoopHandle: nil,
sendProcessor: sendProcessorChain,
node: w,
waku: waku,
checkStoreForMessages: checkStoreForMessages,
lastStoreCheckTime: Moment.now(),
)
@ -127,7 +132,7 @@ proc addTask(self: SendService, task: DeliveryTask) =
self.taskCache.addUnique(task)
proc isStorePeerAvailable*(sendService: SendService): bool =
return sendService.node.peerManager.selectPeer(WakuStoreCodec).isSome()
return sendService.waku.hasStorePeer()
proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.async.} =
if tasksToValidate.len() == 0:
@ -142,7 +147,7 @@ proc checkMsgsInStore(self: SendService, tasksToValidate: seq[DeliveryTask]) {.a
# TODO: confirm hash format for store query!!!
let storeResp: StoreQueryResponse = (
await self.node.wakuStoreClient.queryToAny(
await self.waku.storeQueryToAny(
StoreQueryRequest(includeData: false, messageHashes: hashesToValidate)
)
).valueOr:
@ -292,7 +297,7 @@ proc send*(self: SendService, task: DeliveryTask) {.async.} =
info "SendService.send: processing delivery task",
requestId = task.requestId, msgHash = task.msgHash.to0xHex()
self.node.subscriptionManager.subscribe(task.msg.contentTopic).isOkOr:
self.waku.subscribe(task.msg.contentTopic).isOkOr:
error "SendService.send: failed to subscribe to content topic",
contentTopic = task.msg.contentTopic, error = error

View File

@ -28,9 +28,9 @@ proc new*(
): Result[T, string] =
## The messaging layer chains onto Waku: it drives the underlying Waku kernel
## for transport while exposing its own send/recv API.
let sendService = ?SendService.new(conf.useP2PReliability, waku.node)
let recvService = RecvService.new(waku.node)
ok(T(waku: waku, sendService: sendService, recvService: recvService))
let sendService = ?SendService.new(conf.useP2PReliability, waku)
let recvService = RecvService.new(waku)
return ok(T(waku: waku, sendService: sendService, recvService: recvService))
proc start*(self: MessagingClient): Result[void, string] =
if self.started:

View File

@ -1,11 +1,36 @@
## Waku layer API — store (historical query) operations.
{.push raises: [].}
import std/options
import results, chronos, chronicles
import logos_delivery/waku/waku
import
logos_delivery/waku/[waku_core, node/waku_node, waku_store/common, waku_store/client]
logos_delivery/waku/
[waku_core, node/waku_node, node/peer_manager, waku_store/common, waku_store/client]
proc isStoreMounted*(self: Waku): bool =
## True if a store client is mounted (the node can run store queries).
return not self.node.wakuStoreClient.isNil()
proc hasStorePeer*(self: Waku): bool =
## True if at least one store service peer is available to query.
return self.node.peerManager.selectPeer(WakuStoreCodec).isSome()
proc storeQueryToAny*(
self: Waku, request: StoreQueryRequest
): Future[Result[StoreQueryResponse, string]] {.async.} =
## Runs a store query against any available store peer (retries across peers).
try:
if self.node.wakuStoreClient.isNil():
return err("wakuStoreClient is not mounted")
let queryResponse = (await self.node.wakuStoreClient.queryToAny(request)).valueOr:
return err($error)
return ok(queryResponse)
except CatchableError as e:
return err(e.msg)
proc storeQuery*(
self: Waku, request: StoreQueryRequest, peer: string, timeoutMs: int

View File

@ -5,6 +5,7 @@
## kernel-level entry point so they never reach into `waku.node` internals.
{.push raises: [].}
import std/sets
import results
import logos_delivery/waku/waku
@ -21,3 +22,16 @@ proc unsubscribe*(self: Waku, contentTopic: ContentTopic): Result[void, string]
proc isSubscribed*(self: Waku, contentTopic: ContentTopic): Result[bool, string] =
## True if the node already subscribes to `contentTopic`.
return self.node.subscriptionManager.isSubscribed(contentTopic)
proc isContentSubscribed*(
self: Waku, shard: PubsubTopic, contentTopic: ContentTopic
): bool =
## True if `contentTopic` is subscribed on the given `shard` (pubsub topic).
return self.node.subscriptionManager.isContentSubscribed(shard, contentTopic)
proc subscribedContentTopics*(self: Waku): seq[(PubsubTopic, HashSet[ContentTopic])] =
## Snapshot of every shard with its non-empty content-topic set.
var res: seq[(PubsubTopic, HashSet[ContentTopic])]
for shard, contentTopics in self.node.subscriptionManager.subscribedContentTopics:
res.add((shard, contentTopics))
return res