feat: implement Waku API Health spec (#3689)

* Fix protocol strength metric to consider connected peers only
* Remove polling loop; event-driven node connection health updates
* Remove 10s WakuRelay topic health polling loop; now event-driven
* Change NodeHealthStatus to ConnectionStatus
* Change new nodeState (rest API /health) field to connectionStatus
* Add getSyncProtocolHealthInfo and getSyncNodeHealthReport
* Add ConnectionStatusChangeEvent
* Add RequestHealthReport
* Refactor sync/async protocol health queries in the health monitor
* Add EventRelayTopicHealthChange
* Add EventWakuPeer emitted by PeerManager
* Add Edge support for topics health requests and events
* Rename "RelayTopic" -> "Topic"
* Add RequestContentTopicsHealth sync request
* Add EventContentTopicHealthChange
* Rename RequestTopicsHealth -> RequestShardTopicsHealth
* Remove health check gating from checkApiAvailability
* Add basic health smoke tests
* Other misc improvements, refactors, fixes

Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com>
Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
Fabiana Cecin 2026-02-12 14:52:39 -03:00 committed by GitHub
parent dd8dc7429d
commit 1fb4d1eab0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
32 changed files with 1727 additions and 394 deletions

View File

@ -0,0 +1,19 @@
{.push raises: [].}

import system, std/json
import ./json_base_event
import ../../waku/api/types

## JSON-serializable FFI event emitted when the node's aggregate
## connection status changes (see `ConnectionStatus` in waku/api/types).
type JsonConnectionStatusChangeEvent* = ref object of JsonEvent
  status*: ConnectionStatus # new aggregate connection status of the node

proc new*(
    T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus
): T =
  ## Builds the event carrying the given connection status.
  ## NOTE(review): `eventType` is "node_health_change" although the type was
  ## renamed to ConnectionStatusChange — presumably kept for backward
  ## compatibility with existing binding consumers; confirm before renaming.
  return JsonConnectionStatusChangeEvent(
    eventType: "node_health_change",
    status: status
  )

method `$`*(event: JsonConnectionStatusChangeEvent): string =
  ## Serializes the event (including the inherited eventType) to JSON text.
  $(%*event)

View File

@ -7,9 +7,11 @@ import
./events/json_message_event,
./events/json_topic_health_change_event,
./events/json_connection_change_event,
./events/json_connection_status_change_event,
../waku/factory/app_callbacks,
waku/factory/waku,
waku/node/waku_node,
waku/node/health_monitor/health_status,
./declare_lib
################################################################################
@ -61,10 +63,16 @@ proc waku_new(
callEventCallback(ctx, "onConnectionChange"):
$JsonConnectionChangeEvent.new($peerId, peerEvent)
proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler =
return proc(status: ConnectionStatus) {.async.} =
callEventCallback(ctx, "onConnectionStatusChange"):
$JsonConnectionStatusChangeEvent.new(status)
let appCallbacks = AppCallbacks(
relayHandler: onReceivedMessage(ctx),
topicHealthChangeHandler: onTopicHealthChange(ctx),
connectionChangeHandler: onConnectionChange(ctx),
connectionStatusChangeHandler: onConnectionStatusChange(ctx)
)
ffi.sendRequestToFFIThread(

View File

@ -1,3 +1,3 @@
{.used.}
import ./test_entry_nodes, ./test_node_conf
import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health

View File

@ -0,0 +1,296 @@
{.used.}
import std/[options, sequtils, times]
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
import ../testlib/[common, wakucore, wakunode, testasync]
import
waku,
waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context],
waku/node/health_monitor/[topic_health, health_status, protocol_health, health_report],
waku/requests/health_requests,
waku/requests/node_requests,
waku/events/health_events,
waku/common/waku_protocol,
waku/factory/waku_conf
const TestTimeout = chronos.seconds(10)
const DefaultShard = PubsubTopic("/waku/2/rs/1/0")
const TestContentTopic = ContentTopic("/waku/2/default-content/proto")
proc dummyHandler(
    topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
  ## No-op relay message handler; used only so tests can register
  ## pubsub-topic subscriptions without consuming messages.
  discard
proc waitForConnectionStatus(
    brokerCtx: BrokerContext, expected: ConnectionStatus
) {.async.} =
  ## Awaits an EventConnectionStatusChange on `brokerCtx` whose status
  ## equals `expected`; raises via raiseAssert after TestTimeout.
  var statusSeen = newFuture[void]("waitForConnectionStatus")
  let listener: EventConnectionStatusChangeListenerProc = proc(
      ev: EventConnectionStatusChange
  ) {.async: (raises: []), gcsafe.} =
    # Complete exactly once, and only on the status we are waiting for.
    if not statusSeen.finished and ev.connectionStatus == expected:
      statusSeen.complete()
  let listenerHandle = EventConnectionStatusChange.listen(brokerCtx, listener).valueOr:
    raiseAssert error
  try:
    if not await statusSeen.withTimeout(TestTimeout):
      raiseAssert "Timeout waiting for status: " & $expected
  finally:
    # Always detach the listener, even on timeout.
    EventConnectionStatusChange.dropListener(brokerCtx, listenerHandle)
proc waitForShardHealthy(
    brokerCtx: BrokerContext
): Future[EventShardTopicHealthChange] {.async.} =
  ## Resolves with the first shard-health event reporting the shard as at
  ## least minimally healthy; raises via raiseAssert after TestTimeout.
  var healthyEvent = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")
  let listener: EventShardTopicHealthChangeListenerProc = proc(
      ev: EventShardTopicHealthChange
  ) {.async: (raises: []), gcsafe.} =
    # Complete exactly once, on the first "healthy enough" report.
    if not healthyEvent.finished and
        ev.health in {TopicHealth.MINIMALLY_HEALTHY, TopicHealth.SUFFICIENTLY_HEALTHY}:
      healthyEvent.complete(ev)
  let listenerHandle = EventShardTopicHealthChange.listen(brokerCtx, listener).valueOr:
    raiseAssert error
  try:
    if not await healthyEvent.withTimeout(TestTimeout):
      raiseAssert "Timeout waiting for shard health event"
    return healthyEvent.read()
  finally:
    # Always detach the listener, even on timeout.
    EventShardTopicHealthChange.dropListener(brokerCtx, listenerHandle)
suite "LM API health checking":
var
serviceNode {.threadvar.}: WakuNode
client {.threadvar.}: Waku
servicePeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
lockNewGlobalBrokerContext:
serviceNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
(await serviceNode.mountRelay()).isOkOr:
raiseAssert error
serviceNode.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert error
await serviceNode.mountLibp2pPing()
await serviceNode.start()
servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo()
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
lockNewGlobalBrokerContext:
let conf = NodeConfig.init(
mode = WakuMode.Core,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
)
client = (await createNode(conf)).valueOr:
raiseAssert error
(await startWaku(addr client)).isOkOr:
raiseAssert error
asyncTeardown:
discard await client.stop()
await serviceNode.stop()
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr:
raiseAssert "RequestShardTopicsHealth failed"
if req.topicHealth.len > 0:
let h = req.topicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
const GhostShard = PubsubTopic("/waku/2/rs/1/666")
client.node.wakuRelay.subscribe(GhostShard, dummyHandler)
let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr:
raiseAssert "Request failed"
check req.topicHealth.len > 0
check req.topicHealth[0].health == TopicHealth.UNHEALTHY
asyncTest "RequestProtocolHealth, check relay status":
await client.node.connectToNodes(@[servicePeerInfo])
var isReady = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let relayReq = await RequestProtocolHealth.request(
client.brokerCtx, WakuProtocol.RelayProtocol
)
if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
isReady = true
break
await sleepAsync(chronos.milliseconds(100))
check isReady == true
let storeReq =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
if storeReq.isOk():
check storeReq.get().healthStatus.health != HealthStatus.READY
asyncTest "RequestProtocolHealth, check unmounted protocol":
let req =
await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol)
check req.isOk()
let status = req.get().healthStatus
check status.health == HealthStatus.NOT_MOUNTED
check status.desc.isNone()
asyncTest "RequestConnectionStatus, check connectivity state":
let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
check initialReq.connectionStatus == ConnectionStatus.Disconnected
await client.node.connectToNodes(@[servicePeerInfo])
var isConnected = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestConnectionStatus.request(client.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
if req.connectionStatus == ConnectionStatus.PartiallyConnected or
req.connectionStatus == ConnectionStatus.Connected:
isConnected = true
break
await sleepAsync(chronos.milliseconds(100))
check isConnected == true
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
let connectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected)
await client.node.connectToNodes(@[servicePeerInfo])
await connectFuture
let disconnectFuture =
waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected)
await client.node.disconnectNode(servicePeerInfo)
await disconnectFuture
asyncTest "EventShardTopicHealthChange, detect health improvement":
client.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
let healthEventFuture = waitForShardHealthy(client.brokerCtx)
await client.node.connectToNodes(@[servicePeerInfo])
let event = await healthEventFuture
check event.topic == DefaultShard
asyncTest "RequestHealthReport, check aggregate report":
let req = await RequestHealthReport.request(client.brokerCtx)
check req.isOk()
let report = req.get().healthReport
check report.nodeHealth == HealthStatus.READY
check report.protocolsHealth.len > 0
check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol)
asyncTest "RequestContentTopicsHealth, smoke test":
let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic])
check req.isOk()
let res = req.get()
check res.contentTopicHealth.len == 1
check res.contentTopicHealth[0].topic == fictionalTopic
check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED
asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding":
let cTopic = ContentTopic("/waku/2/my-content-topic/proto")
let shardReq =
RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic)
check shardReq.isOk()
let targetShard = $shardReq.get().relayShard
client.node.wakuRelay.subscribe(targetShard, dummyHandler)
serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)
await client.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr:
raiseAssert "Request failed"
if req.contentTopicHealth.len > 0:
let h = req.contentTopicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestProtocolHealth, edge mode smoke test":
var edgeWaku: Waku
lockNewGlobalBrokerContext:
let edgeConf = NodeConfig.init(
mode = WakuMode.Edge,
networkingConfig =
NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1'u16,
messageValidation =
MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)),
),
)
edgeWaku = (await createNode(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error
(await startWaku(addr edgeWaku)).isOkOr:
raiseAssert "Failed to start edge waku: " & error
let relayReq = await RequestProtocolHealth.request(
edgeWaku.brokerCtx, WakuProtocol.RelayProtocol
)
check relayReq.isOk()
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
check not edgeWaku.node.wakuFilterClient.isNil()
discard await edgeWaku.stop()

View File

@ -117,6 +117,9 @@ proc validate(
check requestId == expectedRequestId
proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
# allocate random ports to avoid port-already-in-use errors
let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
result = NodeConfig.init(
mode = mode,
protocolsConfig = ProtocolsConfig.init(
@ -124,6 +127,7 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
networkingConfig = netConf,
p2pReliability = true,
)
@ -246,8 +250,9 @@ suite "Waku API - Send":
let sendResult = await node.send(envelope)
check sendResult.isErr() # Depending on implementation, it might say "not healthy"
check sendResult.error().contains("not healthy")
# TODO: The API is not enforcing a health check before the send,
# so currently this test cannot successfully fail to send.
check sendResult.isOk()
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error

View File

@ -7,4 +7,5 @@ import
./test_wakunode_peer_exchange,
./test_wakunode_store,
./test_wakunode_legacy_store,
./test_wakunode_peer_manager
./test_wakunode_peer_manager,
./test_wakunode_health_monitor

View File

@ -0,0 +1,301 @@
{.used.}
import
std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results
import
waku/[
waku_core,
common/waku_protocol,
node/waku_node,
node/peer_manager,
node/health_monitor/health_status,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/node_health_monitor,
node/kernel_api/relay,
node/kernel_api/store,
node/kernel_api/lightpush,
node/kernel_api/filter,
waku_archive,
]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
const MockDLow = 4 # Mocked GossipSub DLow value
const TestConnectivityTimeLimit = 3.seconds
proc protoHealthMock(kind: WakuProtocol, health: HealthStatus): ProtocolHealth =
  ## Builds a ProtocolHealth fixture for `kind`: READY maps to ready(),
  ## every other status to notReady("mock").
  var mock = ProtocolHealth.init(kind)
  result =
    if health == HealthStatus.READY:
      mock.ready()
    else:
      mock.notReady("mock")
suite "Health Monitor - health state calculation":
test "Disconnected, zero peers":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.NOT_READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(FilterClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(LightpushClientProtocol, HealthStatus.NOT_READY),
]
let strength = initTable[WakuProtocol, int]()
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, weak relay":
let weakCount = MockDLow - 1
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Partially connected since relay connectivity is weak (> 0, but < dLow)
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay":
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Fully connected since relay connectivity is ideal (>= dLow)
check state == ConnectionStatus.Connected
test "Connected, robust edge":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.NOT_MOUNTED),
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = HealthyThreshold
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Disconnected, edge missing store":
let protocols =
@[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, edge meets minimum failover requirement":
let weakCount = max(1, HealthyThreshold - 1)
let protocols =
@[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = weakCount
strength[FilterClientProtocol] = weakCount
strength[StoreClientProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay ignores store server":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Connected, robust relay ignores store client":
let protocols =
@[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
suite "Health Monitor - events":
asyncTest "Core (relay) health update":
let
nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
(await nodeA.mountRelay()).expect("Node A failed to mount Relay")
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} =
discard
nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node A failed to subscribe"
)
nodeB.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node B failed to subscribe"
)
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
await monitorA.stopHealthMonitor()
await nodeA.stop()
asyncTest "Edge (light client) health update":
let
nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
nodeA.mountLightpushClient()
await nodeA.mountFilterClient()
nodeA.mountStoreClient()
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
let
nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
lastStatus == ConnectionStatus.Disconnected
await monitorA.stopHealthMonitor()
await nodeA.stop()

View File

@ -11,15 +11,15 @@ import
from std/times import epochTime
import
waku/
[
waku_relay,
node/waku_node,
node/peer_manager,
waku_core,
waku_node,
waku_rln_relay,
],
waku/[
waku_relay,
node/waku_node,
node/peer_manager,
waku_core,
waku_node,
waku_rln_relay,
common/broker/broker_context,
],
../waku_store/store_utils,
../waku_archive/archive_utils,
../testlib/[wakucore, futures]

View File

@ -10,6 +10,7 @@ import
libp2p/crypto/crypto
import
waku/[
common/waku_protocol,
waku_node,
node/waku_node as waku_node2,
# TODO: Remove after moving `git_version` to the app code.
@ -78,47 +79,39 @@ suite "Waku v2 REST API - health":
# When
var response = await client.healthCheck()
let report = response.data
# Then
check:
response.status == 200
$response.contentType == $MIMETYPE_JSON
response.data.nodeHealth == HealthStatus.READY
response.data.protocolsHealth.len() == 15
response.data.protocolsHealth[0].protocol == "Relay"
response.data.protocolsHealth[0].health == HealthStatus.NOT_READY
response.data.protocolsHealth[0].desc == some("No connected peers")
response.data.protocolsHealth[1].protocol == "Rln Relay"
response.data.protocolsHealth[1].health == HealthStatus.READY
response.data.protocolsHealth[2].protocol == "Lightpush"
response.data.protocolsHealth[2].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[3].protocol == "Legacy Lightpush"
response.data.protocolsHealth[3].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[4].protocol == "Filter"
response.data.protocolsHealth[4].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[5].protocol == "Store"
response.data.protocolsHealth[5].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[6].protocol == "Legacy Store"
response.data.protocolsHealth[6].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[7].protocol == "Peer Exchange"
response.data.protocolsHealth[7].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[8].protocol == "Rendezvous"
response.data.protocolsHealth[8].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[9].protocol == "Mix"
response.data.protocolsHealth[9].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[10].protocol == "Lightpush Client"
response.data.protocolsHealth[10].health == HealthStatus.NOT_READY
response.data.protocolsHealth[10].desc ==
report.nodeHealth == HealthStatus.READY
report.protocolsHealth.len() == 15
report.getHealth(RelayProtocol).health == HealthStatus.NOT_READY
report.getHealth(RelayProtocol).desc == some("No connected peers")
report.getHealth(RlnRelayProtocol).health == HealthStatus.READY
report.getHealth(LightpushProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyLightpushProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(FilterProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(StoreProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyStoreProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(PeerExchangeProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(RendezvousProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(MixProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LightpushClientProtocol).health == HealthStatus.NOT_READY
report.getHealth(LightpushClientProtocol).desc ==
some("No Lightpush service peer available yet")
response.data.protocolsHealth[11].protocol == "Legacy Lightpush Client"
response.data.protocolsHealth[11].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[12].protocol == "Store Client"
response.data.protocolsHealth[12].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[13].protocol == "Legacy Store Client"
response.data.protocolsHealth[13].health == HealthStatus.NOT_MOUNTED
response.data.protocolsHealth[14].protocol == "Filter Client"
response.data.protocolsHealth[14].health == HealthStatus.NOT_READY
response.data.protocolsHealth[14].desc ==
report.getHealth(LegacyLightpushClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(StoreClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(LegacyStoreClientProtocol).health == HealthStatus.NOT_MOUNTED
report.getHealth(FilterClientProtocol).health == HealthStatus.NOT_READY
report.getHealth(FilterClientProtocol).desc ==
some("No Filter service peer available yet")
await restServer.stop()

View File

@ -1,7 +1,7 @@
import chronicles, chronos, results
import chronicles, chronos, results, std/strutils
import waku/factory/waku
import waku/[requests/health_request, waku_core, waku_node]
import waku/[requests/health_requests, waku_core, waku_node]
import waku/node/delivery_service/send_service
import waku/node/delivery_service/subscription_service
import ./[api_conf, types]
@ -25,16 +25,8 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
if w.isNil():
return err("Waku node is not initialized")
# check if health is satisfactory
# If Node is not healthy, return err("Waku node is not healthy")
let healthStatus = RequestNodeHealth.request(w.brokerCtx)
if healthStatus.isErr():
warn "Failed to get Waku node health status: ", error = healthStatus.error
# Let's suppose the node is healthy enough, go ahead
else:
if healthStatus.get().healthStatus == NodeHealth.Unhealthy:
return err("Waku node is not healthy, has got no connections.")
# TODO: Conciliate request-bouncing health checks here with unit testing.
# (For now, better to just allow all sends and rely on retries.)
return ok()

View File

@ -14,10 +14,10 @@ type
RequestId* = distinct string
NodeHealth* {.pure.} = enum
Healthy
MinimallyHealthy
Unhealthy
ConnectionStatus* {.pure.} = enum
Disconnected
PartiallyConnected
Connected
proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T =
## Generate a new RequestId using the provided RNG.

View File

@ -0,0 +1,24 @@
{.push raises: [].}
type WakuProtocol* {.pure.} = enum
RelayProtocol = "Relay"
RlnRelayProtocol = "Rln Relay"
StoreProtocol = "Store"
LegacyStoreProtocol = "Legacy Store"
FilterProtocol = "Filter"
LightpushProtocol = "Lightpush"
LegacyLightpushProtocol = "Legacy Lightpush"
PeerExchangeProtocol = "Peer Exchange"
RendezvousProtocol = "Rendezvous"
MixProtocol = "Mix"
StoreClientProtocol = "Store Client"
LegacyStoreClientProtocol = "Legacy Store Client"
FilterClientProtocol = "Filter Client"
LightpushClientProtocol = "Lightpush Client"
LegacyLightpushClientProtocol = "Legacy Lightpush Client"
const
RelayProtocols* = {RelayProtocol}
StoreClientProtocols* = {StoreClientProtocol, LegacyStoreClientProtocol}
LightpushClientProtocols* = {LightpushClientProtocol, LegacyLightpushClientProtocol}
FilterClientProtocols* = {FilterClientProtocol}

View File

@ -1,3 +1,3 @@
import ./[message_events, delivery_events]
import ./[message_events, delivery_events, health_events, peer_events]
export message_events, delivery_events
export message_events, delivery_events, health_events, peer_events

View File

@ -0,0 +1,27 @@
import waku/common/broker/event_broker
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health]
import waku/waku_core/topics
export protocol_health, topic_health
# Notify health changes to node connectivity
EventBroker:
type EventConnectionStatusChange* = object
connectionStatus*: ConnectionStatus
# Notify health changes to a subscribed topic
# TODO: emit content topic health change events when subscribe/unsubscribe
# from/to content topic is provided in the new API (so we know which
# content topics are of interest to the application)
EventBroker:
type EventContentTopicHealthChange* = object
contentTopic*: ContentTopic
health*: TopicHealth
# Notify health changes to a shard (pubsub topic)
EventBroker:
type EventShardTopicHealthChange* = object
topic*: PubsubTopic
health*: TopicHealth

View File

@ -0,0 +1,13 @@
import waku/common/broker/event_broker
import libp2p/switch
type WakuPeerEventKind* {.pure.} = enum
EventConnected
EventDisconnected
EventIdentified
EventMetadataUpdated
EventBroker:
type EventWakuPeer* = object
peerId*: PeerId
kind*: WakuPeerEventKind

View File

@ -1,6 +1,7 @@
import ../waku_relay, ../node/peer_manager
import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status
type AppCallbacks* = ref object
relayHandler*: WakuRelayHandler
topicHealthChangeHandler*: TopicHealthChangeHandler
connectionChangeHandler*: ConnectionChangeHandler
connectionStatusChangeHandler*: ConnectionStatusChangeHandler

View File

@ -15,7 +15,8 @@ import
../waku_node,
../node/peer_manager,
../common/rate_limit/setting,
../common/utils/parse_size_units
../common/utils/parse_size_units,
../common/broker/broker_context
type
WakuNodeBuilder* = object # General

View File

@ -17,35 +17,36 @@ import
eth/p2p/discoveryv5/enr,
presto,
metrics,
metrics/chronos_httpserver
import
../common/logging,
../waku_core,
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/waku_metrics,
../node/delivery_service/delivery_service,
../rest_api/message_cache,
../rest_api/endpoint/server,
../rest_api/endpoint/builder as rest_server_builder,
../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc,
../discovery/waku_discv5,
../discovery/autonat_service,
../waku_enr/sharding,
../waku_rln_relay,
../waku_store,
../waku_filter_v2,
../factory/node_factory,
../factory/internal_config,
../factory/app_callbacks,
../waku_enr/multiaddr,
./waku_conf,
../common/broker/broker_context,
../requests/health_request,
../api/types
metrics/chronos_httpserver,
waku/[
waku_core,
waku_node,
waku_archive,
waku_rln_relay,
waku_store,
waku_filter_v2,
waku_relay/protocol,
waku_enr/sharding,
waku_enr/multiaddr,
api/types,
common/logging,
common/broker/broker_context,
node/peer_manager,
node/health_monitor,
node/waku_metrics,
node/delivery_service/delivery_service,
rest_api/message_cache,
rest_api/endpoint/server,
rest_api/endpoint/builder as rest_server_builder,
discovery/waku_dnsdisc,
discovery/waku_discv5,
discovery/autonat_service,
requests/health_requests,
factory/node_factory,
factory/internal_config,
factory/app_callbacks,
],
./waku_conf
logScope:
topics = "wakunode waku"
@ -118,7 +119,10 @@ proc newCircuitRelay(isRelayClient: bool): Relay =
return Relay.new()
proc setupAppCallbacks(
node: WakuNode, conf: WakuConf, appCallbacks: AppCallbacks
node: WakuNode,
conf: WakuConf,
appCallbacks: AppCallbacks,
healthMonitor: NodeHealthMonitor,
): Result[void, string] =
if appCallbacks.isNil():
info "No external callbacks to be set"
@ -159,6 +163,13 @@ proc setupAppCallbacks(
err("Cannot configure connectionChangeHandler callback with empty peer manager")
node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler
if not appCallbacks.connectionStatusChangeHandler.isNil():
if healthMonitor.isNil():
return
err("Cannot configure connectionStatusChangeHandler with empty health monitor")
healthMonitor.onConnectionStatusChange = appCallbacks.connectionStatusChangeHandler
return ok()
proc new*(
@ -192,7 +203,7 @@ proc new*(
else:
nil
node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
node.setupAppCallbacks(wakuConf, appCallbacks, healthMonitor).isOkOr:
error "Failed setting up app callbacks", error = error
return err("Failed setting up app callbacks: " & $error)
@ -409,60 +420,48 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises:
waku[].healthMonitor.startHealthMonitor().isOkOr:
return err("failed to start health monitor: " & $error)
## Setup RequestNodeHealth provider
## Setup RequestConnectionStatus provider
RequestNodeHealth.setProvider(
RequestConnectionStatus.setProvider(
globalBrokerContext(),
proc(): Result[RequestNodeHealth, string] =
let healthReportFut = waku[].healthMonitor.getNodeHealthReport()
if not healthReportFut.completed():
return err("Health report not available")
proc(): Result[RequestConnectionStatus, string] =
try:
let healthReport = healthReportFut.read()
# Check if Relay or Lightpush Client is ready (MinimallyHealthy condition)
var relayReady = false
var lightpushClientReady = false
var storeClientReady = false
var filterClientReady = false
for protocolHealth in healthReport.protocolsHealth:
if protocolHealth.protocol == "Relay" and
protocolHealth.health == HealthStatus.READY:
relayReady = true
elif protocolHealth.protocol == "Lightpush Client" and
protocolHealth.health == HealthStatus.READY:
lightpushClientReady = true
elif protocolHealth.protocol == "Store Client" and
protocolHealth.health == HealthStatus.READY:
storeClientReady = true
elif protocolHealth.protocol == "Filter Client" and
protocolHealth.health == HealthStatus.READY:
filterClientReady = true
# Determine node health based on protocol states
let isMinimallyHealthy = relayReady or lightpushClientReady
let nodeHealth =
if isMinimallyHealthy and storeClientReady and filterClientReady:
NodeHealth.Healthy
elif isMinimallyHealthy:
NodeHealth.MinimallyHealthy
else:
NodeHealth.Unhealthy
debug "Providing health report",
nodeHealth = $nodeHealth,
relayReady = relayReady,
lightpushClientReady = lightpushClientReady,
storeClientReady = storeClientReady,
filterClientReady = filterClientReady,
details = $(healthReport)
return ok(RequestNodeHealth(healthStatus: nodeHealth))
except CatchableError as exc:
err("Failed to read health report: " & exc.msg),
let healthReport = waku[].healthMonitor.getSyncNodeHealthReport()
return
ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus))
except CatchableError:
err("Failed to read health report: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestNodeHealth provider", error = error
error "Failed to set RequestConnectionStatus provider", error = error
## Setup RequestProtocolHealth provider
RequestProtocolHealth.setProvider(
globalBrokerContext(),
proc(
protocol: WakuProtocol
): Future[Result[RequestProtocolHealth, string]] {.async.} =
try:
let protocolHealthStatus =
await waku[].healthMonitor.getProtocolHealthInfo(protocol)
return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus))
except CatchableError:
return err("Failed to get protocol health: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestProtocolHealth provider", error = error
## Setup RequestHealthReport provider
RequestHealthReport.setProvider(
globalBrokerContext(),
proc(): Future[Result[RequestHealthReport, string]] {.async.} =
try:
let report = await waku[].healthMonitor.getNodeHealthReport()
return ok(RequestHealthReport(healthReport: report))
except CatchableError:
return err("Failed to get health report: " & getCurrentExceptionMsg()),
).isOkOr:
error "Failed to set RequestHealthReport provider", error = error
if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport(
@ -521,8 +520,8 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
if not waku.healthMonitor.isNil():
await waku.healthMonitor.stopHealthMonitor()
## Clear RequestNodeHealth provider
RequestNodeHealth.clearProvider(waku.brokerCtx)
## Clear RequestConnectionStatus provider
RequestConnectionStatus.clearProvider(waku.brokerCtx)
if not waku.restServer.isNil():
await waku.restServer.stop()

View File

@ -1,7 +1,7 @@
import std/options
import chronos, chronicles
import waku/[waku_core], waku/waku_lightpush/[common, rpc]
import waku/requests/health_request
import waku/requests/health_requests
import waku/common/broker/broker_context
import waku/api/types
import ./[delivery_task, send_processor]
@ -32,7 +32,7 @@ proc new*(
)
proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} =
let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
let healthReport = RequestShardTopicsHealth.request(self.brokerCtx, @[topic]).valueOr:
error "isTopicHealthy: failed to get health report", topic = topic, error = error
return false

View File

@ -1,4 +1,9 @@
import
health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status]
health_monitor/[
node_health_monitor, protocol_health, online_monitor, health_status,
connection_status, health_report,
]
export node_health_monitor, protocol_health, online_monitor, health_status
export
node_health_monitor, protocol_health, online_monitor, health_status,
connection_status, health_report

View File

@ -0,0 +1,15 @@
import chronos, results, std/strutils, ../../api/types
export ConnectionStatus
proc init*(
t: typedesc[ConnectionStatus], strRep: string
): Result[ConnectionStatus, string] =
try:
let status = parseEnum[ConnectionStatus](strRep)
return ok(status)
except ValueError:
return err("Invalid ConnectionStatus string representation: " & strRep)
type ConnectionStatusChangeHandler* =
proc(status: ConnectionStatus): Future[void] {.gcsafe, raises: [Defect].}

View File

@ -0,0 +1,10 @@
{.push raises: [].}
import ./health_status, ./connection_status, ./protocol_health
type HealthReport* = object
## Rest API type returned for /health endpoint
##
nodeHealth*: HealthStatus # legacy "READY" health indicator
connectionStatus*: ConnectionStatus # new "Connected" health indicator
protocolsHealth*: seq[ProtocolHealth]

View File

@ -1,55 +1,89 @@
{.push raises: [].}
import
std/[options, sets, random, sequtils],
std/[options, sets, random, sequtils, json, strutils, tables],
chronos,
chronicles,
libp2p/protocols/rendezvous
import
../waku_node,
../kernel_api,
../../waku_rln_relay,
../../waku_relay,
../peer_manager,
./online_monitor,
./health_status,
./protocol_health
libp2p/protocols/rendezvous,
libp2p/protocols/pubsub,
libp2p/protocols/pubsub/rpc/messages,
waku/[
waku_relay,
waku_rln_relay,
api/types,
events/health_events,
events/peer_events,
node/waku_node,
node/peer_manager,
node/kernel_api,
node/health_monitor/online_monitor,
node/health_monitor/health_status,
node/health_monitor/health_report,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
]
## This module is aimed to check the state of the "self" Waku Node
# randomize() seeds std/random's random number generator;
# if it is not called, randomization procedures produce the same outcome in every run
randomize()
random.randomize()
type
HealthReport* = object
nodeHealth*: HealthStatus
protocolsHealth*: seq[ProtocolHealth]
const HealthyThreshold* = 2
## minimum peers required for all services for a Connected status, excluding Relay
NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
onlineMonitor*: OnlineMonitor
keepAliveFut: Future[void]
type NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
onlineMonitor*: OnlineMonitor
keepAliveFut: Future[void]
healthLoopFut: Future[void]
healthUpdateEvent: AsyncEvent
connectionStatus: ConnectionStatus
onConnectionStatusChange*: ConnectionStatusChangeHandler
cachedProtocols: seq[ProtocolHealth]
## state of each protocol to report.
## calculated on last event that can change any protocol's state so fetching a report is fast.
strength: Table[WakuProtocol, int]
## latest known connectivity strength (e.g. connected peer count) metric for each protocol.
## if it doesn't make sense for the protocol in question, this is set to zero.
relayObserver: PubSubObserver
peerEventListener: EventWakuPeerListener
func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth =
for h in report.protocolsHealth:
if h.protocol == $kind:
return h
# Shouldn't happen, but if it does, then assume protocol is not mounted
return ProtocolHealth.init(kind)
proc countCapablePeers(hm: NodeHealthMonitor, codec: string): int =
if isNil(hm.node.peerManager):
return 0
return hm.node.peerManager.getCapablePeersCount(codec)
proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Relay")
var p = ProtocolHealth.init(WakuProtocol.RelayProtocol)
if hm.node.wakuRelay == nil:
if isNil(hm.node.wakuRelay):
hm.strength[WakuProtocol.RelayProtocol] = 0
return p.notMounted()
let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
hm.strength[WakuProtocol.RelayProtocol] = 0
return p.notMounted()
if relayPeers.len() == 0:
let count = relayPeers.len
hm.strength[WakuProtocol.RelayProtocol] = count
if count == 0:
return p.notReady("No connected peers")
return p.ready()
proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} =
var p = ProtocolHealth.init("Rln Relay")
if hm.node.wakuRlnRelay.isNil():
var p = ProtocolHealth.init(WakuProtocol.RlnRelayProtocol)
if isNil(hm.node.wakuRlnRelay):
return p.notMounted()
const FutIsReadyTimout = 5.seconds
@ -72,121 +106,144 @@ proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.}
proc getLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush")
var p = ProtocolHealth.init(WakuProtocol.LightpushProtocol)
if hm.node.wakuLightPush == nil:
if isNil(hm.node.wakuLightPush):
hm.strength[WakuProtocol.LightpushProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLightPushCodec)
hm.strength[WakuProtocol.LightpushProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush Client")
if hm.node.wakuLightpushClient == nil:
return p.notMounted()
let selfServiceAvailable =
hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY
let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome()
if selfServiceAvailable or servicePeerAvailable:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush")
var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushProtocol)
if hm.node.wakuLegacyLightPush == nil:
if isNil(hm.node.wakuLegacyLightPush):
hm.strength[WakuProtocol.LegacyLightpushProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec)
hm.strength[WakuProtocol.LegacyLightpushProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLegacyLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush Client")
if hm.node.wakuLegacyLightpushClient == nil:
return p.notMounted()
if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth =
var p = ProtocolHealth.init("Filter")
var p = ProtocolHealth.init(WakuProtocol.FilterProtocol)
if hm.node.wakuFilter == nil:
if isNil(hm.node.wakuFilter):
hm.strength[WakuProtocol.FilterProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec)
hm.strength[WakuProtocol.FilterProtocol] = peerCount
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Relay is not ready, filter will not be able to sort out messages")
proc getFilterClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Filter Client")
if hm.node.wakuFilterClient == nil:
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store")
var p = ProtocolHealth.init(WakuProtocol.StoreProtocol)
if hm.node.wakuStore == nil:
if isNil(hm.node.wakuStore):
hm.strength[WakuProtocol.StoreProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuStoreCodec)
hm.strength[WakuProtocol.StoreProtocol] = peerCount
return p.ready()
proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store Client")
proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LegacyStoreProtocol)
if hm.node.wakuStoreClient == nil:
if isNil(hm.node.wakuLegacyStore):
hm.strength[WakuProtocol.LegacyStoreProtocol] = 0
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil:
let peerCount = hm.countCapablePeers(WakuLegacyStoreCodec)
hm.strength[WakuProtocol.LegacyStoreProtocol] = peerCount
return p.ready()
proc getLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LightpushClientProtocol)
if isNil(hm.node.wakuLightpushClient):
hm.strength[WakuProtocol.LightpushClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLightPushCodec)
hm.strength[WakuProtocol.LightpushClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushClientProtocol)
if isNil(hm.node.wakuLegacyLightpushClient):
hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec)
hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.FilterClientProtocol)
if isNil(hm.node.wakuFilterClient):
hm.strength[WakuProtocol.FilterClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec)
hm.strength[WakuProtocol.FilterClientProtocol] = peerCount
if peerCount > 0:
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init(WakuProtocol.StoreClientProtocol)
if isNil(hm.node.wakuStoreClient):
hm.strength[WakuProtocol.StoreClientProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuStoreCodec)
hm.strength[WakuProtocol.StoreClientProtocol] = peerCount
if peerCount > 0 or not isNil(hm.node.wakuStore):
return p.ready()
return p.notReady(
"No Store service peer available yet, neither Store service set up for the node"
)
proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store")
if hm.node.wakuLegacyStore == nil:
return p.notMounted()
return p.ready()
proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store Client")
var p = ProtocolHealth.init(WakuProtocol.LegacyStoreClientProtocol)
if hm.node.wakuLegacyStoreClient == nil:
if isNil(hm.node.wakuLegacyStoreClient):
hm.strength[WakuProtocol.LegacyStoreClientProtocol] = 0
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
hm.node.wakuLegacyStore != nil:
let peerCount = countCapablePeers(hm, WakuLegacyStoreCodec)
hm.strength[WakuProtocol.LegacyStoreClientProtocol] = peerCount
if peerCount > 0 or not isNil(hm.node.wakuLegacyStore):
return p.ready()
return p.notReady(
@ -194,38 +251,305 @@ proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
)
proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Peer Exchange")
var p = ProtocolHealth.init(WakuProtocol.PeerExchangeProtocol)
if hm.node.wakuPeerExchange == nil:
if isNil(hm.node.wakuPeerExchange):
hm.strength[WakuProtocol.PeerExchangeProtocol] = 0
return p.notMounted()
let peerCount = countCapablePeers(hm, WakuPeerExchangeCodec)
hm.strength[WakuProtocol.PeerExchangeProtocol] = peerCount
return p.ready()
proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Rendezvous")
var p = ProtocolHealth.init(WakuProtocol.RendezvousProtocol)
if hm.node.wakuRendezvous == nil:
if isNil(hm.node.wakuRendezvous):
hm.strength[WakuProtocol.RendezvousProtocol] = 0
return p.notMounted()
if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
let peerCount = countCapablePeers(hm, RendezVousCodec)
hm.strength[WakuProtocol.RendezvousProtocol] = peerCount
if peerCount == 0:
return p.notReady("No Rendezvous peers are available yet")
return p.ready()
proc getMixHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Mix")
var p = ProtocolHealth.init(WakuProtocol.MixProtocol)
if hm.node.wakuMix.isNil():
if isNil(hm.node.wakuMix):
return p.notMounted()
return p.ready()
proc getSyncProtocolHealthInfo*(
hm: NodeHealthMonitor, protocol: WakuProtocol
): ProtocolHealth =
## Get ProtocolHealth for a given protocol that can provide it synchronously
##
case protocol
of WakuProtocol.RelayProtocol:
return hm.getRelayHealth()
of WakuProtocol.StoreProtocol:
return hm.getStoreHealth()
of WakuProtocol.LegacyStoreProtocol:
return hm.getLegacyStoreHealth()
of WakuProtocol.FilterProtocol:
return hm.getFilterHealth(hm.getRelayHealth().health)
of WakuProtocol.LightpushProtocol:
return hm.getLightpushHealth(hm.getRelayHealth().health)
of WakuProtocol.LegacyLightpushProtocol:
return hm.getLegacyLightpushHealth(hm.getRelayHealth().health)
of WakuProtocol.PeerExchangeProtocol:
return hm.getPeerExchangeHealth()
of WakuProtocol.RendezvousProtocol:
return hm.getRendezvousHealth()
of WakuProtocol.MixProtocol:
return hm.getMixHealth()
of WakuProtocol.StoreClientProtocol:
return hm.getStoreClientHealth()
of WakuProtocol.LegacyStoreClientProtocol:
return hm.getLegacyStoreClientHealth()
of WakuProtocol.FilterClientProtocol:
return hm.getFilterClientHealth()
of WakuProtocol.LightpushClientProtocol:
return hm.getLightpushClientHealth()
of WakuProtocol.LegacyLightpushClientProtocol:
return hm.getLegacyLightpushClientHealth()
of WakuProtocol.RlnRelayProtocol:
# Could waitFor here but we don't want to block the main thread.
# Could also return a cached value from a previous check.
var p = ProtocolHealth.init(protocol)
return p.notReady("RLN Relay health check is async")
else:
var p = ProtocolHealth.init(protocol)
return p.notMounted()
proc getProtocolHealthInfo*(
hm: NodeHealthMonitor, protocol: WakuProtocol
): Future[ProtocolHealth] {.async.} =
## Get ProtocolHealth for a given protocol
##
case protocol
of WakuProtocol.RlnRelayProtocol:
return await hm.getRlnRelayHealth()
else:
return hm.getSyncProtocolHealthInfo(protocol)
proc getSyncAllProtocolHealthInfo(hm: NodeHealthMonitor): seq[ProtocolHealth] =
## Get ProtocolHealth for the subset of protocols that can provide it synchronously
##
var protocols: seq[ProtocolHealth] = @[]
let relayHealth = hm.getRelayHealth()
protocols.add(relayHealth)
protocols.add(hm.getLightpushHealth(relayHealth.health))
protocols.add(hm.getLegacyLightpushHealth(relayHealth.health))
protocols.add(hm.getFilterHealth(relayHealth.health))
protocols.add(hm.getStoreHealth())
protocols.add(hm.getLegacyStoreHealth())
protocols.add(hm.getPeerExchangeHealth())
protocols.add(hm.getRendezvousHealth())
protocols.add(hm.getMixHealth())
protocols.add(hm.getLightpushClientHealth())
protocols.add(hm.getLegacyLightpushClientHealth())
protocols.add(hm.getStoreClientHealth())
protocols.add(hm.getLegacyStoreClientHealth())
protocols.add(hm.getFilterClientHealth())
return protocols
proc getAllProtocolHealthInfo(
hm: NodeHealthMonitor
): Future[seq[ProtocolHealth]] {.async.} =
## Get ProtocolHealth for all protocols
##
var protocols = hm.getSyncAllProtocolHealthInfo()
let rlnHealth = await hm.getRlnRelayHealth()
protocols.add(rlnHealth)
return protocols
proc calculateConnectionState*(
protocols: seq[ProtocolHealth],
strength: Table[WakuProtocol, int], ## latest connectivity strength (e.g. peer count) for a protocol
dLowOpt: Option[int], ## minimum relay peers for Connected status if in Core (Relay) mode
): ConnectionStatus =
var
relayCount = 0
lightpushCount = 0
filterCount = 0
storeClientCount = 0
for p in protocols:
let kind =
try:
parseEnum[WakuProtocol](p.protocol)
except ValueError:
continue
if p.health != HealthStatus.READY:
continue
let strength = strength.getOrDefault(kind, 0)
if kind in RelayProtocols:
relayCount = max(relayCount, strength)
elif kind in StoreClientProtocols:
storeClientCount = max(storeClientCount, strength)
elif kind in LightpushClientProtocols:
lightpushCount = max(lightpushCount, strength)
elif kind in FilterClientProtocols:
filterCount = max(filterCount, strength)
debug "calculateConnectionState",
protocol = kind,
strength = strength,
relayCount = relayCount,
storeClientCount = storeClientCount,
lightpushCount = lightpushCount,
filterCount = filterCount
# Relay connectivity should be a sufficient check in Core mode.
# "Store peers" are relay peers because incoming messages in
# the relay are input to the store server.
# But if Store server (or client, even) is not mounted as well, this logic assumes
# the user knows what they're doing.
if dLowOpt.isSome():
if relayCount >= dLowOpt.get():
return ConnectionStatus.Connected
if relayCount > 0:
return ConnectionStatus.PartiallyConnected
# No relay connectivity. Relay might not be mounted, or may just have zero peers.
# Fall back to Edge check in any case to be sure.
let canSend = lightpushCount > 0
let canReceive = filterCount > 0
let canStore = storeClientCount > 0
let meetsMinimum = canSend and canReceive and canStore
if not meetsMinimum:
return ConnectionStatus.Disconnected
let isEdgeRobust =
(lightpushCount >= HealthyThreshold) and (filterCount >= HealthyThreshold) and
(storeClientCount >= HealthyThreshold)
if isEdgeRobust:
return ConnectionStatus.Connected
return ConnectionStatus.PartiallyConnected
proc calculateConnectionState*(hm: NodeHealthMonitor): ConnectionStatus =
let dLow =
if isNil(hm.node.wakuRelay):
none(int)
else:
some(hm.node.wakuRelay.parameters.dLow)
return calculateConnectionState(hm.cachedProtocols, hm.strength, dLow)
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
## Get a HealthReport that includes all protocols
##
var report: HealthReport
if hm.nodeHealth == HealthStatus.INITIALIZING or
hm.nodeHealth == HealthStatus.SHUTTING_DOWN:
report.nodeHealth = hm.nodeHealth
report.connectionStatus = ConnectionStatus.Disconnected
return report
if hm.cachedProtocols.len == 0:
hm.cachedProtocols = await hm.getAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
proc getSyncNodeHealthReport*(hm: NodeHealthMonitor): HealthReport =
## Get a HealthReport that includes the subset of protocols that inform health synchronously
##
var report: HealthReport
if hm.nodeHealth == HealthStatus.INITIALIZING or
hm.nodeHealth == HealthStatus.SHUTTING_DOWN:
report.nodeHealth = hm.nodeHealth
report.connectionStatus = ConnectionStatus.Disconnected
return report
if hm.cachedProtocols.len == 0:
hm.cachedProtocols = hm.getSyncAllProtocolHealthInfo()
hm.connectionStatus = hm.calculateConnectionState()
report.nodeHealth = HealthStatus.READY
report.connectionStatus = hm.connectionStatus
report.protocolsHealth = hm.cachedProtocols
return report
proc onRelayMsg(
hm: NodeHealthMonitor, peer: PubSubPeer, msg: var RPCMsg
) {.gcsafe, raises: [].} =
## Inspect Relay events for health-update relevance in Core (Relay) mode.
##
## For Core (Relay) mode, the connectivity health state is mostly determined
## by the relay protocol state (it is the dominant factor), and we know
## that a peer Relay can only affect this Relay's health if there is a
## subscription change or a mesh (GRAFT/PRUNE) change.
##
if msg.subscriptions.len == 0:
if msg.control.isNone():
return
let ctrl = msg.control.get()
if ctrl.graft.len == 0 and ctrl.prune.len == 0:
return
hm.healthUpdateEvent.fire()
proc healthLoop(hm: NodeHealthMonitor) {.async.} =
## Re-evaluate the global health state of the node when notified of a potential change,
## and call back the application if an actual change from the last notified state happened.
info "Health monitor loop start"
while true:
try:
await hm.healthUpdateEvent.wait()
hm.healthUpdateEvent.clear()
hm.cachedProtocols = await hm.getAllProtocolHealthInfo()
let newConnectionStatus = hm.calculateConnectionState()
if newConnectionStatus != hm.connectionStatus:
hm.connectionStatus = newConnectionStatus
EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus)
if not isNil(hm.onConnectionStatusChange):
await hm.onConnectionStatusChange(newConnectionStatus)
except CancelledError:
break
except Exception as e:
error "HealthMonitor: error in update loop", error = e.msg
# safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds)
info "Health monitor loop end"
proc selectRandomPeersForKeepalive(
node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int
): Future[seq[PeerId]] {.async.} =
## Select peers for random keepalive, prioritizing mesh peers
if node.wakuRelay.isNil():
if isNil(node.wakuRelay):
return selectRandomPeers(outPeers, numRandomPeers)
let meshPeers = node.wakuRelay.getPeersInMesh().valueOr:
@ -359,45 +683,55 @@ proc startKeepalive*(
hm.keepAliveFut = hm.node.keepAliveLoop(randomPeersKeepalive, allPeersKeepalive)
return ok()
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
var report: HealthReport
report.nodeHealth = hm.nodeHealth
let relayHealth = hm.getRelayHealth()
report.protocolsHealth.add(relayHealth)
report.protocolsHealth.add(await hm.getRlnRelayHealth())
report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreHealth())
report.protocolsHealth.add(hm.getLegacyStoreHealth())
report.protocolsHealth.add(hm.getPeerExchangeHealth())
report.protocolsHealth.add(hm.getRendezvousHealth())
report.protocolsHealth.add(hm.getMixHealth())
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreClientHealth())
report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
return report
proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
hm.nodeHealth = health
proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] =
hm.onlineMonitor.startOnlineMonitor()
if isNil(hm.node.peerManager):
return err("startHealthMonitor: no node peerManager to monitor")
if not isNil(hm.node.wakuRelay):
hm.relayObserver = PubSubObserver(
onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} =
hm.onRelayMsg(peer, msgs)
)
hm.node.wakuRelay.addObserver(hm.relayObserver)
hm.peerEventListener = EventWakuPeer.listen(
hm.node.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
## Recompute health on any peer changing anything (join, leave, identify, metadata update)
hm.healthUpdateEvent.fire(),
).valueOr:
return err("Failed to subscribe to peer events: " & error)
hm.healthUpdateEvent = newAsyncEvent()
hm.healthUpdateEvent.fire()
hm.healthLoopFut = hm.healthLoop()
hm.startKeepalive().isOkOr:
return err("startHealthMonitor: failed starting keep alive: " & error)
return ok()
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
if not hm.onlineMonitor.isNil():
if not isNil(hm.onlineMonitor):
await hm.onlineMonitor.stopOnlineMonitor()
if not hm.keepAliveFut.isNil():
if not isNil(hm.keepAliveFut):
await hm.keepAliveFut.cancelAndWait()
if not isNil(hm.healthLoopFut):
await hm.healthLoopFut.cancelAndWait()
if hm.peerEventListener.id != 0:
EventWakuPeer.dropListener(hm.node.brokerCtx, hm.peerEventListener)
if not isNil(hm.node.wakuRelay) and not isNil(hm.relayObserver):
hm.node.wakuRelay.removeObserver(hm.relayObserver)
proc new*(
T: type NodeHealthMonitor,
node: WakuNode,
@ -406,4 +740,10 @@ proc new*(
let om = OnlineMonitor.init(dnsNameServers)
om.setPeerStoreToOnlineMonitor(node.switch.peerStore)
om.addOnlineStateObserver(node.peerManager.getOnlineStateObserver())
T(nodeHealth: INITIALIZING, node: node, onlineMonitor: om)
T(
nodeHealth: INITIALIZING,
node: node,
onlineMonitor: om,
connectionStatus: ConnectionStatus.Disconnected,
strength: initTable[WakuProtocol, int](),
)

View File

@ -1,5 +1,8 @@
import std/[options, strformat]
import ./health_status
import waku/common/waku_protocol
export waku_protocol
type ProtocolHealth* = object
protocol*: string
@ -39,8 +42,7 @@ proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth =
proc `$`*(p: ProtocolHealth): string =
  ## Human-readable rendering of a protocol health entry
  ## (protocol name, health status and optional description).
  return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}"
proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
let p = ProtocolHealth(
protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
proc init*(p: typedesc[ProtocolHealth], protocol: WakuProtocol): ProtocolHealth =
return ProtocolHealth(
protocol: $protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
)
return p

View File

@ -1,27 +1,31 @@
{.push raises: [].}
import
std/[options, sets, sequtils, times, strformat, strutils, math, random, tables],
std/
[
options, sets, sequtils, times, strformat, strutils, math, random, tables,
algorithm,
],
chronos,
chronicles,
metrics,
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver,
libp2p/peerstore
import
../../common/nimchronos,
../../common/enr,
../../common/callbacks,
../../common/utils/parse_size_units,
../../waku_core,
../../waku_relay,
../../waku_relay/protocol,
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
../health_monitor/online_monitor,
libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore],
waku/[
waku_core,
waku_relay,
waku_metadata,
waku_core/topics/sharding,
waku_relay/protocol,
waku_enr/sharding,
waku_enr/capabilities,
events/peer_events,
common/nimchronos,
common/enr,
common/callbacks,
common/utils/parse_size_units,
common/broker/broker_context,
node/health_monitor/online_monitor,
],
./peer_store/peer_storage,
./waku_peer_store
@ -84,6 +88,7 @@ type ConnectionChangeHandler* = proc(
): Future[void] {.gcsafe, raises: [Defect].}
type PeerManager* = ref object of RootObj
brokerCtx: BrokerContext
switch*: Switch
wakuMetadata*: WakuMetadata
initialBackoffInSec*: int
@ -483,8 +488,9 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool =
proc connectedPeers*(
pm: PeerManager, protocol: string = ""
): (seq[PeerId], seq[PeerId]) =
## Returns the peerIds of physical connections (in and out)
## If a protocol is specified, only returns peers with at least one stream of that protocol
## Returns the PeerIds of peers with an active socket connection.
## If a protocol is specified, it returns peers that currently have one
## or more active logical streams for that protocol.
var inPeers: seq[PeerId]
var outPeers: seq[PeerId]
@ -500,6 +506,65 @@ proc connectedPeers*(
return (inPeers, outPeers)
proc capablePeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) =
  ## Returns the PeerIds of peers with an active socket connection, split
  ## into (inbound, outbound). Only peers that have identified themselves
  ## as supporting `protocol` (per the peer store) are included.
  var inbound: seq[PeerId]
  var outbound: seq[PeerId]

  for peerId, muxers in pm.switch.connManager.getConnections():
    # Skip peers whose capability is not registered in the peer store.
    if not pm.switch.peerStore.hasPeer(peerId, protocol):
      continue
    for mux in muxers:
      let dir = mux.connection.transportDir
      if dir == Direction.In:
        inbound.add(peerId)
      elif dir == Direction.Out:
        outbound.add(peerId)

  return (inbound, outbound)
proc getConnectedPeersCount*(pm: PeerManager, protocol: string): int =
  ## Returns how many distinct peers (inbound or outbound) currently have
  ## active streams for `protocol`.
  let (inbound, outbound) = pm.connectedPeers(protocol)
  # Dedupe: the same peer may appear in both direction lists.
  return toHashSet(inbound & outbound).len
proc getCapablePeersCount*(pm: PeerManager, protocol: string): int =
  ## Returns how many distinct connected peers (inbound or outbound) have
  ## identified themselves as supporting `protocol`.
  let (inbound, outbound) = pm.capablePeers(protocol)
  # Dedupe: the same peer may appear in both direction lists.
  return toHashSet(inbound & outbound).len
proc getPeersForShard*(pm: PeerManager, protocolId: string, shard: PubsubTopic): int =
  ## Counts the connected peers of `protocolId` known to participate in
  ## `shard`. When the pubsub topic cannot be parsed into a relay shard,
  ## falls back to counting all connected peers of the protocol.
  let (inbound, outbound) = pm.connectedPeers(protocolId)
  let candidates = inbound & outbound
  if candidates.len == 0:
    return 0

  let shardInfo = RelayShard.parse(shard).valueOr:
    # No shard mapping can be derived from this gossipsub topic string:
    # report the raw protocol peer count instead.
    return candidates.len

  return candidates.countIt(
    pm.switch.peerStore.hasShard(it, shardInfo.clusterId, shardInfo.shardId)
  )
proc disconnectAllPeers*(pm: PeerManager) {.async.} =
let (inPeerIds, outPeerIds) = pm.connectedPeers()
let connectedPeers = concat(inPeerIds, outPeerIds)
@ -635,7 +700,7 @@ proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] =
# Event Handling #
#~~~~~~~~~~~~~~~~~#
proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let res = catch:
await pm.switch.dial(peerId, WakuMetadataCodec)
@ -664,6 +729,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
let shards = metadata.shards.mapIt(it.uint16)
pm.switch.peerStore.setShardInfo(peerId, shards)
# TODO: should only trigger an event if metadata actually changed
# should include the shard subscription delta in the event when
# it is a MetadataUpdated event
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated)
return
info "disconnecting from peer", peerId = peerId, reason = reason
@ -673,14 +742,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} =
# called when a peer i) first connects to us ii) disconnects all connections from us
proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined:
await pm.onPeerMetadata(peerId)
await pm.refreshPeerMetadata(peerId)
var peerStore = pm.switch.peerStore
var direction: PeerDirection
var connectedness: Connectedness
case event.kind
of Joined:
of PeerEventKind.Joined:
direction = if event.initiator: Outbound else: Inbound
connectedness = Connected
@ -708,10 +777,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
asyncSpawn(pm.switch.disconnect(peerId))
peerStore.delete(peerId)
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Joined)
of Left:
of PeerEventKind.Left:
direction = UnknownDirection
connectedness = CanConnect
@ -723,12 +794,16 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
pm.ipTable.del(ip)
break
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Left)
of Identified:
of PeerEventKind.Identified:
info "event identified", peerId = peerId
EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified)
peerStore[ConnectionBook][peerId] = connectedness
peerStore[DirectionBook][peerId] = direction
@ -1085,8 +1160,11 @@ proc new*(
error "Max backoff time can't be over 1 week", maxBackoff = backoff
raise newException(Defect, "Max backoff time can't be over 1 week")
let brokerCtx = globalBrokerContext()
let pm = PeerManager(
switch: switch,
brokerCtx: brokerCtx,
wakuMetadata: wakuMetadata,
storage: storage,
initialBackoffInSec: initialBackoffInSec,

View File

@ -162,7 +162,9 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerId): Connectedness =
peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected)
proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool =
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard)
return
peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) or
peerStore[ShardBook].book.getOrDefault(peerId, @[]).contains(shard)
proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool =
peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap)
@ -219,7 +221,8 @@ proc getPeersByShard*(
peerStore: PeerStore, cluster, shard: uint16
): seq[RemotePeerInfo] =
return peerStore.peers.filterIt(
it.enr.isSome() and it.enr.get().containsShard(cluster, shard)
(it.enr.isSome() and it.enr.get().containsShard(cluster, shard)) or
it.shards.contains(shard)
)
proc getPeersByCapability*(

View File

@ -42,6 +42,7 @@ import
waku_store/resume,
waku_store_sync,
waku_filter_v2,
waku_filter_v2/common as filter_common,
waku_filter_v2/client as filter_client,
waku_metadata,
waku_rendezvous/protocol,
@ -57,12 +58,18 @@ import
common/rate_limit/setting,
common/callbacks,
common/nimchronos,
common/broker/broker_context,
common/broker/request_broker,
waku_mix,
requests/node_requests,
common/broker/broker_context,
requests/health_requests,
events/health_events,
events/peer_events,
],
./net_config,
./peer_manager
./peer_manager,
./health_monitor/health_status,
./health_monitor/topic_health
declarePublicCounter waku_node_messages, "number of messages received", ["type"]
@ -91,6 +98,9 @@ const clientId* = "Nimbus Waku v2 node"
const WakuNodeVersionString* = "version / git commit hash: " & git_version
const EdgeTopicHealthyThreshold = 2
## Lightpush server and filter server requirement for a healthy topic in edge mode
# key and crypto modules different
type
# TODO: Move to application instance (e.g., `WakuNode2`)
@ -135,6 +145,10 @@ type
topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent]
rateLimitSettings*: ProtocolRateLimitSettings
wakuMix*: WakuMix
edgeTopicsHealth*: Table[PubsubTopic, TopicHealth]
edgeHealthEvent*: AsyncEvent
edgeHealthLoop: Future[void]
peerEventListener*: EventWakuPeerListener
proc deduceRelayShard(
node: WakuNode,
@ -469,7 +483,52 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string]
return ok()
proc startProvidersAndListeners(node: WakuNode) =
proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth =
  ## Derives the health of an edge-mode topic from the number of connected
  ## filter and lightpush service peers serving `shard`:
  ## both counts >= EdgeTopicHealthyThreshold -> SUFFICIENTLY_HEALTHY,
  ## both counts > 0 -> MINIMALLY_HEALTHY, otherwise UNHEALTHY.
  let
    filterCount =
      node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard)
    lightpushCount =
      node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard)

  if filterCount >= EdgeTopicHealthyThreshold and
      lightpushCount >= EdgeTopicHealthyThreshold:
    TopicHealth.SUFFICIENTLY_HEALTHY
  elif filterCount > 0 and lightpushCount > 0:
    TopicHealth.MINIMALLY_HEALTHY
  else:
    TopicHealth.UNHEALTHY
proc loopEdgeHealth(node: WakuNode) {.async.} =
  ## Event-driven health maintenance for edge-mode topics. Wakes up on
  ## `edgeHealthEvent`, recomputes the health of every tracked edge topic
  ## and emits `EventShardTopicHealthChange` whenever a topic's health moved.
  while node.started:
    await node.edgeHealthEvent.wait()
    node.edgeHealthEvent.clear()
    try:
      # Snapshot the tracked shards first: the loop below writes back into
      # `edgeTopicsHealth`, and mutating a Table while iterating its keys
      # is unsafe.
      var shards: seq[PubsubTopic]
      for shard in node.edgeTopicsHealth.keys:
        shards.add(shard)

      for shard in shards:
        # Topics served via relay are covered by relay's own health tracking.
        if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard):
          continue
        let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY)
        let newHealth = node.calculateEdgeTopicHealth(shard)
        if newHealth != oldHealth:
          node.edgeTopicsHealth[shard] = newHealth
          EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth)
    except CancelledError:
      break
    except CatchableError as e:
      warn "Error in edge health check", error = e.msg

    # safety cooldown to protect from edge cases
    await sleepAsync(100.milliseconds)
proc startProvidersAndListeners*(node: WakuNode) =
node.peerEventListener = EventWakuPeer.listen(
node.brokerCtx,
proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} =
node.edgeHealthEvent.fire(),
).valueOr:
error "Failed to listen to peer events", error = error
return
RequestRelayShard.setProvider(
node.brokerCtx,
proc(
@ -481,8 +540,60 @@ proc startProvidersAndListeners(node: WakuNode) =
).isOkOr:
error "Can't set provider for RequestRelayShard", error = error
proc stopProvidersAndListeners(node: WakuNode) =
RequestShardTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string] =
var response: RequestShardTopicsHealth
for shard in topics:
var healthStatus = TopicHealth.UNHEALTHY
if not node.wakuRelay.isNil:
healthStatus =
node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED)
if healthStatus == TopicHealth.NOT_SUBSCRIBED:
healthStatus = node.calculateEdgeTopicHealth(shard)
response.topicHealth.add((shard, healthStatus))
return ok(response),
).isOkOr:
error "Can't set provider for RequestShardTopicsHealth", error = error
RequestContentTopicsHealth.setProvider(
node.brokerCtx,
proc(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string] =
var response: RequestContentTopicsHealth
for contentTopic in topics:
var topicHealth = TopicHealth.NOT_SUBSCRIBED
let shardResult = node.deduceRelayShard(contentTopic, none[PubsubTopic]())
if shardResult.isOk():
let shardObj = shardResult.get()
let pubsubTopic = $shardObj
if not isNil(node.wakuRelay):
topicHealth = node.wakuRelay.topicsHealth.getOrDefault(
pubsubTopic, TopicHealth.NOT_SUBSCRIBED
)
if topicHealth == TopicHealth.NOT_SUBSCRIBED and
pubsubTopic in node.edgeTopicsHealth:
topicHealth = node.calculateEdgeTopicHealth(pubsubTopic)
response.contentTopicHealth.add((topic: contentTopic, health: topicHealth))
return ok(response),
).isOkOr:
error "Can't set provider for RequestContentTopicsHealth", error = error
proc stopProvidersAndListeners*(node: WakuNode) =
  ## Unregisters the broker providers and event listeners installed by
  ## `startProvidersAndListeners`.
  # A zero id means the listener was never registered (e.g. startup failed
  # before `EventWakuPeer.listen` succeeded) — same guard as other drop sites.
  if node.peerEventListener.id != 0:
    EventWakuPeer.dropListener(node.brokerCtx, node.peerEventListener)
  RequestRelayShard.clearProvider(node.brokerCtx)
  RequestContentTopicsHealth.clearProvider(node.brokerCtx)
  RequestShardTopicsHealth.clearProvider(node.brokerCtx)
proc start*(node: WakuNode) {.async.} =
## Starts a created Waku Node and
@ -532,6 +643,9 @@ proc start*(node: WakuNode) {.async.} =
## The switch will update addresses after start using the addressMapper
await node.switch.start()
node.edgeHealthEvent = newAsyncEvent()
node.edgeHealthLoop = loopEdgeHealth(node)
node.startProvidersAndListeners()
node.started = true
@ -549,6 +663,10 @@ proc stop*(node: WakuNode) {.async.} =
node.stopProvidersAndListeners()
if not node.edgeHealthLoop.isNil:
await node.edgeHealthLoop.cancelAndWait()
node.edgeHealthLoop = nil
await node.switch.stop()
node.peerManager.stop()

View File

@ -1,21 +0,0 @@
import waku/common/broker/[request_broker, multi_request_broker]
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health]
import waku/waku_core/topics
export protocol_health, topic_health
RequestBroker(sync):
type RequestNodeHealth* = object
healthStatus*: NodeHealth
RequestBroker(sync):
type RequestRelayTopicsHealth* = object
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string]
MultiRequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth

View File

@ -0,0 +1,39 @@
import waku/common/broker/request_broker
import waku/api/types
import waku/node/health_monitor/[protocol_health, topic_health, health_report]
import waku/waku_core/topics
import waku/common/waku_protocol
export protocol_health, topic_health
# Get the overall node connectivity status
RequestBroker(sync):
type RequestConnectionStatus* = object
connectionStatus*: ConnectionStatus
# Get the health status of a set of content topics
RequestBroker(sync):
type RequestContentTopicsHealth* = object
contentTopicHealth*: seq[tuple[topic: ContentTopic, health: TopicHealth]]
proc signature(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string]
# Get a consolidated node health report
RequestBroker:
type RequestHealthReport* = object
healthReport*: HealthReport
# Get the health status of a set of shards (pubsub topics)
RequestBroker(sync):
type RequestShardTopicsHealth* = object
topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]]
proc signature(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string]
# Get the health status of a mounted protocol
RequestBroker:
type RequestProtocolHealth* = object
healthStatus*: ProtocolHealth
proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]]

View File

@ -1,3 +1,3 @@
import ./[health_request, rln_requests, node_requests]
import ./[health_requests, rln_requests, node_requests]
export health_request, rln_requests, node_requests
export health_requests, rln_requests, node_requests

View File

@ -2,7 +2,8 @@
import results
import chronicles, json_serialization, json_serialization/std/options
import ../../../waku_node, ../serdes
import ../serdes
import waku/[waku_node, api/types]
#### Serialization and deserialization
@ -44,6 +45,7 @@ proc writeValue*(
) {.raises: [IOError].} =
writer.beginRecord()
writer.writeField("nodeHealth", $value.nodeHealth)
writer.writeField("connectionStatus", $value.connectionStatus)
writer.writeField("protocolsHealth", value.protocolsHealth)
writer.endRecord()
@ -52,6 +54,7 @@ proc readValue*(
) {.raises: [SerializationError, IOError].} =
var
nodeHealth: Option[HealthStatus]
connectionStatus: Option[ConnectionStatus]
protocolsHealth: Option[seq[ProtocolHealth]]
for fieldName in readObjectFields(reader):
@ -66,6 +69,16 @@ proc readValue*(
reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
nodeHealth = some(health)
of "connectionStatus":
if connectionStatus.isSome():
reader.raiseUnexpectedField(
"Multiple `connectionStatus` fields found", "HealthReport"
)
let state = ConnectionStatus.init(reader.readValue(string)).valueOr:
reader.raiseUnexpectedValue("Invalid `connectionStatus` value: " & $error)
connectionStatus = some(state)
of "protocolsHealth":
if protocolsHealth.isSome():
reader.raiseUnexpectedField(
@ -79,5 +92,8 @@ proc readValue*(
if nodeHealth.isNone():
reader.raiseUnexpectedValue("Field `nodeHealth` is missing")
value =
HealthReport(nodeHealth: nodeHealth.get, protocolsHealth: protocolsHealth.get(@[]))
value = HealthReport(
nodeHealth: nodeHealth.get,
connectionStatus: connectionStatus.get,
protocolsHealth: protocolsHealth.get(@[]),
)

View File

@ -5,7 +5,7 @@
{.push raises: [].}
import
std/[strformat, strutils],
std/[strformat, strutils, sets],
stew/byteutils,
results,
sequtils,
@ -21,11 +21,13 @@ import
import
waku/waku_core,
waku/node/health_monitor/topic_health,
waku/requests/health_request,
waku/requests/health_requests,
waku/events/health_events,
./message_id,
waku/common/broker/broker_context
waku/common/broker/broker_context,
waku/events/peer_events
from ../waku_core/codecs import WakuRelayCodec
from waku/waku_core/codecs import WakuRelayCodec
export WakuRelayCodec
type ShardMetrics = object
@ -154,6 +156,8 @@ type
pubsubTopic: PubsubTopic, message: WakuMessage
): Future[ValidationResult] {.gcsafe, raises: [Defect].}
WakuRelay* = ref object of GossipSub
brokerCtx: BrokerContext
peerEventListener: EventWakuPeerListener
# seq of tuples: the first entry in the tuple contains the validators are called for every topic
# the second entry contains the error messages to be returned when the validator fails
wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]]
@ -165,6 +169,11 @@ type
topicsHealth*: Table[string, TopicHealth]
onTopicHealthChange*: TopicHealthChangeHandler
topicHealthLoopHandle*: Future[void]
topicHealthUpdateEvent: AsyncEvent
topicHealthDirty: HashSet[string]
# list of topics that need their health updated in the update event
topicHealthCheckAll: bool
# true if all topics need to have their health status refreshed in the update event
msgMetricsPerShard*: Table[string, ShardMetrics]
# predefinition for more detailed results from publishing new message
@ -287,6 +296,21 @@ proc initRelayObservers(w: WakuRelay) =
)
proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) =
if msgs.control.isSome():
let ctrl = msgs.control.get()
var topicsChanged = false
for graft in ctrl.graft:
w.topicHealthDirty.incl(graft.topicID)
topicsChanged = true
for prune in ctrl.prune:
w.topicHealthDirty.incl(prune.topicID)
topicsChanged = true
if topicsChanged:
w.topicHealthUpdateEvent.fire()
for msg in msgs.messages:
let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr:
continue
@ -325,18 +349,6 @@ proc initRelayObservers(w: WakuRelay) =
w.addObserver(administrativeObserver)
proc initRequestProviders(w: WakuRelay) =
RequestRelayTopicsHealth.setProvider(
globalBrokerContext(),
proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] =
var collectedRes: RequestRelayTopicsHealth
for topic in topics:
let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED)
collectedRes.topicHealth.add((topic, health))
return ok(collectedRes),
).isOkOr:
error "Cannot set Relay Topics Health request provider", error = error
proc new*(
T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize)
): WakuRelayResult[T] =
@ -354,12 +366,25 @@ proc new*(
maxMessageSize = maxMessageSize,
parameters = GossipsubParameters,
)
w.brokerCtx = globalBrokerContext()
procCall GossipSub(w).initPubSub()
w.topicsHealth = initTable[string, TopicHealth]()
w.topicHealthUpdateEvent = newAsyncEvent()
w.topicHealthDirty = initHashSet[string]()
w.topicHealthCheckAll = false
w.initProtocolHandler()
w.initRelayObservers()
w.initRequestProviders()
w.peerEventListener = EventWakuPeer.listen(
w.brokerCtx,
proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} =
if evt.kind == WakuPeerEventKind.EventDisconnected:
w.topicHealthCheckAll = true
w.topicHealthUpdateEvent.fire()
,
).valueOr:
return err("Failed to subscribe to peer events: " & error)
except InitializationError:
return err("initialization error: " & getCurrentExceptionMsg())
@ -437,38 +462,58 @@ proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth =
return TopicHealth.MINIMALLY_HEALTHY
return TopicHealth.SUFFICIENTLY_HEALTHY
proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} =
var futs = newSeq[Future[void]]()
for topic in toSeq(wakuRelay.topics.keys):
## loop over all the topics I'm subscribed to
let
oldHealth = wakuRelay.topicsHealth.getOrDefault(topic)
currentHealth = wakuRelay.calculateTopicHealth(topic)
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
  ## True when the underlying gossipsub layer holds a subscription for `topic`.
  topic in GossipSub(w).topics
if oldHealth == currentHealth:
continue
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
  ## All pubsub topics currently subscribed at the gossipsub layer.
  for topic in GossipSub(w).topics.keys():
    result.add(topic)
wakuRelay.topicsHealth[topic] = currentHealth
if not wakuRelay.onTopicHealthChange.isNil():
let fut = wakuRelay.onTopicHealthChange(topic, currentHealth)
if not fut.completed(): # Fast path for successful sync handlers
futs.add(fut)
proc topicsHealthLoop(w: WakuRelay) {.async.} =
while true:
await w.topicHealthUpdateEvent.wait()
w.topicHealthUpdateEvent.clear()
var topicsToCheck: seq[string]
if w.topicHealthCheckAll:
topicsToCheck = toSeq(w.topics.keys)
else:
topicsToCheck = toSeq(w.topicHealthDirty)
w.topicHealthCheckAll = false
w.topicHealthDirty.clear()
var futs = newSeq[Future[void]]()
for topic in topicsToCheck:
# guard against topic being unsubscribed since fire()
if not w.isSubscribed(topic):
continue
let
oldHealth = w.topicsHealth.getOrDefault(topic, TopicHealth.UNHEALTHY)
currentHealth = w.calculateTopicHealth(topic)
if oldHealth == currentHealth:
continue
w.topicsHealth[topic] = currentHealth
EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth)
if not w.onTopicHealthChange.isNil():
futs.add(w.onTopicHealthChange(topic, currentHealth))
if futs.len() > 0:
# slow path - we have to wait for the handlers to complete
try:
futs = await allFinished(futs)
discard await allFinished(futs)
except CancelledError:
# check for errors in futures
for fut in futs:
if fut.failed:
let err = fut.readError()
warn "Error in health change handler", description = err.msg
break
except CatchableError as e:
warn "Error in topic health callback", error = e.msg
proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} =
while true:
await wakuRelay.updateTopicsHealth()
await sleepAsync(10.seconds)
# safety cooldown to protect from edge cases
await sleepAsync(100.milliseconds)
method start*(w: WakuRelay) {.async, base.} =
info "start"
@ -478,15 +523,13 @@ method start*(w: WakuRelay) {.async, base.} =
method stop*(w: WakuRelay) {.async, base.} =
  ## Stops the relay: shuts the gossipsub layer down, detaches the
  ## peer-event listener and cancels the topic-health maintenance loop.
  info "stop"
  await procCall GossipSub(w).stop()
  # A zero id means the listener was never registered.
  if w.peerEventListener.id != 0:
    EventWakuPeer.dropListener(w.brokerCtx, w.peerEventListener)
  if not isNil(w.topicHealthLoopHandle):
    await w.topicHealthLoopHandle.cancelAndWait()
proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool =
GossipSub(w).topics.hasKey(topic)
proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] =
return toSeq(GossipSub(w).topics.keys())
proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} =
# rejects messages that are not WakuMessage
let wrappedValidator = proc(
@ -584,7 +627,8 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle
procCall GossipSub(w).subscribe(pubsubTopic, topicHandler)
w.topicHandlers[pubsubTopic] = topicHandler
asyncSpawn w.updateTopicsHealth()
w.topicHealthDirty.incl(pubsubTopic)
w.topicHealthUpdateEvent.fire()
proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
## Unsubscribe all handlers on this pubsub topic
@ -594,6 +638,8 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) =
procCall GossipSub(w).unsubscribeAll(pubsubTopic)
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
w.topicsHealth.del(pubsubTopic)
w.topicHealthDirty.excl(pubsubTopic)
proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
if not w.topicValidator.hasKey(pubsubTopic):
@ -619,6 +665,8 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) =
w.topicValidator.del(pubsubTopic)
w.topicHandlers.del(pubsubTopic)
w.topicsHealth.del(pubsubTopic)
w.topicHealthDirty.excl(pubsubTopic)
proc publish*(
w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage