Fix brokerCtx settings for all usedbrokers, cover locked node init

This commit is contained in:
NagyZoltanPeter 2026-01-15 01:46:58 +01:00
parent d27ce3700b
commit 36fdba3d41
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
6 changed files with 94 additions and 49 deletions

View File

@ -9,14 +9,14 @@ import
import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils] import ../testlib/[common, wakucore, wakunode, testasync, futures, testutils]
import import
waku, waku,
waku/ waku/[
[ waku_node,
waku_node, waku_core,
waku_core, waku_relay/protocol,
waku_relay/protocol, waku_filter_v2/common,
waku_filter_v2/common, waku_store/common,
waku_store/common, common/broker/broker_context,
] ]
import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config import waku/api/api_conf, waku/factory/waku_conf, waku/factory/networks_config
suite "Waku API - Send": suite "Waku API - Send":
@ -45,34 +45,36 @@ suite "Waku API - Send":
# handlerFuture.complete((pubsubTopic, message)) # handlerFuture.complete((pubsubTopic, message))
# return ok() # return ok()
relayNode1 = lockNewGlobalBrokerContext:
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) relayNode1 =
relayNode2 = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) await relayNode1.start()
(await relayNode1.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
lightpushNode = lockNewGlobalBrokerContext:
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) relayNode2 =
storeNode = newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) await relayNode2.start()
(await relayNode2.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
await allFutures( lockNewGlobalBrokerContext:
relayNode1.start(), relayNode2.start(), lightpushNode.start(), storeNode.start() lightpushNode =
) newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
await lightpushNode.start()
(await lightpushNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
(await lightpushNode.mountLightPush()).isOkOr:
raiseAssert "Failed to mount lightpush"
(await relayNode1.mountRelay()).isOkOr: lockNewGlobalBrokerContext:
raiseAssert "Failed to mount relay" storeNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
(await relayNode2.mountRelay()).isOkOr: await storeNode.start()
raiseAssert "Failed to mount relay" (await storeNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
(await lightpushNode.mountRelay()).isOkOr: await storeNode.mountStore()
raiseAssert "Failed to mount relay"
(await lightpushNode.mountLightPush()).isOkOr:
raiseAssert "Failed to mount lightpush"
(await storeNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
await storeNode.mountStore()
relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo() relayNode1PeerInfo = relayNode1.peerInfo.toRemotePeerInfo()
relayNode1PeerId = relayNode1.peerInfo.peerId relayNode1PeerId = relayNode1.peerInfo.peerId
@ -85,6 +87,7 @@ suite "Waku API - Send":
storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo() storeNodePeerInfo = storeNode.peerInfo.toRemotePeerInfo()
storeNodePeerId = storeNode.peerInfo.peerId storeNodePeerId = storeNode.peerInfo.peerId
asyncTeardown: asyncTeardown:
await allFutures( await allFutures(
relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop() relayNode1.stop(), relayNode2.stop(), lightpushNode.stop(), storeNode.stop()
@ -111,30 +114,40 @@ suite "Waku API - Send":
wakuConf.clusterId == 1 wakuConf.clusterId == 1
wakuConf.shardingConf.numShardsInCluster == 1 wakuConf.shardingConf.numShardsInCluster == 1
var node = (await createNode(nodeConfig)).valueOr: var node: Waku
raiseAssert error lockNewGlobalBrokerContext:
node = (await createNode(nodeConfig)).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
let sentListener = MessageSentEvent.listen( let sentListener = MessageSentEvent.listen(
node.brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} = proc(event: MessageSentEvent) {.async: (raises: []).} =
raiseAssert "Should not be called" raiseAssert "Should not be called"
,
).valueOr: ).valueOr:
raiseAssert error raiseAssert error
let errorListener = MessageErrorEvent.listen( let errorListener = MessageErrorEvent.listen(
node.brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} = proc(event: MessageErrorEvent) {.async: (raises: []).} =
check true check true
,
).valueOr: ).valueOr:
raiseAssert error raiseAssert error
let propagatedListener = MessagePropagatedEvent.listen( let propagatedListener = MessagePropagatedEvent.listen(
node.brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} = proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
raiseAssert "Should not be called" raiseAssert "Should not be called"
,
).valueOr: ).valueOr:
raiseAssert error raiseAssert error
defer: defer:
MessageSentEvent.dropListener(sentListener) MessageSentEvent.dropListener(node.brokerCtx, sentListener)
MessageErrorEvent.dropListener(errorListener) MessageErrorEvent.dropListener(node.brokerCtx, errorListener)
MessagePropagatedEvent.dropListener(propagatedListener) MessagePropagatedEvent.dropListener(node.brokerCtx, propagatedListener)
let envelope = MessageEnvelope.init( let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload" ContentTopic("/waku/2/default-content/proto"), "test payload"

View File

@ -43,7 +43,9 @@ import
../factory/app_callbacks, ../factory/app_callbacks,
../waku_enr/multiaddr, ../waku_enr/multiaddr,
./waku_conf, ./waku_conf,
../common/broker/broker_context ../common/broker/broker_context,
../requests/health_request,
../api/types
logScope: logScope:
topics = "wakunode waku" topics = "wakunode waku"
@ -413,6 +415,31 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
waku[].healthMonitor.startHealthMonitor().isOkOr: waku[].healthMonitor.startHealthMonitor().isOkOr:
return err("failed to start health monitor: " & $error) return err("failed to start health monitor: " & $error)
## Setup RequestNodeHealth provider
RequestNodeHealth.setProvider(
globalBrokerContext(),
proc(): Result[RequestNodeHealth, string] =
let healthReportFut = waku[].healthMonitor.getNodeHealthReport()
if not healthReportFut.completed():
return err("Health report not available")
try:
let healthReport = healthReportFut.read()
# Convert HealthStatus to NodeHealth
let nodeHealth =
case healthReport.nodeHealth
of HealthStatus.READY:
NodeHealth.Healthy
of HealthStatus.SYNCHRONIZING, HealthStatus.INITIALIZING:
NodeHealth.MinimallyHealthy
else:
NodeHealth.Unhealthy
ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError:
err("Failed to read health report: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestNodeHealth provider", error = error
if conf.restServerConf.isSome(): if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport( rest_server_builder.startRestServerProtocolSupport(
waku[].restServer, waku[].restServer,
@ -469,6 +496,9 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.healthMonitor.isNil(): if not waku.healthMonitor.isNil():
await waku.healthMonitor.stopHealthMonitor() await waku.healthMonitor.stopHealthMonitor()
## Clear RequestNodeHealth provider
RequestNodeHealth.clearProvider(waku.brokerCtx)
if not waku.restServer.isNil(): if not waku.restServer.isNil():
await waku.restServer.stop() await waku.restServer.stop()
except Exception: except Exception:

View File

@ -29,9 +29,7 @@ proc create*(
let msg = envelop.toWakuMessage() let msg = envelop.toWakuMessage()
# TODO: use sync request for such as soon as available # TODO: use sync request for such as soon as available
let relayShardRes = ( let relayShardRes = (
waitFor RequestRelayShard.request( RequestRelayShard.request(brokerCtx, none[PubsubTopic](), envelop.contentTopic)
brokerCtx, none[PubsubTopic](), envelop.contentTopic
)
).valueOr: ).valueOr:
return err($error) return err($error)

View File

@ -60,6 +60,7 @@ import
common/broker/broker_context, common/broker/broker_context,
waku_mix, waku_mix,
requests/node_requests, requests/node_requests,
common/broker/broker_context,
], ],
./net_config, ./net_config,
./peer_manager ./peer_manager
@ -466,17 +467,18 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
proc startProvidersAndListeners*(node: WakuNode) = proc startProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.setProvider( RequestRelayShard.setProvider(
node.brokerCtx,
proc( proc(
pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic
): Future[Result[RequestRelayShard, string]] {.async.} = ): Result[RequestRelayShard, string] =
let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr: let shard = node.deduceRelayShard(contentTopic, pubsubTopic).valueOr:
return err($error) return err($error)
return ok(RequestRelayShard(relayShard: shard)) return ok(RequestRelayShard(relayShard: shard)),
).isOkOr: ).isOkOr:
error "Can't set proveder for RequestRelayShard", error = error error "Can't set proveder for RequestRelayShard", error = error
proc stopProvidersAndListeners*(node: WakuNode) = proc stopProvidersAndListeners*(node: WakuNode) =
RequestRelayShard.clearProvider() RequestRelayShard.clearProvider(node.brokerCtx)
proc start*(node: WakuNode) {.async.} = proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and ## Starts a created Waku Node and

View File

@ -2,10 +2,10 @@ import std/options
import waku/common/broker/[request_broker, multi_request_broker] import waku/common/broker/[request_broker, multi_request_broker]
import waku/waku_core/[topics] import waku/waku_core/[topics]
RequestBroker: RequestBroker(sync):
type RequestRelayShard* = object type RequestRelayShard* = object
relayShard*: RelayShard relayShard*: RelayShard
proc signature( proc signature(
pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic pubsubTopic: Option[PubsubTopic], contentTopic: ContentTopic
): Future[Result[RequestRelayShard, string]] {.async.} ): Result[RequestRelayShard, string]

View File

@ -22,7 +22,8 @@ import
waku/waku_core, waku/waku_core,
waku/node/health_monitor/topic_health, waku/node/health_monitor/topic_health,
waku/requests/health_request, waku/requests/health_request,
./message_id ./message_id,
waku/common/broker/broker_context
from ../waku_core/codecs import WakuRelayCodec from ../waku_core/codecs import WakuRelayCodec
export WakuRelayCodec export WakuRelayCodec
@ -326,12 +327,13 @@ proc initRelayObservers(w: WakuRelay) =
proc initRequestProviders(w: WakuRelay) = proc initRequestProviders(w: WakuRelay) =
RequestRelayTopicsHealth.setProvider( RequestRelayTopicsHealth.setProvider(
globalBrokerContext(),
proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] = proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] =
var collectedRes: RequestRelayTopicsHealth var collectedRes: RequestRelayTopicsHealth
for topic in topics: for topic in topics:
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED) let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
collectedRes.topicHealth.add((topic, health)) collectedRes.topicHealth.add((topic, health))
return ok(collectedRes) return ok(collectedRes),
).isOkOr: ).isOkOr:
error "Cannot set Relay Topics Health request provider", error = error error "Cannot set Relay Topics Health request provider", error = error