messaging: route the send-processor chain through the Waku kernel

The send pipeline still reached into `waku.node` for its publish handles
(`wakuRelay`, `wakuRlnRelay`, `wakuLightpushClient`, `peerManager`) when
building the processor chain -- the last node coupling the previous step
had to leave in place with a comment.

Add `waku/api/publish.nim`, a Waku-layer surface for the send pipeline:
relay/lightpush availability (`hasRelay`/`hasLightpush`), the relay push
handler factory (`relayPushHandler`, which folds in the RLN proof), and
lightpush peer selection + publish (`lightpushPeerAvailable`,
`lightpushPublishToAny`). These keep the rich `WakuLightPushResult` the
processors branch on for retry decisions.

`SendService` and `LightpushSendProcessor` now drive that surface and
hold only a `Waku`; no part of the send pipeline inspects `waku.node`
anymore. "No lightpush peer" now surfaces as SERVICE_NOT_AVAILABLE, which
the processor already maps to NextRoundRetry, so behaviour is unchanged.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
This commit is contained in:
Ivan FB 2026-06-26 14:21:30 +02:00
parent e4c8991727
commit e2448115b3
No known key found for this signature in database
GPG Key ID: DF0C67A04C543270
3 changed files with 80 additions and 60 deletions

View File

@ -2,10 +2,8 @@ import logos_delivery/waku/compat/option_valueor
import chronicles, chronos, results
import std/options
import brokers/broker_context
import
logos_delivery/waku/node/peer_manager,
logos_delivery/waku/waku_core,
logos_delivery/waku/waku_lightpush/[common, client, rpc]
import logos_delivery/waku/waku_core, logos_delivery/waku/waku
import logos_delivery/waku/api/publish
import ./[delivery_task, send_processor]
@ -13,27 +11,17 @@ logScope:
topics = "send service lightpush processor"
type LightpushSendProcessor* = ref object of BaseSendProcessor
peerManager: PeerManager
lightpushClient: WakuLightPushClient
waku: Waku
proc new*(
T: typedesc[LightpushSendProcessor],
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
brokerCtx: BrokerContext,
T: typedesc[LightpushSendProcessor], waku: Waku, brokerCtx: BrokerContext
): T =
return
T(peerManager: peerManager, lightpushClient: lightpushClient, brokerCtx: brokerCtx)
proc isLightpushPeerAvailable(
self: LightpushSendProcessor, pubsubTopic: PubsubTopic
): bool =
return self.peerManager.selectPeer(WakuLightPushCodec, some(pubsubTopic)).isSome()
return T(waku: waku, brokerCtx: brokerCtx)
method isValidProcessor*(
self: LightpushSendProcessor, task: DeliveryTask
): bool {.gcsafe.} =
return self.isLightpushPeerAvailable(task.pubsubTopic)
return self.waku.lightpushPeerAvailable(task.pubsubTopic)
method sendImpl*(
self: LightpushSendProcessor, task: DeliveryTask
@ -44,14 +32,8 @@ method sendImpl*(
msgHash = task.msgHash.to0xHex(),
tryCount = task.tryCount
let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr:
debug "No peer available for Lightpush, request pushed back for next round",
requestId = task.requestId
task.state = DeliveryState.NextRoundRetry
return
let numLightpushServers = (
await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer)
await self.waku.lightpushPublishToAny(task.pubsubTopic, task.msg)
).valueOr:
error "LightpushSendProcessor.sendImpl failed", error = error.desc.get($error.code)
case error.code

View File

@ -7,18 +7,9 @@ import chronos, chronicles, libp2p/utility
import brokers/broker_context
import
./[send_processor, relay_processor, lightpush_processor, delivery_task],
logos_delivery/waku/[
waku_core,
node/waku_node,
node/peer_manager,
waku_store/common,
waku_relay/protocol,
waku_rln_relay/rln_relay,
waku_lightpush/client,
waku_lightpush/callbacks,
],
logos_delivery/waku/[waku_core, waku_store/common],
logos_delivery/waku/waku,
logos_delivery/waku/api/[store, subscriptions]
logos_delivery/waku/api/[store, subscriptions, publish]
import logos_delivery/api/messaging_client_api
logScope:
@ -62,14 +53,10 @@ type SendService* = ref object of RootObj
lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence
proc setupSendProcessorChain(
peerManager: PeerManager,
lightpushClient: WakuLightPushClient,
relay: WakuRelay,
rlnRelay: WakuRLNRelay,
brokerCtx: BrokerContext,
waku: Waku, brokerCtx: BrokerContext
): Result[BaseSendProcessor, string] =
let isRelayAvail = not relay.isNil()
let isLightPushAvail = not lightpushClient.isNil()
let isRelayAvail = waku.hasRelay()
let isLightPushAvail = waku.hasLightpush()
if not isRelayAvail and not isLightPushAvail:
return err("No valid send processor found for the delivery task")
@ -77,16 +64,10 @@ proc setupSendProcessorChain(
var processors = newSeq[BaseSendProcessor]()
if isRelayAvail:
let rln: Option[WakuRLNRelay] =
if rlnRelay.isNil():
none[WakuRLNRelay]()
else:
some(rlnRelay)
let publishProc = getRelayPushHandler(relay, rln)
let publishProc = waku.relayPushHandler()
processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc, brokerCtx))
if isLightPushAvail:
processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx))
processors.add(LightpushSendProcessor.new(waku, brokerCtx))
var currentProcessor: BaseSendProcessor = processors[0]
for i in 1 ..< processors.len:
@ -99,21 +80,14 @@ proc setupSendProcessorChain(
proc new*(
T: typedesc[SendService], preferP2PReliability: bool, waku: Waku
): Result[T, string] =
# The send-processor chain needs raw publish handles (relay, lightpush client,
# RLN, peer manager) that the kernel API does not expose yet, so it is built
# from `waku.node`. Everything else goes through the Waku api surface.
let node = waku.node
if node.wakuRelay.isNil() and node.wakuLightpushClient.isNil():
if not waku.hasRelay() and not waku.hasLightpush():
return err(
"Could not create SendService. wakuRelay or wakuLightpushClient should be set"
)
let checkStoreForMessages = preferP2PReliability and waku.isStoreMounted()
let sendProcessorChain = setupSendProcessorChain(
node.peerManager, node.wakuLightPushClient, node.wakuRelay, node.wakuRlnRelay,
waku.brokerCtx,
).valueOr:
let sendProcessorChain = setupSendProcessorChain(waku, waku.brokerCtx).valueOr:
return err("failed to setup SendProcessorChain: " & $error)
let sendService = SendService(

View File

@ -0,0 +1,64 @@
## Waku layer API — message publish primitives used by the messaging send
## pipeline.
##
## Unlike `relay.nim`/`lightpush.nim`, these preserve the rich
## `WakuLightPushResult` (status code + description) that the send processors
## branch on for their retry decisions, and expose relay/lightpush availability
## so the messaging layer never inspects `waku.node` directly.
import logos_delivery/waku/compat/option_valueor
{.push raises: [].}
import std/options
import results, chronos
import logos_delivery/waku/waku
import
logos_delivery/waku/[
waku_core,
node/waku_node,
node/peer_manager,
waku_relay/protocol,
waku_rln_relay/rln_relay,
waku_lightpush/common,
waku_lightpush/rpc,
waku_lightpush/client,
waku_lightpush/callbacks,
]
# WakuLightPushResult, PushMessageHandler, LightPushErrorCode (common) plus the
# LightPushStatusCode `$`/`==` the send processors branch on (rpc).
export common, rpc
proc hasRelay*(self: Waku): bool =
## True if relay (gossipsub publishing) is mounted.
return not self.node.wakuRelay.isNil()
proc hasLightpush*(self: Waku): bool =
## True if a lightpush client is mounted.
return not self.node.wakuLightpushClient.isNil()
proc relayPushHandler*(self: Waku): PushMessageHandler =
## Builds the relay publish handler (appending an RLN proof when RLN is
## mounted) used by the send pipeline. Caller ensures relay is mounted.
let rln =
if self.node.wakuRlnRelay.isNil():
none[WakuRLNRelay]()
else:
some(self.node.wakuRlnRelay)
return getRelayPushHandler(self.node.wakuRelay, rln)
proc lightpushPeerAvailable*(self: Waku, shard: PubsubTopic): bool =
## True if a lightpush service peer is available for `shard`.
return self.node.peerManager.selectPeer(WakuLightPushCodec, some(shard)).isSome()
proc lightpushPublishToAny*(
self: Waku, shard: PubsubTopic, message: WakuMessage
): Future[WakuLightPushResult] {.async.} =
## Selects a lightpush service peer for `shard` and publishes `message`.
## Returns SERVICE_NOT_AVAILABLE when no peer is available.
let peer = self.node.peerManager.selectPeer(WakuLightPushCodec, some(shard)).valueOr:
return lightpushResultServiceUnavailable("no lightpush peer available for shard")
try:
return await self.node.wakuLightpushClient.publish(some(shard), message, peer)
except CatchableError as e:
return lightpushResultInternalError(e.msg)