diff --git a/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim b/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim index ae5e775aa..fa199774b 100644 --- a/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim +++ b/logos_delivery/messaging/delivery_service/send_service/lightpush_processor.nim @@ -2,10 +2,8 @@ import logos_delivery/waku/compat/option_valueor import chronicles, chronos, results import std/options import brokers/broker_context -import - logos_delivery/waku/node/peer_manager, - logos_delivery/waku/waku_core, - logos_delivery/waku/waku_lightpush/[common, client, rpc] +import logos_delivery/waku/waku_core, logos_delivery/waku/waku +import logos_delivery/waku/api/publish import ./[delivery_task, send_processor] @@ -13,27 +11,17 @@ logScope: topics = "send service lightpush processor" type LightpushSendProcessor* = ref object of BaseSendProcessor - peerManager: PeerManager - lightpushClient: WakuLightPushClient + waku: Waku proc new*( - T: typedesc[LightpushSendProcessor], - peerManager: PeerManager, - lightpushClient: WakuLightPushClient, - brokerCtx: BrokerContext, + T: typedesc[LightpushSendProcessor], waku: Waku, brokerCtx: BrokerContext ): T = - return - T(peerManager: peerManager, lightpushClient: lightpushClient, brokerCtx: brokerCtx) - -proc isLightpushPeerAvailable( - self: LightpushSendProcessor, pubsubTopic: PubsubTopic -): bool = - return self.peerManager.selectPeer(WakuLightPushCodec, some(pubsubTopic)).isSome() + return T(waku: waku, brokerCtx: brokerCtx) method isValidProcessor*( self: LightpushSendProcessor, task: DeliveryTask ): bool {.gcsafe.} = - return self.isLightpushPeerAvailable(task.pubsubTopic) + return self.waku.lightpushPeerAvailable(task.pubsubTopic) method sendImpl*( self: LightpushSendProcessor, task: DeliveryTask @@ -44,14 +32,8 @@ method sendImpl*( msgHash = task.msgHash.to0xHex(), tryCount = task.tryCount - let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr: - debug "No peer available for Lightpush, request pushed back for next round", - requestId = task.requestId - task.state = DeliveryState.NextRoundRetry - return - let numLightpushServers = ( - await self.lightpushClient.publish(some(task.pubsubTopic), task.msg, peer) + await self.waku.lightpushPublishToAny(task.pubsubTopic, task.msg) ).valueOr: error "LightpushSendProcessor.sendImpl failed", error = error.desc.get($error.code) case error.code diff --git a/logos_delivery/messaging/delivery_service/send_service/send_service.nim b/logos_delivery/messaging/delivery_service/send_service/send_service.nim index 026acee98..f482a2aff 100644 --- a/logos_delivery/messaging/delivery_service/send_service/send_service.nim +++ b/logos_delivery/messaging/delivery_service/send_service/send_service.nim @@ -7,18 +7,9 @@ import chronos, chronicles, libp2p/utility import brokers/broker_context import ./[send_processor, relay_processor, lightpush_processor, delivery_task], - logos_delivery/waku/[ - waku_core, - node/waku_node, - node/peer_manager, - waku_store/common, - waku_relay/protocol, - waku_rln_relay/rln_relay, - waku_lightpush/client, - waku_lightpush/callbacks, - ], + logos_delivery/waku/[waku_core, waku_store/common], logos_delivery/waku/waku, - logos_delivery/waku/api/[store, subscriptions] + logos_delivery/waku/api/[store, subscriptions, publish] import logos_delivery/api/messaging_client_api logScope: @@ -62,14 +53,10 @@ type SendService* = ref object of RootObj lastStoreCheckTime: Moment ## throttles store validation queries to ArchiveTime cadence proc setupSendProcessorChain( - peerManager: PeerManager, - lightpushClient: WakuLightPushClient, - relay: WakuRelay, - rlnRelay: WakuRLNRelay, - brokerCtx: BrokerContext, + waku: Waku, brokerCtx: BrokerContext ): Result[BaseSendProcessor, string] = - let isRelayAvail = not relay.isNil() - let isLightPushAvail = not lightpushClient.isNil() + let isRelayAvail = waku.hasRelay() + let isLightPushAvail = waku.hasLightpush() if not isRelayAvail and not isLightPushAvail: return err("No valid send processor found for the delivery task") @@ -77,16 +64,10 @@ proc setupSendProcessorChain( var processors = newSeq[BaseSendProcessor]() if isRelayAvail: - let rln: Option[WakuRLNRelay] = - if rlnRelay.isNil(): - none[WakuRLNRelay]() - else: - some(rlnRelay) - let publishProc = getRelayPushHandler(relay, rln) - + let publishProc = waku.relayPushHandler() processors.add(RelaySendProcessor.new(isLightPushAvail, publishProc, brokerCtx)) if isLightPushAvail: - processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx)) + processors.add(LightpushSendProcessor.new(waku, brokerCtx)) var currentProcessor: BaseSendProcessor = processors[0] for i in 1 ..< processors.len: @@ -99,21 +80,14 @@ proc setupSendProcessorChain( proc new*( T: typedesc[SendService], preferP2PReliability: bool, waku: Waku ): Result[T, string] = - # The send-processor chain needs raw publish handles (relay, lightpush client, - # RLN, peer manager) that the kernel API does not expose yet, so it is built - # from `waku.node`. Everything else goes through the Waku api surface. - let node = waku.node - if node.wakuRelay.isNil() and node.wakuLightpushClient.isNil(): + if not waku.hasRelay() and not waku.hasLightpush(): return err( "Could not create SendService. wakuRelay or wakuLightpushClient should be set" ) let checkStoreForMessages = preferP2PReliability and waku.isStoreMounted() - let sendProcessorChain = setupSendProcessorChain( - node.peerManager, node.wakuLightPushClient, node.wakuRelay, node.wakuRlnRelay, - waku.brokerCtx, - ).valueOr: + let sendProcessorChain = setupSendProcessorChain(waku, waku.brokerCtx).valueOr: return err("failed to setup SendProcessorChain: " & $error) let sendService = SendService( diff --git a/logos_delivery/waku/api/publish.nim b/logos_delivery/waku/api/publish.nim new file mode 100644 index 000000000..10d518c03 --- /dev/null +++ b/logos_delivery/waku/api/publish.nim @@ -0,0 +1,64 @@ +## Waku layer API — message publish primitives used by the messaging send +## pipeline. +## +## Unlike `relay.nim`/`lightpush.nim`, these preserve the rich +## `WakuLightPushResult` (status code + description) that the send processors +## branch on for their retry decisions, and expose relay/lightpush availability +## so the messaging layer never inspects `waku.node` directly. +import logos_delivery/waku/compat/option_valueor +{.push raises: [].} + +import std/options +import results, chronos + +import logos_delivery/waku/waku +import + logos_delivery/waku/[ + waku_core, + node/waku_node, + node/peer_manager, + waku_relay/protocol, + waku_rln_relay/rln_relay, + waku_lightpush/common, + waku_lightpush/rpc, + waku_lightpush/client, + waku_lightpush/callbacks, + ] + +# WakuLightPushResult, PushMessageHandler, LightPushErrorCode (common) plus the +# LightPushStatusCode `$`/`==` the send processors branch on (rpc). +export common, rpc + +proc hasRelay*(self: Waku): bool = + ## True if relay (gossipsub publishing) is mounted. + return not self.node.wakuRelay.isNil() + +proc hasLightpush*(self: Waku): bool = + ## True if a lightpush client is mounted. + return not self.node.wakuLightpushClient.isNil() + +proc relayPushHandler*(self: Waku): PushMessageHandler = + ## Builds the relay publish handler (appending an RLN proof when RLN is + ## mounted) used by the send pipeline. Caller ensures relay is mounted. + let rln = + if self.node.wakuRlnRelay.isNil(): + none[WakuRLNRelay]() + else: + some(self.node.wakuRlnRelay) + return getRelayPushHandler(self.node.wakuRelay, rln) + +proc lightpushPeerAvailable*(self: Waku, shard: PubsubTopic): bool = + ## True if a lightpush service peer is available for `shard`. + return self.node.peerManager.selectPeer(WakuLightPushCodec, some(shard)).isSome() + +proc lightpushPublishToAny*( + self: Waku, shard: PubsubTopic, message: WakuMessage +): Future[WakuLightPushResult] {.async.} = + ## Selects a lightpush service peer for `shard` and publishes `message`. + ## Returns SERVICE_NOT_AVAILABLE when no peer is available. + let peer = self.node.peerManager.selectPeer(WakuLightPushCodec, some(shard)).valueOr: + return lightpushResultServiceUnavailable("no lightpush peer available for shard") + try: + return await self.node.wakuLightpushClient.publish(some(shard), message, peer) + except CatchableError as e: + return lightpushResultInternalError(e.msg)