From 1c08ba5ea17b8956b4b8f2e118247ad4fdee99f4 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Fri, 23 Jan 2026 16:58:12 +0100 Subject: [PATCH] Various fixes upon test failures. Added initial of subscribe API and auto-subscribe for send api --- tests/api/test_api_send.nim | 143 +++++++++++++----- waku/api/api.nim | 23 ++- waku/api/api_conf.nim | 4 + waku/events/message_events.nim | 7 + waku/factory/waku.nim | 38 ++++- .../delivery_service/delivery_service.nim | 16 +- .../recv_service/recv_service.nim | 17 ++- .../send_service/delivery_task.nim | 3 +- .../send_service/lightpush_processor.nim | 11 +- .../send_service/relay_processor.nim | 22 ++- .../send_service/send_service.nim | 42 +++-- .../delivery_service/subscription_service.nim | 64 ++++++++ waku/node/kernel_api/relay.nim | 71 +++++---- waku/node/waku_node.nim | 11 +- waku/waku_core/message/digest.nim | 2 +- waku/waku_relay/protocol.nim | 1 + 16 files changed, 360 insertions(+), 115 deletions(-) create mode 100644 waku/node/delivery_service/subscription_service.nim diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index b74a8be47..56c7c9e83 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -1,23 +1,12 @@ {.used.} +import std/strutils +import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo] +import ../testlib/[common, wakucore, wakunode, testasync] +import ../waku_archive/archive_utils import - std/[options, sequtils, strutils], - chronos, - testutils/unittests, - stew/byteutils, - libp2p/[switch, peerinfo] -import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils] -import - waku, - waku/[ - waku_node, - waku_core, - waku_relay/protocol, - waku_filter_v2/common, - waku_store/common, - common/broker/broker_context, - ] -import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config + waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context] +import waku/api/api_conf, waku/factory/waku_conf suite "Waku API - Send": var @@ -38,43 +27,52 @@ suite "Waku API - Send": storeNodePeerId {.threadvar.}: PeerId asyncSetup: - # handlerFuture = newPushHandlerFuture() - # handler = proc( - # peer: PeerId, pubsubTopic: PubsubTopic, message: WakuMessage - # ): Future[WakuLightPushResult[void]] {.async.} = - # handlerFuture.complete((pubsubTopic, message)) - # return ok() - lockNewGlobalBrokerContext: relayNode1 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - await relayNode1.start() + relayNode1.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error (await relayNode1.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" + await relayNode1.mountLibp2pPing() + await relayNode1.start() lockNewGlobalBrokerContext: relayNode2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - await relayNode2.start() + relayNode2.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error (await relayNode2.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" + await relayNode2.mountLibp2pPing() + await relayNode2.start() lockNewGlobalBrokerContext: lightpushNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - await lightpushNode.start() + lightpushNode.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error (await lightpushNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" (await lightpushNode.mountLightPush()).isOkOr: raiseAssert "Failed to mount lightpush" + await lightpushNode.mountLibp2pPing() + await lightpushNode.start() lockNewGlobalBrokerContext: storeNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - await storeNode.start() + storeNode.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error (await storeNode.mountRelay()).isOkOr: raiseAssert "Failed to mount relay" + # Mount archive so store can persist messages + let archiveDriver = newSqliteArchiveDriver() + storeNode.mountArchive(archiveDriver).isOkOr: + raiseAssert "Failed to mount archive: " & error await storeNode.mountStore() + await storeNode.mountLibp2pPing() + await storeNode.start() relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo() relayNode1PeerId = relayNode1.peerInfo.peerId @@ -88,6 +86,27 @@ suite "Waku API - Send": storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() storeNodePeerId = storeNode.peerInfo.peerId + # Subscribe all relay nodes to the default shard topic + const testPubsubTopic = PubsubTopic("/waku/2/rs/1/0") + proc dummyHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + discard + + relayNode1.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr: + raiseAssert "Failed to subscribe relayNode1: " & error + relayNode2.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr: + raiseAssert "Failed to subscribe relayNode2: " & error + + lightpushNode.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr: + raiseAssert "Failed to subscribe lightpushNode: " & error + storeNode.subscribe((kind: PubsubSub, topic: testPubsubTopic), dummyHandler).isOkOr: + raiseAssert "Failed to subscribe storeNode: " & error + + # Subscribe all relay nodes to the default shard topic + await relayNode1.connectToNodes(@[relayNode2PeerInfo, storeNodePeerInfo]) + await lightpushNode.connectToNodes(@[relayNode2PeerInfo]) + asyncTeardown: await allFutures( relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop() @@ -120,27 +139,74 @@ suite "Waku API - Send": raiseAssert error (await startWaku(addr node)).isOkOr: raiseAssert "Failed to start Waku node: " & error + # node is not connected ! + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let sendResult = await node.send(envelope) + + check sendResult.isErr() # Depending on implementation, it might say "not healthy" + check sendResult.error().contains("not healthy") + + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error + + asyncTest "Check API availability (healthy node)": + # Create a node config that doesn't start or has no peers + let nodeConfig = NodeConfig.init( + mode = WakuMode.Core, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + p2pReliability = true, + ) + + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(nodeConfig)).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + await node.node.connectToNodes( + @[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo] + ) + + let sentEventFuture = newFuture[void]("sentEvent") let sentListener = MessageSentEvent.listen( node.brokerCtx, proc(event: MessageSentEvent) {.async: (raises: []).} = - raiseAssert "Should not be called" + echo "SENT EVENT TRIGGERED: requestId=", event.requestId + if not sentEventFuture.finished(): + sentEventFuture.complete() , ).valueOr: raiseAssert error + let errorEventFuture = newFuture[void]("errorEvent") let errorListener = MessageErrorEvent.listen( node.brokerCtx, proc(event: MessageErrorEvent) {.async: (raises: []).} = - check true + echo "ERROR EVENT TRIGGERED: ", event.error + if not errorEventFuture.finished(): + errorEventFuture.fail( + newException(CatchableError, "Error event triggered: " & event.error) + ) , ).valueOr: raiseAssert error + let propagatedEventFuture = newFuture[void]("propagatedEvent") let propagatedListener = MessagePropagatedEvent.listen( node.brokerCtx, proc(event: MessagePropagatedEvent) {.async: (raises: []).} = - raiseAssert "Should not be called" + echo "PROPAGATED EVENT TRIGGERED: requestId=", event.requestId + if not propagatedEventFuture.finished(): + propagatedEventFuture.complete() , ).valueOr: raiseAssert error @@ -155,13 +221,16 @@ suite "Waku API - Send": let sendResult = await node.send(envelope) - if sendResult.isErr(): - echo "Send error: ", sendResult.error + check sendResult.isOk() # Depending on implementation, it might say "not healthy" - check: - sendResult.isErr() - # Depending on implementation, it might say "not healthy" - sendResult.error.contains("not healthy") + # Wait for events with timeout + const eventTimeout = 10.seconds + discard await allFutures(sentEventFuture, propagatedEventFuture, errorEventFuture) + .withTimeout(eventTimeout) + + check sentEventFuture.completed() + check propagatedEventFuture.completed() + check not errorEventFuture.failed() (await node.stop()).isOkOr: raiseAssert "Failed to stop node: " & error diff --git a/waku/api/api.nim b/waku/api/api.nim index 4079e9b0d..c741011ed 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -3,6 +3,7 @@ import chronicles, chronos, results import waku/factory/waku import waku/[requests/health_request, waku_core, waku_node] import waku/node/delivery_service/send_service +import waku/node/delivery_service/subscription_service import ./[api_conf, types] logScope: @@ -32,11 +33,23 @@ proc checkApiAvailability(w: Waku): Result[void, string] = warn "Failed to get Waku node health status: ", error = healthStatus.error # Let's suppose the node is hesalthy enough, go ahead else: - if healthStatus.get().healthStatus != NodeHealth.Unhealthy: + if healthStatus.get().healthStatus == NodeHealth.Unhealthy: return err("Waku node is not healthy, has got no connections.") return ok() +proc subscribe*( + w: Waku, contentTopic: ContentTopic +): Future[Result[void, string]] {.async.} = + ?checkApiAvailability(w) + + return w.deliveryService.subscriptionService.subscribe(contentTopic) + +proc unsubscribe*(w: Waku, contentTopic: ContentTopic): Result[void, string] = + ?checkApiAvailability(w) + + return w.deliveryService.subscriptionService.unsubscribe(contentTopic) + proc send*( w: Waku, envelope: MessageEnvelope ): Future[Result[RequestId, string]] {.async.} = @@ -45,7 +58,13 @@ proc send*( let requestId = newRequestId(w.rng) let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr: - return err("Failed to create delivery task: " & error) + return err("API send: Failed to create delivery task: " & error) + + info "API send: scheduling delivery task", + requestId = $requestId, + pubsubTopic = deliveryTask.pubsubTopic, + contentTopic = deliveryTask.msg.contentTopic, + msgHash = deliveryTask.msgHash.shortLog() asyncSpawn w.deliveryService.sendService.send(deliveryTask) diff --git a/waku/api/api_conf.nim b/waku/api/api_conf.nim index 5dd0dcc30..47aa9e7d8 100644 --- a/waku/api/api_conf.nim +++ b/waku/api/api_conf.nim @@ -86,6 +86,7 @@ type NodeConfig* {.requiresInit.} = object protocolsConfig: ProtocolsConfig networkingConfig: NetworkingConfig ethRpcEndpoints: seq[string] + p2pReliability: bool proc init*( T: typedesc[NodeConfig], @@ -93,12 +94,14 @@ proc init*( protocolsConfig: ProtocolsConfig = TheWakuNetworkPreset, networkingConfig: NetworkingConfig = DefaultNetworkingConfig, ethRpcEndpoints: seq[string] = @[], + p2pReliability: bool = false, ): T = return T( mode: mode, protocolsConfig: protocolsConfig, networkingConfig: networkingConfig, ethRpcEndpoints: ethRpcEndpoints, + p2pReliability: p2pReliability, ) proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] = @@ -202,6 +205,7 @@ proc toWakuConf*(nodeConfig: NodeConfig): Result[WakuConf, string] = ## Various configurations b.withNatStrategy("any") + b.withP2PReliability(nodeConfig.p2pReliability) let wakuConf = b.build().valueOr: return err("Failed to build configuration: " & error) diff --git a/waku/events/message_events.nim b/waku/events/message_events.nim index 9c8a3b321..cf3dac9b7 100644 --- a/waku/events/message_events.nim +++ b/waku/events/message_events.nim @@ -1,5 +1,6 @@ import waku/common/broker/event_broker import waku/api/types +import waku/waku_core/message export types @@ -21,3 +22,9 @@ EventBroker: type MessagePropagatedEvent* = object requestId*: RequestId messageHash*: string + +EventBroker: + # Event emitted when a message is received via Waku + type MessageReceivedEvent* = object + messageHash*: string + message*: WakuMessage diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index e0e1f24d0..c9d983226 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -428,15 +428,45 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: return err("Health report not available") try: let healthReport = healthReportFut.read() - # Convert HealthStatus to NodeHealth + + # Check if Relay or Lightpush Client is ready (MinimallyHealthy condition) + var relayReady = false + var lightpushClientReady = false + var storeClientReady = false + var filterClientReady = false + + for protocolHealth in healthReport.protocolsHealth: + if protocolHealth.protocol == "Relay" and + protocolHealth.health == HealthStatus.READY: + relayReady = true + elif protocolHealth.protocol == "Lightpush Client" and + protocolHealth.health == HealthStatus.READY: + lightpushClientReady = true + elif protocolHealth.protocol == "Store Client" and + protocolHealth.health == HealthStatus.READY: + storeClientReady = true + elif protocolHealth.protocol == "Filter Client" and + protocolHealth.health == HealthStatus.READY: + filterClientReady = true + + # Determine node health based on protocol states + let isMinimallyHealthy = relayReady or lightpushClientReady let nodeHealth = - case healthReport.nodeHealth - of HealthStatus.READY: + if isMinimallyHealthy and storeClientReady and filterClientReady: NodeHealth.Healthy - of HealthStatus.SYNCHRONIZING, HealthStatus.INITIALIZING: + elif isMinimallyHealthy: NodeHealth.MinimallyHealthy else: NodeHealth.Unhealthy + + debug "Providing health report", + nodeHealth = $nodeHealth, + relayReady = relayReady, + lightpushClientReady = lightpushClientReady, + storeClientReady = storeClientReady, + filterClientReady = filterClientReady, + details = $(healthReport) + ok(RequestNodeHealth(healthStatus: nodeHealth)) except CatchableError: err("Failed to read health report: " & getCurrentExceptionMsg()), diff --git a/waku/node/delivery_service/delivery_service.nim b/waku/node/delivery_service/delivery_service.nim index 3019c0dfb..8106cba9f 100644 --- a/waku/node/delivery_service/delivery_service.nim +++ b/waku/node/delivery_service/delivery_service.nim @@ -5,6 +5,7 @@ import chronos import ./recv_service, ./send_service, + ./subscription_service, waku/[ waku_core, waku_node, @@ -17,15 +18,24 @@ import type DeliveryService* = ref object sendService*: SendService recvService: RecvService + subscriptionService*: SubscriptionService proc new*( T: type DeliveryService, useP2PReliability: bool, w: WakuNode ): Result[T, string] = ## storeClient is needed to give store visitility to DeliveryService ## wakuRelay and wakuLightpushClient are needed to give a mechanism to SendService to re-publish - let sendService = ?SendService.new(useP2PReliability, w) - let recvService = RecvService.new(w) - return ok(DeliveryService(sendService: sendService, recvService: recvService)) + let subscriptionService = SubscriptionService.new(w) + let sendService = ?SendService.new(useP2PReliability, w, subscriptionService) + let recvService = RecvService.new(w, subscriptionService) + + return ok( + DeliveryService( + sendService: sendService, + recvService: recvService, + subscriptionService: subscriptionService, + ) + ) proc startDeliveryService*(self: DeliveryService) = self.sendService.startSendService() diff --git a/waku/node/delivery_service/recv_service/recv_service.nim b/waku/node/delivery_service/recv_service/recv_service.nim index 91fe7ec4e..46c35201e 100644 --- a/waku/node/delivery_service/recv_service/recv_service.nim +++ b/waku/node/delivery_service/recv_service/recv_service.nim @@ -4,6 +4,7 @@ import std/[tables, sequtils, options] import chronos, chronicles, libp2p/utility +import ../[subscription_service] import waku/[ waku_core, @@ -41,6 +42,7 @@ type RecvService* = ref object of RootObj node: WakuNode onSubscribeListener: OnFilterSubscribeEventListener onUnsubscribeListener: OnFilterUnsubscribeEventListener + subscriptionService: SubscriptionService recentReceivedMsgs: seq[RecvMessage] @@ -90,7 +92,6 @@ proc msgChecker(self: RecvService) {.async.} = ## Continuously checks if a message has been received while true: await sleepAsync(StoreCheckPeriod) - self.endTimeToCheck = getNowInNanosecondTime() var msgHashesInStore = newSeq[WakuMessageHash](0) @@ -156,18 +157,24 @@ proc onUnsubscribe( do: error "onUnsubscribe unsubscribing from wrong topic", pubsubTopic, contentTopics -proc new*(T: type RecvService, node: WakuNode): T = +proc new*(T: typedesc[RecvService], node: WakuNode, s: SubscriptionService): T = ## The storeClient will help to acquire any possible missed messages let now = getNowInNanosecondTime() - var recvService = - RecvService(node: node, startTimeToCheck: now, brokerCtx: node.brokerCtx) + var recvService = RecvService( + node: node, + startTimeToCheck: now, + brokerCtx: node.brokerCtx, + subscriptionService: s, + topicsInterest: initTable[PubsubTopic, seq[ContentTopic]](), + recentReceivedMsgs: @[], + ) if not node.wakuFilterClient.isNil(): let filterPushHandler = proc( pubsubTopic: PubsubTopic, message: WakuMessage ) {.async, closure.} = - ## Captures all the messages recived through filter + ## Captures all the messages received through filter let msgHash = computeMessageHash(pubSubTopic, message) let rxMsg = RecvMessage(msgHash: msgHash, rxTime: message.timestamp) diff --git a/waku/node/delivery_service/send_service/delivery_task.nim b/waku/node/delivery_service/send_service/delivery_task.nim index be576f8e9..e245f1841 100644 --- a/waku/node/delivery_service/send_service/delivery_task.nim +++ b/waku/node/delivery_service/send_service/delivery_task.nim @@ -21,7 +21,7 @@ type DeliveryTask* = ref object errorDesc*: string proc create*( - T: type DeliveryTask, + T: typedesc[DeliveryTask], requestId: RequestId, envelop: MessageEnvelope, brokerCtx: BrokerContext, @@ -31,6 +31,7 @@ proc create*( let relayShardRes = ( RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic) ).valueOr: + echo "RequestRelayShard.request error", $error return err($error) let pubsubTopic = relayShardRes.relayShard.toPubsubTopic() diff --git a/waku/node/delivery_service/send_service/lightpush_processor.nim b/waku/node/delivery_service/send_service/lightpush_processor.nim index f6f077ff7..1875e4f52 100644 --- a/waku/node/delivery_service/send_service/lightpush_processor.nim +++ b/waku/node/delivery_service/send_service/lightpush_processor.nim @@ -2,10 +2,9 @@ import chronicles, chronos, results import std/options import - waku/waku_node, - waku/waku_core, waku/node/peer_manager, - waku/waku_lightpush/[callbacks, common, client, rpc], + waku/waku_core, + waku/waku_lightpush/[common, client, rpc], waku/common/broker/broker_context import ./[delivery_task, send_processor] @@ -18,7 +17,7 @@ type LightpushSendProcessor* = ref object of BaseSendProcessor lightpushClient: WakuLightPushClient proc new*( - T: type LightpushSendProcessor, + T: typedesc[LightpushSendProcessor], peerManager: PeerManager, lightpushClient: WakuLightPushClient, brokerCtx: BrokerContext, @@ -41,7 +40,9 @@ method sendImpl*( ): Future[void] {.async.} = task.tryCount.inc() info "Trying message delivery via Lightpush", - requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount + requestId = task.requestId, + msgHash = task.msgHash.to0xHex(), + tryCount = task.tryCount let peer = self.peerManager.selectPeer(WakuLightPushCodec, some(task.pubsubTopic)).valueOr: task.state = DeliveryState.NextRoundRetry diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index 4b826bd93..72a81c905 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -1,6 +1,6 @@ import chronos, chronicles import std/options -import waku/[waku_node, waku_core], waku/waku_lightpush/[common, callbacks, rpc] +import waku/[waku_core], waku/waku_lightpush/[common, rpc] import waku/requests/health_request import waku/common/broker/broker_context import waku/api/types @@ -33,36 +33,42 @@ proc new*( proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} = let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr: + error "isTopicHealthy: failed to get health report", topic = topic, error = error return false if healthReport.topicHealth.len() < 1: + warn "isTopicHealthy: no topic health entries", topic = topic return false let health = healthReport.topicHealth[0].health + debug "isTopicHealthy: topic health is ", topic = topic, health = health return health == MINIMALLY_HEALTHY or health == SUFFICIENTLY_HEALTHY method isValidProcessor*( self: RelaySendProcessor, task: DeliveryTask ): bool {.gcsafe.} = - return self.isTopicHealthy(task.pubsubTopic) + # Topic health query is not reliable enough after a fresh subscribe... + # return self.isTopicHealthy(task.pubsubTopic) + return true method sendImpl*(self: RelaySendProcessor, task: DeliveryTask): Future[void] {.async.} = task.tryCount.inc() info "Trying message delivery via Relay", - requestId = task.requestId, msgHash = task.msgHash, tryCount = task.tryCount + requestId = task.requestId, + msgHash = task.msgHash.to0xHex(), + tryCount = task.tryCount - let pushResult = await self.publishProc(task.pubsubTopic, task.msg) - if pushResult.isErr(): - let errorMessage = pushResult.error.desc.get($pushResult.error.code) + let noOfPublishedPeers = (await self.publishProc(task.pubsubTopic, task.msg)).valueOr: + let errorMessage = error.desc.get($error.code) error "Failed to publish message with relay", request = task.requestId, msgHash = task.msgHash, error = errorMessage - if pushResult.error.code != LightPushErrorCode.NO_PEERS_TO_RELAY: + if error.code != LightPushErrorCode.NO_PEERS_TO_RELAY: task.state = DeliveryState.FailedToDeliver task.errorDesc = errorMessage else: task.state = self.fallbackStateToSet return - if pushResult.isOk and pushResult.get() > 0: + if noOfPublishedPeers > 0: info "Message propagated via Relay", requestId = task.requestId, msgHash = task.msgHash task.state = DeliveryState.SuccessfullyPropagated diff --git a/waku/node/delivery_service/send_service/send_service.nim b/waku/node/delivery_service/send_service/send_service.nim index 1a54da4a7..ed05e0583 100644 --- a/waku/node/delivery_service/send_service/send_service.nim +++ b/waku/node/delivery_service/send_service/send_service.nim @@ -5,18 +5,17 @@ import std/[sequtils, tables, options] import chronos, chronicles, libp2p/utility import ./[send_processor, relay_processor, lightpush_processor, delivery_task], + ../[subscription_service], waku/[ waku_core, node/waku_node, node/peer_manager, waku_store/client, waku_store/common, - waku_archive/archive, waku_relay/protocol, waku_rln_relay/rln_relay, waku_lightpush/client, waku_lightpush/callbacks, - events/delivery_events, events/message_events, common/broker/broker_context, ] @@ -59,6 +58,7 @@ type SendService* = ref object of RootObj node: WakuNode checkStoreForMessages: bool + subscriptionService: SubscriptionService proc setupSendProcessorChain( peerManager: PeerManager, @@ -88,14 +88,14 @@ proc setupSendProcessorChain( processors.add(LightpushSendProcessor.new(peerManager, lightpushClient, brokerCtx)) var currentProcessor: BaseSendProcessor = processors[0] - for i in 1 ..< processors.len(): + for i in 1 ..< processors.len: currentProcessor.chain(processors[i]) currentProcessor = processors[i] return ok(processors[0]) proc new*( - T: type SendService, preferP2PReliability: bool, w: WakuNode + T: type SendService, preferP2PReliability: bool, w: WakuNode, s: SubscriptionService ): Result[T, string] = if w.wakuRelay.isNil() and w.wakuLightpushClient.isNil(): return err( @@ -116,6 +116,7 @@ proc new*( sendProcessor: sendProcessorChain, node: w, checkStoreForMessages: checkStoreForMessages, + subscriptionService: s, ) return ok(sendService) @@ -173,18 +174,21 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = of DeliveryState.SuccessfullyPropagated: # TODO: in case of of unable to strore check messages shall we report success instead? info "Message successfully propagated", - requestId = task.requestId, msgHash = task.msgHash - MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString()) + requestId = task.requestId, msgHash = task.msgHash.to0xHex() + MessagePropagatedEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex()) return of DeliveryState.SuccessfullyValidated: - info "Message successfully sent", requestId = task.requestId, msgHash = task.msgHash - MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.toString()) + info "Message successfully sent", + requestId = task.requestId, msgHash = task.msgHash.to0xHex() + MessageSentEvent.emit(self.brokerCtx, task.requestId, task.msgHash.to0xHex()) return of DeliveryState.FailedToDeliver: error "Failed to send message", - requestId = task.requestId, msgHash = task.msgHash, error = task.errorDesc + requestId = task.requestId, + msgHash = task.msgHash.to0xHex(), + error = task.errorDesc MessageErrorEvent.emit( - self.brokerCtx, task.requestId, task.msgHash.toString(), task.errorDesc + self.brokerCtx, task.requestId, task.msgHash.to0xHex(), task.errorDesc ) return else: @@ -193,19 +197,22 @@ proc reportTaskResult(self: SendService, task: DeliveryTask) = if task.messageAge() > MaxTimeInCache: error "Failed to send message", - requestId = task.requestId, msgHash = task.msgHash, error = "Message too old" + requestId = task.requestId, + msgHash = task.msgHash.to0xHex(), + error = "Message too old", + age = task.messageAge() task.state = DeliveryState.FailedToDeliver MessageErrorEvent.emit( self.brokerCtx, task.requestId, - task.msgHash.toString(), + task.msgHash.to0xHex(), "Unable to send within retry time window", ) proc evaluateAndCleanUp(self: SendService) = self.taskCache.forEach(self.reportTaskResult(it)) self.taskCache.keepItIf( - it.state != DeliveryState.SuccessfullyValidated or + it.state != DeliveryState.SuccessfullyValidated and it.state != DeliveryState.FailedToDeliver ) @@ -238,9 +245,16 @@ proc stopSendService*(self: SendService) = if not self.serviceLoopHandle.isNil(): discard self.serviceLoopHandle.cancelAndWait() -proc send*(self: SendService, task: DeliveryTask): Future[void] {.async.} = +proc send*(self: SendService, task: DeliveryTask) {.async.} = assert(not task.isNil(), "task for send must not be nil") + info "SendService.send: processing delivery task", + requestId = task.requestId, msgHash = task.msgHash + + self.subscriptionService.subscribe(task.msg.contentTopic).isOkOr: + error "SendService.send: failed to subscribe to content topic", + contentTopic = task.msg.contentTopic, error = error + await self.sendProcessor.process(task) reportTaskResult(self, task) if task.state != DeliveryState.FailedToDeliver: diff --git a/waku/node/delivery_service/subscription_service.nim b/waku/node/delivery_service/subscription_service.nim new file mode 100644 index 000000000..78763161b --- /dev/null +++ b/waku/node/delivery_service/subscription_service.nim @@ -0,0 +1,64 @@ +import chronos, chronicles +import + waku/[ + waku_core, + waku_core/topics, + events/message_events, + waku_node, + common/broker/broker_context, + ] + +type SubscriptionService* = ref object of RootObj + brokerCtx: BrokerContext + node: WakuNode + +proc new*(T: typedesc[SubscriptionService], node: WakuNode): T = + ## The storeClient will help to acquire any possible missed messages + + return SubscriptionService(brokerCtx: node.brokerCtx, node: node) + +proc isSubscribed*( + self: SubscriptionService, topic: ContentTopic +): Result[bool, string] = + var isSubscribed = false + if self.node.wakuRelay.isNil() == false: + return self.node.isSubscribed((kind: ContentSub, topic: topic)) + + # TODO: Add support for edge mode with Filter subscription management + return ok(isSubscribed) + +#TODO: later PR may consider to refactor or place this function elsewhere +# The only important part is that it emits MessageReceivedEvent +proc getReceiveHandler(self: SubscriptionService): WakuRelayHandler = + return proc(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async, gcsafe.} = + let msgHash = computeMessageHash(topic, msg).to0xHex() + info "API received message", + pubsubTopic = topic, contentTopic = msg.contentTopic, msgHash = msgHash + + MessageReceivedEvent.emit(self.brokerCtx, msgHash, msg) + +proc subscribe*(self: SubscriptionService, topic: ContentTopic): Result[void, string] = + let isSubscribed = self.isSubscribed(topic).valueOr: + error "Failed to check subscription status: ", error = error + return err("Failed to check subscription status: " & error) + + if isSubscribed == false: + if self.node.wakuRelay.isNil() == false: + self.node.subscribe((kind: ContentSub, topic: topic), self.getReceiveHandler()).isOkOr: + error "Failed to subscribe: ", error = error + return err("Failed to subscribe: " & error) + + # TODO: Add support for edge mode with Filter subscription management + + return ok() + +proc unsubscribe*( + self: SubscriptionService, topic: ContentTopic +): Result[void, string] = + if self.node.wakuRelay.isNil() == false: + self.node.unsubscribe((kind: ContentSub, topic: topic)).isOkOr: + error "Failed to unsubscribe: ", error = error + return err("Failed to unsubscribe: " & error) + + # TODO: Add support for edge mode with Filter subscription management + return ok() diff --git a/waku/node/kernel_api/relay.nim b/waku/node/kernel_api/relay.nim index 819bbb7b9..a2940db0c 100644 --- a/waku/node/kernel_api/relay.nim +++ b/waku/node/kernel_api/relay.nim @@ -30,6 +30,8 @@ import ../peer_manager, ../../waku_rln_relay +export waku_relay.WakuRelayHandler + declarePublicHistogram waku_histogram_message_size, "message size histogram in kB", buckets = [ @@ -91,6 +93,23 @@ proc registerRelayHandler( node.wakuRelay.subscribe(topic, uniqueTopicHandler) +proc getTopicOfSubscriptionEvent( + node: WakuNode, subscription: SubscriptionEvent +): Result[(PubsubTopic, Option[ContentTopic]), string] = + case subscription.kind + of ContentSub, ContentUnsub: + if node.wakuAutoSharding.isSome(): + let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: + return err("Autosharding error: " & error) + return ok(($shard, some(subscription.topic))) + else: + return + err("Static sharding is used, relay subscriptions must specify a pubsub topic") + of PubsubSub, PubsubUnsub: + return ok((subscription.topic, none[ContentTopic]())) + else: + return err("Unsupported subscription type in relay getTopicOfSubscriptionEvent") + proc subscribe*( node: WakuNode, subscription: SubscriptionEvent, handler: WakuRelayHandler ): Result[void, string] = @@ -101,27 +120,15 @@ proc subscribe*( error "Invalid API call to `subscribe`. WakuRelay not mounted." return err("Invalid API call to `subscribe`. WakuRelay not mounted.") - let (pubsubTopic, contentTopicOp) = - case subscription.kind - of ContentSub: - if node.wakuAutoSharding.isSome(): - let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - ($shard, some(subscription.topic)) - else: - return err( - "Static sharding is used, relay subscriptions must specify a pubsub topic" - ) - of PubsubSub: - (subscription.topic, none(ContentTopic)) - else: - return err("Unsupported subscription type in relay subscribe") + let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr: + error "Failed to decode subscription event", error = error + return err("Failed to decode subscription event: " & error) if node.wakuRelay.isSubscribed(pubsubTopic): warn "No-effect API call to subscribe. Already subscribed to topic", pubsubTopic return ok() + info "subscribe", pubsubTopic, contentTopicOp node.registerRelayHandler(pubsubTopic, handler) node.topicSubscriptionQueue.emit((kind: PubsubSub, topic: pubsubTopic)) @@ -136,22 +143,9 @@ proc unsubscribe*( error "Invalid API call to `unsubscribe`. WakuRelay not mounted." return err("Invalid API call to `unsubscribe`. WakuRelay not mounted.") - let (pubsubTopic, contentTopicOp) = - case subscription.kind - of ContentUnsub: - if node.wakuAutoSharding.isSome(): - let shard = node.wakuAutoSharding.get().getShard((subscription.topic)).valueOr: - error "Autosharding error", error = error - return err("Autosharding error: " & error) - ($shard, some(subscription.topic)) - else: - return err( - "Static sharding is used, relay subscriptions must specify a pubsub topic" - ) - of PubsubUnsub: - (subscription.topic, none(ContentTopic)) - else: - return err("Unsupported subscription type in relay unsubscribe") + let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr: + error "Failed to decode unsubscribe event", error = error + return err("Failed to decode unsubscribe event: " & error) if not node.wakuRelay.isSubscribed(pubsubTopic): warn "No-effect API call to `unsubscribe`. Was not subscribed", pubsubTopic @@ -163,6 +157,19 @@ proc unsubscribe*( return ok() +proc isSubscribed*( + node: WakuNode, subscription: SubscriptionEvent +): Result[bool, string] = + if node.wakuRelay.isNil(): + error "Invalid API call to `isSubscribed`. WakuRelay not mounted." + return err("Invalid API call to `isSubscribed`. WakuRelay not mounted.") + + let (pubsubTopic, contentTopicOp) = getTopicOfSubscriptionEvent(node, subscription).valueOr: + error "Failed to decode subscription event", error = error + return err("Failed to decode subscription event: " & error) + + return ok(node.wakuRelay.isSubscribed(pubsubTopic)) + proc publish*( node: WakuNode, pubsubTopicOp: Option[PubsubTopic], message: WakuMessage ): Future[Result[int, string]] {.async, gcsafe.} = diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index 49ed9072c..47e8bb2ad 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -145,9 +145,14 @@ proc deduceRelayShard( let pubsubTopic = pubsubTopicOp.valueOr: if node.wakuAutoSharding.isNone(): return err("Pubsub topic must be specified when static sharding is enabled.") - node.wakuAutoSharding.get().getShard(contentTopic).valueOr: - let msg = "Deducing shard failed: " & error - return err(msg) + let shard = node.wakuAutoSharding.get().getShard(contentTopic).valueOr: + let msg = "Deducing shard failed: " & error + return err(msg) + return ok(shard) + + let shard = RelayShard.parse(pubsubTopic).valueOr: + return err("Invalid topic:" & pubsubTopic & " " & $error) + return ok(shard) proc getShardsGetter(node: WakuNode): GetShards = return proc(): seq[uint16] {.closure, gcsafe, raises: [].} = diff --git a/waku/waku_core/message/digest.nim b/waku/waku_core/message/digest.nim index dd8d9433a..3f82ce8f6 100644 --- a/waku/waku_core/message/digest.nim +++ b/waku/waku_core/message/digest.nim @@ -19,7 +19,7 @@ func shortLog*(hash: WakuMessageHash): string = func `$`*(hash: WakuMessageHash): string = shortLog(hash) -func toString*(hash: WakuMessageHash): string = +func to0xHex*(hash: WakuMessageHash): string = var hexhash = newStringOfCap(64) hexhash &= hash.toOpenArray(hash.low, hash.high).to0xHex() hexhash diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 2604b32db..3f343269a 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -584,6 +584,7 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle procCall GossipSub(w).subscribe(pubsubTopic, topicHandler) w.topicHandlers[pubsubTopic] = topicHandler + asyncSpawn w.updateTopicsHealth() proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = ## Unsubscribe all handlers on this pubsub topic