From 36fdba3d41b9882d08b710cb76120fdd2c8cb3da Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Thu, 15 Jan 2026 01:46:58 +0100 Subject: [PATCH] Fix brokerCtx settings for all usedbrokers, cover locked node init --- tests/api/test_api_send.nim | 89 +++++++++++-------- waku/factory/waku.nim | 32 ++++++- .../send_service/delivery_task.nim | 4 +- waku/node/waku_node.nim | 8 +- waku/requests/node_requests.nim | 4 +- waku/waku_relay/protocol.nim | 6 +- 6 files changed, 94 insertions(+), 49 deletions(-) diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index 71a305b38..b74a8be47 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -9,14 +9,14 @@ import import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils] import waku, - waku/ - [ - waku_node, - waku_core, - waku_relay/protocol, - waku_filter_v2/common, - waku_store/common, - ] + waku/[ + waku_node, + waku_core, + waku_relay/protocol, + waku_filter_v2/common, + waku_store/common, + common/broker/broker_context, + ] import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config suite "Waku API - Send": @@ -45,34 +45,36 @@ suite "Waku API - Send": # handlerFuture.complete((pubsubTopic, message)) # return ok() - relayNode1 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - relayNode2 = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + lockNewGlobalBrokerContext: + relayNode1 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + await relayNode1.start() + (await relayNode1.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" - lightpushNode = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - storeNode = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + lockNewGlobalBrokerContext: + relayNode2 = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + await relayNode2.start() + (await relayNode2.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" - await allFutures( - relayNode1.start(), relayNode2.start(), lightpushNode.start(), storeNode.start() - ) + lockNewGlobalBrokerContext: + lightpushNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + await lightpushNode.start() + (await lightpushNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + (await lightpushNode.mountLightPush()).isOkOr: + raiseAssert "Failed to mount lightpush" - (await relayNode1.mountRelay()).isOkOr: - raiseAssert "Failed to mount relay" - - (await relayNode2.mountRelay()).isOkOr: - raiseAssert "Failed to mount relay" - - (await lightpushNode.mountRelay()).isOkOr: - raiseAssert "Failed to mount relay" - (await lightpushNode.mountLightPush()).isOkOr: - raiseAssert "Failed to mount lightpush" - - (await storeNode.mountRelay()).isOkOr: - raiseAssert "Failed to mount relay" - await storeNode.mountStore() + lockNewGlobalBrokerContext: + storeNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + await storeNode.start() + (await storeNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + await storeNode.mountStore() relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo() relayNode1PeerId = relayNode1.peerInfo.peerId @@ -85,6 +87,7 @@ suite "Waku API - Send": storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() storeNodePeerId = storeNode.peerInfo.peerId + asyncTeardown: await allFutures( relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop() @@ -111,30 +114,40 @@ suite "Waku API - Send": wakuConf.clusterId == 1 wakuConf.shardingConf.numShardsInCluster == 1 - var node = (await createNode(nodeConfig)).valueOr: - raiseAssert error + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(nodeConfig)).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error let sentListener = MessageSentEvent.listen( + node.brokerCtx, proc(event: MessageSentEvent) {.async: (raises: []).} = raiseAssert "Should not be called" + , ).valueOr: raiseAssert error let errorListener = MessageErrorEvent.listen( + node.brokerCtx, proc(event: MessageErrorEvent) {.async: (raises: []).} = check true + , ).valueOr: raiseAssert error let propagatedListener = MessagePropagatedEvent.listen( + node.brokerCtx, proc(event: MessagePropagatedEvent) {.async: (raises: []).} = raiseAssert "Should not be called" + , ).valueOr: raiseAssert error defer: - MessageSentEvent.dropListener(sentListener) - MessageErrorEvent.dropListener(errorListener) - MessagePropagatedEvent.dropListener(propagatedListener) + MessageSentEvent.dropListener(node.brokerCtx, sentListener) + MessageErrorEvent.dropListener(node.brokerCtx, errorListener) + MessagePropagatedEvent.dropListener(node.brokerCtx, propagatedListener) let envelope = MessageEnvelope.init( ContentTopic("/waku/2/default-content/proto"), "test payload" diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 0a4f60324..7eb62b556 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -43,7 +43,9 @@ import ../factory/app_callbacks, ../waku_enr/multiaddr, ./waku_conf, - ../common/broker/broker_context + ../common/broker/broker_context, + ../requests/health_request, + ../api/types logScope: topics = "wakunode waku" @@ -413,6 +415,31 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: waku[].healthMonitor.startHealthMonitor().isOkOr: return err("failed to start health monitor: " & $error) + ## Setup RequestNodeHealth provider + + RequestNodeHealth.setProvider( + globalBrokerContext(), + proc(): Result[RequestNodeHealth, string] = + let healthReportFut = waku[].healthMonitor.getNodeHealthReport() + if not healthReportFut.completed(): + return err("Health report not available") + try: + let healthReport = healthReportFut.read() + # Convert HealthStatus to NodeHealth + let nodeHealth = + case healthReport.nodeHealth + of HealthStatus.READY: + NodeHealth.Healthy + of HealthStatus.SYNCHRONIZING, HealthStatus.INITIALIZING: + NodeHealth.MinimallyHealthy + else: + NodeHealth.Unhealthy + ok(RequestNodeHealth(healthStatus: nodeHealth)) + except CatchableError: + err("Failed to read health report: " & getCurrentExceptionMsg()), + ).isOkOr: + error "Failed to set RequestNodeHealth provider", error = error + if conf.restServerConf.isSome(): rest_server_builder.startRestServerProtocolSupport( waku[].restServer, @@ -469,6 +496,9 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} = if not waku.healthMonitor.isNil(): await waku.healthMonitor.stopHealthMonitor() + ## Clear RequestNodeHealth provider + RequestNodeHealth.clearProvider(waku.brokerCtx) + if not waku.restServer.isNil(): await waku.restServer.stop() except Exception: diff --git a/waku/node/delivery_service/send_service/delivery_task.nim b/waku/node/delivery_service/send_service/delivery_task.nim index 0f3c9d902..be576f8e9 100644 --- a/waku/node/delivery_service/send_service/delivery_task.nim +++ b/waku/node/delivery_service/send_service/delivery_task.nim @@ -29,9 +29,7 @@ proc create*( let msg = envelop.toWakuMessage() # TODO: use sync request for such as soon as available let relayShardRes = ( - waitFor RequestRelayShard.request( - brokerCtx, none[PubsubTopic](), envelop.contentTopic - ) + RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic) ).valueOr: return err($error) diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index e946fbb35..49ed9072c 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -60,6 +60,7 @@ import common/broker/broker_context, waku_mix, requests/node_requests, + common/broker/broker_context, ], ./net_config, ./peer_manager @@ -466,17 +467,18 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] proc startProvidersAndListeners*(node: WakuNode) = RequestRelayShard.setProvider( + node.brokerCtx, proc( pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic - ): Future[Result[RequestRelayShard, string]] {.async.} = + ): Result[RequestRelayShard, string] = let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr: return err($error) - return ok(RequestRelayShard(relayShard: shard)) + return ok(RequestRelayShard(relayShard: shard)), ).isOkOr: error "Can't set proveder for RequestRelayShard", error = error proc stopProvidersAndListeners*(node: WakuNode) = - RequestRelayShard.clearProvider() + RequestRelayShard.clearProvider(node.brokerCtx) proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and diff --git a/waku/requests/node_requests.nim b/waku/requests/node_requests.nim index 48b5617fb..a4ccc6de4 100644 --- a/waku/requests/node_requests.nim +++ b/waku/requests/node_requests.nim @@ -2,10 +2,10 @@ import std/options import waku/common/broker/[request_broker, multi_request_broker] import waku/waku_core/[topics] -RequestBroker: +RequestBroker(sync): type RequestRelayShard* = object relayShard*: RelayShard proc signature( pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic - ): Future[Result[RequestRelayShard, string]] {.async.} + ): Result[RequestRelayShard, string] diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 812bf204a..2604b32db 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -22,7 +22,8 @@ import waku/waku_core, waku/node/health_monitor/topic_health, waku/requests/health_request, - ./message_id + ./message_id, + waku/common/broker/broker_context from ../waku_core/codecs import WakuRelayCodec export WakuRelayCodec @@ -326,12 +327,13 @@ proc initRelayObservers(w: WakuRelay) = proc initRequestProviders(w: WakuRelay) = RequestRelayTopicsHealth.setProvider( + globalBrokerContext(), proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] = var collectedRes: RequestRelayTopicsHealth for topic in topics: let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED) collectedRes.topicHealth.add((topic, health)) - return ok(collectedRes) + return ok(collectedRes), ).isOkOr: error "Cannot set Relay Topics Health request provider", error = error