From 1fb4d1eab0460b3e033e0ab87bd4ac3b6966f3c9 Mon Sep 17 00:00:00 2001 From: Fabiana Cecin Date: Thu, 12 Feb 2026 14:52:39 -0300 Subject: [PATCH] feat: implement Waku API Health spec (#3689) * Fix protocol strength metric to consider connected peers only * Remove polling loop; event-driven node connection health updates * Remove 10s WakuRelay topic health polling loop; now event-driven * Change NodeHealthStatus to ConnectionStatus * Change new nodeState (rest API /health) field to connectionStatus * Add getSyncProtocolHealthInfo and getSyncNodeHealthReport * Add ConnectionStatusChangeEvent * Add RequestHealthReport * Refactor sync/async protocol health queries in the health monitor * Add EventRelayTopicHealthChange * Add EventWakuPeer emitted by PeerManager * Add Edge support for topics health requests and events * Rename "RelayTopic" -> "Topic" * Add RequestContentTopicsHealth sync request * Add EventContentTopicHealthChange * Rename RequestTopicsHealth -> RequestShardTopicsHealth * Remove health check gating from checkApiAvailability * Add basic health smoke tests * Other misc improvements, refactors, fixes Co-authored-by: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> --- .../json_connection_status_change_event.nim | 19 + library/libwaku.nim | 8 + tests/api/test_all.nim | 2 +- tests/api/test_api_health.nim | 296 +++++++++ tests/api/test_api_send.nim | 9 +- tests/node/test_all.nim | 3 +- tests/node/test_wakunode_health_monitor.nim | 301 +++++++++ tests/waku_relay/utils.nim | 18 +- tests/wakunode_rest/test_rest_health.nim | 63 +- waku/api/api.nim | 16 +- waku/api/types.nim | 8 +- waku/common/waku_protocol.nim | 24 + waku/events/events.nim | 4 +- waku/events/health_events.nim | 27 + waku/events/peer_events.nim | 13 + waku/factory/app_callbacks.nim | 3 +- waku/factory/builder.nim | 3 +- waku/factory/waku.nim | 165 +++-- .../send_service/relay_processor.nim | 4 +- waku/node/health_monitor.nim | 9 +- .../node/health_monitor/connection_status.nim | 15 + waku/node/health_monitor/health_report.nim | 10 + .../health_monitor/node_health_monitor.nim | 598 ++++++++++++++---- waku/node/health_monitor/protocol_health.nim | 10 +- waku/node/peer_manager/peer_manager.nim | 128 +++- waku/node/peer_manager/waku_peer_store.nim | 7 +- waku/node/waku_node.nim | 126 +++- waku/requests/health_request.nim | 21 - waku/requests/health_requests.nim | 39 ++ waku/requests/requests.nim | 4 +- waku/rest_api/endpoint/health/types.nim | 22 +- waku/waku_relay/protocol.nim | 146 +++-- 32 files changed, 1727 insertions(+), 394 deletions(-) create mode 100644 library/events/json_connection_status_change_event.nim create mode 100644 tests/api/test_api_health.nim create mode 100644 tests/node/test_wakunode_health_monitor.nim create mode 100644 waku/common/waku_protocol.nim create mode 100644 waku/events/health_events.nim create mode 100644 waku/events/peer_events.nim create mode 100644 waku/node/health_monitor/connection_status.nim create mode 100644 waku/node/health_monitor/health_report.nim delete mode 100644 waku/requests/health_request.nim create mode 100644 waku/requests/health_requests.nim diff --git a/library/events/json_connection_status_change_event.nim b/library/events/json_connection_status_change_event.nim new file mode 100644 index 000000000..347a84c48 --- /dev/null +++ b/library/events/json_connection_status_change_event.nim @@ -0,0 +1,19 @@ +{.push raises: [].} + +import system, std/json +import ./json_base_event 
+import ../../waku/api/types + +type JsonConnectionStatusChangeEvent* = ref object of JsonEvent + status*: ConnectionStatus + +proc new*( + T: type JsonConnectionStatusChangeEvent, status: ConnectionStatus +): T = + return JsonConnectionStatusChangeEvent( + eventType: "node_health_change", + status: status + ) + +method `$`*(event: JsonConnectionStatusChangeEvent): string = + $(%*event) diff --git a/library/libwaku.nim b/library/libwaku.nim index c71e823d6..eb3cdff5e 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -7,9 +7,11 @@ import ./events/json_message_event, ./events/json_topic_health_change_event, ./events/json_connection_change_event, + ./events/json_connection_status_change_event, ../waku/factory/app_callbacks, waku/factory/waku, waku/node/waku_node, + waku/node/health_monitor/health_status, ./declare_lib ################################################################################ @@ -61,10 +63,16 @@ proc waku_new( callEventCallback(ctx, "onConnectionChange"): $JsonConnectionChangeEvent.new($peerId, peerEvent) + proc onConnectionStatusChange(ctx: ptr FFIContext): ConnectionStatusChangeHandler = + return proc(status: ConnectionStatus) {.async.} = + callEventCallback(ctx, "onConnectionStatusChange"): + $JsonConnectionStatusChangeEvent.new(status) + let appCallbacks = AppCallbacks( relayHandler: onReceivedMessage(ctx), topicHealthChangeHandler: onTopicHealthChange(ctx), connectionChangeHandler: onConnectionChange(ctx), + connectionStatusChangeHandler: onConnectionStatusChange(ctx) ) ffi.sendRequestToFFIThread( diff --git a/tests/api/test_all.nim b/tests/api/test_all.nim index 99c1b3b4c..57f7f37f2 100644 --- a/tests/api/test_all.nim +++ b/tests/api/test_all.nim @@ -1,3 +1,3 @@ {.used.} -import ./test_entry_nodes, ./test_node_conf +import ./test_entry_nodes, ./test_node_conf, ./test_api_send, ./test_api_health diff --git a/tests/api/test_api_health.nim b/tests/api/test_api_health.nim new file mode 100644 index 000000000..b7aab43f9 --- /dev/null +++ b/tests/api/test_api_health.nim @@ -0,0 +1,296 @@ +{.used.} + +import std/[options, sequtils, times] +import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo] +import ../testlib/[common, wakucore, wakunode, testasync] + +import + waku, + waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context], + waku/node/health_monitor/[topic_health, health_status, protocol_health, health_report], + waku/requests/health_requests, + waku/requests/node_requests, + waku/events/health_events, + waku/common/waku_protocol, + waku/factory/waku_conf + +const TestTimeout = chronos.seconds(10) +const DefaultShard = PubsubTopic("/waku/2/rs/1/0") +const TestContentTopic = ContentTopic("/waku/2/default-content/proto") + +proc dummyHandler( + topic: PubsubTopic, msg: WakuMessage +): Future[void] {.async, gcsafe.} = + discard + +proc waitForConnectionStatus( + brokerCtx: BrokerContext, expected: ConnectionStatus +) {.async.} = + var future = newFuture[void]("waitForConnectionStatus") + + let handler: EventConnectionStatusChangeListenerProc = proc( + e: EventConnectionStatusChange + ) {.async: (raises: []), gcsafe.} = + if not future.finished: + if e.connectionStatus == expected: + future.complete() + + let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr: + raiseAssert error + + try: + if not await future.withTimeout(TestTimeout): + raiseAssert "Timeout waiting for status: " & $expected + finally: + EventConnectionStatusChange.dropListener(brokerCtx, handle) + +proc waitForShardHealthy( + 
brokerCtx: BrokerContext +): Future[EventShardTopicHealthChange] {.async.} = + var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy") + + let handler: EventShardTopicHealthChangeListenerProc = proc( + e: EventShardTopicHealthChange + ) {.async: (raises: []), gcsafe.} = + if not future.finished: + if e.health == TopicHealth.MINIMALLY_HEALTHY or + e.health == TopicHealth.SUFFICIENTLY_HEALTHY: + future.complete(e) + + let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr: + raiseAssert error + + try: + if await future.withTimeout(TestTimeout): + return future.read() + else: + raiseAssert "Timeout waiting for shard health event" + finally: + EventShardTopicHealthChange.dropListener(brokerCtx, handle) + +suite "LM API health checking": + var + serviceNode {.threadvar.}: WakuNode + client {.threadvar.}: Waku + servicePeerInfo {.threadvar.}: RemotePeerInfo + + asyncSetup: + lockNewGlobalBrokerContext: + serviceNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + (await serviceNode.mountRelay()).isOkOr: + raiseAssert error + serviceNode.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert error + await serviceNode.mountLibp2pPing() + await serviceNode.start() + + servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo() + serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler) + + lockNewGlobalBrokerContext: + let conf = NodeConfig.init( + mode = WakuMode.Core, + networkingConfig = + NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0), + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1'u16, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + ) + + client = (await createNode(conf)).valueOr: + raiseAssert error + (await startWaku(addr client)).isOkOr: + raiseAssert error + + asyncTeardown: + discard await client.stop() + await serviceNode.stop() + + asyncTest "RequestShardTopicsHealth, check PubsubTopic health": + client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) + await client.node.connectToNodes(@[servicePeerInfo]) + + var isHealthy = false + let start = Moment.now() + while Moment.now() - start < TestTimeout: + let req = RequestShardTopicsHealth.request(client.brokerCtx, @[DefaultShard]).valueOr: + raiseAssert "RequestShardTopicsHealth failed" + + if req.topicHealth.len > 0: + let h = req.topicHealth[0].health + if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY: + isHealthy = true + break + await sleepAsync(chronos.milliseconds(100)) + + check isHealthy == true + + asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic": + const GhostShard = PubsubTopic("/waku/2/rs/1/666") + client.node.wakuRelay.subscribe(GhostShard, dummyHandler) + + let req = RequestShardTopicsHealth.request(client.brokerCtx, @[GhostShard]).valueOr: + raiseAssert "Request failed" + + check req.topicHealth.len > 0 + check req.topicHealth[0].health == TopicHealth.UNHEALTHY + + asyncTest "RequestProtocolHealth, check relay status": + await client.node.connectToNodes(@[servicePeerInfo]) + + var isReady = false + let start = Moment.now() + while Moment.now() - start < TestTimeout: + let relayReq = await RequestProtocolHealth.request( + client.brokerCtx, WakuProtocol.RelayProtocol + ) + if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY: + isReady = true + break + await sleepAsync(chronos.milliseconds(100)) + + check isReady == true + + let storeReq = + await RequestProtocolHealth.request(client.brokerCtx, 
WakuProtocol.StoreProtocol) + if storeReq.isOk(): + check storeReq.get().healthStatus.health != HealthStatus.READY + + asyncTest "RequestProtocolHealth, check unmounted protocol": + let req = + await RequestProtocolHealth.request(client.brokerCtx, WakuProtocol.StoreProtocol) + check req.isOk() + + let status = req.get().healthStatus + check status.health == HealthStatus.NOT_MOUNTED + check status.desc.isNone() + + asyncTest "RequestConnectionStatus, check connectivity state": + let initialReq = RequestConnectionStatus.request(client.brokerCtx).valueOr: + raiseAssert "RequestConnectionStatus failed" + check initialReq.connectionStatus == ConnectionStatus.Disconnected + + await client.node.connectToNodes(@[servicePeerInfo]) + + var isConnected = false + let start = Moment.now() + while Moment.now() - start < TestTimeout: + let req = RequestConnectionStatus.request(client.brokerCtx).valueOr: + raiseAssert "RequestConnectionStatus failed" + + if req.connectionStatus == ConnectionStatus.PartiallyConnected or + req.connectionStatus == ConnectionStatus.Connected: + isConnected = true + break + await sleepAsync(chronos.milliseconds(100)) + + check isConnected == true + + asyncTest "EventConnectionStatusChange, detect connect and disconnect": + let connectFuture = + waitForConnectionStatus(client.brokerCtx, ConnectionStatus.PartiallyConnected) + + await client.node.connectToNodes(@[servicePeerInfo]) + await connectFuture + + let disconnectFuture = + waitForConnectionStatus(client.brokerCtx, ConnectionStatus.Disconnected) + await client.node.disconnectNode(servicePeerInfo) + await disconnectFuture + + asyncTest "EventShardTopicHealthChange, detect health improvement": + client.node.wakuRelay.subscribe(DefaultShard, dummyHandler) + + let healthEventFuture = waitForShardHealthy(client.brokerCtx) + + await client.node.connectToNodes(@[servicePeerInfo]) + + let event = await healthEventFuture + check event.topic == DefaultShard + + asyncTest "RequestHealthReport, check aggregate report": + let req = await RequestHealthReport.request(client.brokerCtx) + + check req.isOk() + + let report = req.get().healthReport + check report.nodeHealth == HealthStatus.READY + check report.protocolsHealth.len > 0 + check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol) + + asyncTest "RequestContentTopicsHealth, smoke test": + let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto") + + let req = RequestContentTopicsHealth.request(client.brokerCtx, @[fictionalTopic]) + + check req.isOk() + + let res = req.get() + check res.contentTopicHealth.len == 1 + check res.contentTopicHealth[0].topic == fictionalTopic + check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED + + asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding": + let cTopic = ContentTopic("/waku/2/my-content-topic/proto") + + let shardReq = + RequestRelayShard.request(client.brokerCtx, none(PubsubTopic), cTopic) + + check shardReq.isOk() + let targetShard = $shardReq.get().relayShard + + client.node.wakuRelay.subscribe(targetShard, dummyHandler) + serviceNode.wakuRelay.subscribe(targetShard, dummyHandler) + + await client.node.connectToNodes(@[servicePeerInfo]) + + var isHealthy = false + let start = Moment.now() + while Moment.now() - start < TestTimeout: + let req = RequestContentTopicsHealth.request(client.brokerCtx, @[cTopic]).valueOr: + raiseAssert "Request failed" + + if req.contentTopicHealth.len > 0: + let h = req.contentTopicHealth[0].health + if h == 
TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
+          isHealthy = true
+          break
+
+      await sleepAsync(chronos.milliseconds(100))
+
+    check isHealthy == true
+
+  asyncTest "RequestProtocolHealth, edge mode smoke test":
+    var edgeWaku: Waku
+
+    lockNewGlobalBrokerContext:
+      let edgeConf = NodeConfig.init(
+        mode = WakuMode.Edge,
+        networkingConfig =
+          NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0),
+        protocolsConfig = ProtocolsConfig.init(
+          entryNodes = @[],
+          clusterId = 1'u16,
+          messageValidation =
+            MessageValidation(maxMessageSize: "150 KiB", rlnConfig: none(RlnConfig)),
+        ),
+      )
+
+      edgeWaku = (await createNode(edgeConf)).valueOr:
+        raiseAssert "Failed to create edge node: " & error
+
+      (await startWaku(addr edgeWaku)).isOkOr:
+        raiseAssert "Failed to start edge waku: " & error
+
+    let relayReq = await RequestProtocolHealth.request(
+      edgeWaku.brokerCtx, WakuProtocol.RelayProtocol
+    )
+    check relayReq.isOk()
+    check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
+
+    check not edgeWaku.node.wakuFilterClient.isNil()
+
+    discard await edgeWaku.stop()
diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim
index e247c65ce..7343fc655 100644
--- a/tests/api/test_api_send.nim
+++ b/tests/api/test_api_send.nim
@@ -117,6 +117,9 @@ proc validate(
   check requestId == expectedRequestId
 
 proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
+  # allocate random ports to avoid port-already-in-use errors
+  let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0)
+
   result = NodeConfig.init(
     mode = mode,
     protocolsConfig = ProtocolsConfig.init(
@@ -124,6 +127,7 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig =
       clusterId = 1,
       autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
     ),
+    networkingConfig = netConf,
     p2pReliability = true,
   )
 
@@ -246,8 +250,9 @@ suite "Waku API - Send":
 
     let sendResult = await node.send(envelope)
 
-    check sendResult.isErr() # Depending on implementation, it might say "not healthy"
-    check sendResult.error().contains("not healthy")
+    # TODO: the API does not yet enforce a health check before sending,
+    # so this test cannot expect the send to fail.
+ check sendResult.isOk() (await node.stop()).isOkOr: raiseAssert "Failed to stop node: " & error diff --git a/tests/node/test_all.nim b/tests/node/test_all.nim index f6e7507b7..fe785dee2 100644 --- a/tests/node/test_all.nim +++ b/tests/node/test_all.nim @@ -7,4 +7,5 @@ import ./test_wakunode_peer_exchange, ./test_wakunode_store, ./test_wakunode_legacy_store, - ./test_wakunode_peer_manager + ./test_wakunode_peer_manager, + ./test_wakunode_health_monitor diff --git a/tests/node/test_wakunode_health_monitor.nim b/tests/node/test_wakunode_health_monitor.nim new file mode 100644 index 000000000..8be9c444d --- /dev/null +++ b/tests/node/test_wakunode_health_monitor.nim @@ -0,0 +1,301 @@ +{.used.} + +import + std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results + +import + waku/[ + waku_core, + common/waku_protocol, + node/waku_node, + node/peer_manager, + node/health_monitor/health_status, + node/health_monitor/connection_status, + node/health_monitor/protocol_health, + node/health_monitor/node_health_monitor, + node/kernel_api/relay, + node/kernel_api/store, + node/kernel_api/lightpush, + node/kernel_api/filter, + waku_archive, + ] + +import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils + +const MockDLow = 4 # Mocked GossipSub DLow value + +const TestConnectivityTimeLimit = 3.seconds + +proc protoHealthMock(kind: WakuProtocol, health: HealthStatus): ProtocolHealth = + var ph = ProtocolHealth.init(kind) + if health == HealthStatus.READY: + return ph.ready() + else: + return ph.notReady("mock") + +suite "Health Monitor - health state calculation": + test "Disconnected, zero peers": + let protocols = + @[ + protoHealthMock(RelayProtocol, HealthStatus.NOT_READY), + protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY), + protoHealthMock(FilterClientProtocol, HealthStatus.NOT_READY), + protoHealthMock(LightpushClientProtocol, HealthStatus.NOT_READY), + ] + let strength = initTable[WakuProtocol, int]() + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.Disconnected + + test "PartiallyConnected, weak relay": + let weakCount = MockDLow - 1 + let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)] + var strength = initTable[WakuProtocol, int]() + strength[RelayProtocol] = weakCount + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + # Partially connected since relay connectivity is weak (> 0, but < dLow) + check state == ConnectionStatus.PartiallyConnected + + test "Connected, robust relay": + let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)] + var strength = initTable[WakuProtocol, int]() + strength[RelayProtocol] = MockDLow + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + # Fully connected since relay connectivity is ideal (>= dLow) + check state == ConnectionStatus.Connected + + test "Connected, robust edge": + let protocols = + @[ + protoHealthMock(RelayProtocol, HealthStatus.NOT_MOUNTED), + protoHealthMock(LightpushClientProtocol, HealthStatus.READY), + protoHealthMock(FilterClientProtocol, HealthStatus.READY), + protoHealthMock(StoreClientProtocol, HealthStatus.READY), + ] + var strength = initTable[WakuProtocol, int]() + strength[LightpushClientProtocol] = HealthyThreshold + strength[FilterClientProtocol] = HealthyThreshold + strength[StoreClientProtocol] = HealthyThreshold + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.Connected + 
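One branch the suite above does not exercise is the fallback spelled out in calculateConnectionState: when Relay is mounted but peerless, classification still falls through to the edge-client checks. A minimal sketch of that case, reusing protoHealthMock, MockDLow, HealthyThreshold, and calculateConnectionState exactly as defined in this file (the test itself is illustrative, not part of the patch):

    test "Connected, peerless relay falls back to edge checks (illustrative)":
      # Relay is mounted (dLow is known) but NOT_READY with zero peers,
      # so classification falls through to the edge-client thresholds.
      let protocols =
        @[
          protoHealthMock(RelayProtocol, HealthStatus.NOT_READY),
          protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
          protoHealthMock(FilterClientProtocol, HealthStatus.READY),
          protoHealthMock(StoreClientProtocol, HealthStatus.READY),
        ]
      var strength = initTable[WakuProtocol, int]()
      strength[LightpushClientProtocol] = HealthyThreshold
      strength[FilterClientProtocol] = HealthyThreshold
      strength[StoreClientProtocol] = HealthyThreshold
      let state = calculateConnectionState(protocols, strength, some(MockDLow))
      check state == ConnectionStatus.Connected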
+ test "Disconnected, edge missing store": + let protocols = + @[ + protoHealthMock(LightpushClientProtocol, HealthStatus.READY), + protoHealthMock(FilterClientProtocol, HealthStatus.READY), + protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY), + ] + var strength = initTable[WakuProtocol, int]() + strength[LightpushClientProtocol] = HealthyThreshold + strength[FilterClientProtocol] = HealthyThreshold + strength[StoreClientProtocol] = 0 + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.Disconnected + + test "PartiallyConnected, edge meets minimum failover requirement": + let weakCount = max(1, HealthyThreshold - 1) + let protocols = + @[ + protoHealthMock(LightpushClientProtocol, HealthStatus.READY), + protoHealthMock(FilterClientProtocol, HealthStatus.READY), + protoHealthMock(StoreClientProtocol, HealthStatus.READY), + ] + var strength = initTable[WakuProtocol, int]() + strength[LightpushClientProtocol] = weakCount + strength[FilterClientProtocol] = weakCount + strength[StoreClientProtocol] = weakCount + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.PartiallyConnected + + test "Connected, robust relay ignores store server": + let protocols = + @[ + protoHealthMock(RelayProtocol, HealthStatus.READY), + protoHealthMock(StoreProtocol, HealthStatus.READY), + ] + var strength = initTable[WakuProtocol, int]() + strength[RelayProtocol] = MockDLow + strength[StoreProtocol] = 0 + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.Connected + + test "Connected, robust relay ignores store client": + let protocols = + @[ + protoHealthMock(RelayProtocol, HealthStatus.READY), + protoHealthMock(StoreProtocol, HealthStatus.READY), + protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY), + ] + var strength = initTable[WakuProtocol, int]() + strength[RelayProtocol] = MockDLow + strength[StoreProtocol] = 0 + strength[StoreClientProtocol] = 0 + let state = calculateConnectionState(protocols, strength, some(MockDLow)) + check state == ConnectionStatus.Connected + +suite "Health Monitor - events": + asyncTest "Core (relay) health update": + let + nodeAKey = generateSecp256k1Key() + nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) + + (await nodeA.mountRelay()).expect("Node A failed to mount Relay") + + await nodeA.start() + + let monitorA = NodeHealthMonitor.new(nodeA) + + var + lastStatus = ConnectionStatus.Disconnected + callbackCount = 0 + healthChangeSignal = newAsyncEvent() + + monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} = + lastStatus = status + callbackCount.inc() + healthChangeSignal.fire() + + monitorA.startHealthMonitor().expect("Health monitor failed to start") + + let + nodeBKey = generateSecp256k1Key() + nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) + + let driver = newSqliteArchiveDriver() + nodeB.mountArchive(driver).expect("Node B failed to mount archive") + + (await nodeB.mountRelay()).expect("Node B failed to mount relay") + await nodeB.mountStore() + + await nodeB.start() + + await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} = + discard + + nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect( + "Node A failed to subscribe" + ) + nodeB.subscribe((kind: PubsubSub, topic: 
DefaultPubsubTopic), dummyHandler).expect( + "Node B failed to subscribe" + ) + + let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit + var gotConnected = false + + while Moment.now() < connectTimeLimit: + if lastStatus == ConnectionStatus.PartiallyConnected: + gotConnected = true + break + + if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()): + healthChangeSignal.clear() + + check: + gotConnected == true + callbackCount >= 1 + lastStatus == ConnectionStatus.PartiallyConnected + + healthChangeSignal.clear() + + await nodeB.stop() + await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo()) + + let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit + var gotDisconnected = false + + while Moment.now() < disconnectTimeLimit: + if lastStatus == ConnectionStatus.Disconnected: + gotDisconnected = true + break + + if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()): + healthChangeSignal.clear() + + check: + gotDisconnected == true + + await monitorA.stopHealthMonitor() + await nodeA.stop() + + asyncTest "Edge (light client) health update": + let + nodeAKey = generateSecp256k1Key() + nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0)) + + nodeA.mountLightpushClient() + await nodeA.mountFilterClient() + nodeA.mountStoreClient() + + await nodeA.start() + + let monitorA = NodeHealthMonitor.new(nodeA) + + var + lastStatus = ConnectionStatus.Disconnected + callbackCount = 0 + healthChangeSignal = newAsyncEvent() + + monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} = + lastStatus = status + callbackCount.inc() + healthChangeSignal.fire() + + monitorA.startHealthMonitor().expect("Health monitor failed to start") + + let + nodeBKey = generateSecp256k1Key() + nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0)) + + let driver = newSqliteArchiveDriver() + nodeB.mountArchive(driver).expect("Node B failed to mount archive") + + (await nodeB.mountRelay()).expect("Node B failed to mount relay") + + (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush") + await nodeB.mountFilter() + await nodeB.mountStore() + + await nodeB.start() + + await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()]) + + let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit + var gotConnected = false + + while Moment.now() < connectTimeLimit: + if lastStatus == ConnectionStatus.PartiallyConnected: + gotConnected = true + break + + if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()): + healthChangeSignal.clear() + + check: + gotConnected == true + callbackCount >= 1 + lastStatus == ConnectionStatus.PartiallyConnected + + healthChangeSignal.clear() + + await nodeB.stop() + await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo()) + + let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit + var gotDisconnected = false + + while Moment.now() < disconnectTimeLimit: + if lastStatus == ConnectionStatus.Disconnected: + gotDisconnected = true + break + + if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()): + healthChangeSignal.clear() + + check: + gotDisconnected == true + lastStatus == ConnectionStatus.Disconnected + + await monitorA.stopHealthMonitor() + await nodeA.stop() diff --git a/tests/waku_relay/utils.nim b/tests/waku_relay/utils.nim index d5703d415..4e958a4ea 100644 --- a/tests/waku_relay/utils.nim +++ b/tests/waku_relay/utils.nim @@ -11,15 +11,15 @@ 
import from std/times import epochTime import - waku/ - [ - waku_relay, - node/waku_node, - node/peer_manager, - waku_core, - waku_node, - waku_rln_relay, - ], + waku/[ + waku_relay, + node/waku_node, + node/peer_manager, + waku_core, + waku_node, + waku_rln_relay, + common/broker/broker_context, + ], ../waku_store/store_utils, ../waku_archive/archive_utils, ../testlib/[wakucore, futures] diff --git a/tests/wakunode_rest/test_rest_health.nim b/tests/wakunode_rest/test_rest_health.nim index 2a70fee5f..37abaf4f5 100644 --- a/tests/wakunode_rest/test_rest_health.nim +++ b/tests/wakunode_rest/test_rest_health.nim @@ -10,6 +10,7 @@ import libp2p/crypto/crypto import waku/[ + common/waku_protocol, waku_node, node/waku_node as waku_node2, # TODO: Remove after moving `git_version` to the app code. @@ -78,47 +79,39 @@ suite "Waku v2 REST API - health": # When var response = await client.healthCheck() + let report = response.data # Then check: response.status == 200 $response.contentType == $MIMETYPE_JSON - response.data.nodeHealth == HealthStatus.READY - response.data.protocolsHealth.len() == 15 - response.data.protocolsHealth[0].protocol == "Relay" - response.data.protocolsHealth[0].health == HealthStatus.NOT_READY - response.data.protocolsHealth[0].desc == some("No connected peers") - response.data.protocolsHealth[1].protocol == "Rln Relay" - response.data.protocolsHealth[1].health == HealthStatus.READY - response.data.protocolsHealth[2].protocol == "Lightpush" - response.data.protocolsHealth[2].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[3].protocol == "Legacy Lightpush" - response.data.protocolsHealth[3].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[4].protocol == "Filter" - response.data.protocolsHealth[4].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[5].protocol == "Store" - response.data.protocolsHealth[5].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[6].protocol == "Legacy Store" - response.data.protocolsHealth[6].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[7].protocol == "Peer Exchange" - response.data.protocolsHealth[7].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[8].protocol == "Rendezvous" - response.data.protocolsHealth[8].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[9].protocol == "Mix" - response.data.protocolsHealth[9].health == HealthStatus.NOT_MOUNTED - response.data.protocolsHealth[10].protocol == "Lightpush Client" - response.data.protocolsHealth[10].health == HealthStatus.NOT_READY - response.data.protocolsHealth[10].desc == + report.nodeHealth == HealthStatus.READY + report.protocolsHealth.len() == 15 + + report.getHealth(RelayProtocol).health == HealthStatus.NOT_READY + report.getHealth(RelayProtocol).desc == some("No connected peers") + + report.getHealth(RlnRelayProtocol).health == HealthStatus.READY + + report.getHealth(LightpushProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(LegacyLightpushProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(FilterProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(StoreProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(LegacyStoreProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(PeerExchangeProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(RendezvousProtocol).health == HealthStatus.NOT_MOUNTED + report.getHealth(MixProtocol).health == HealthStatus.NOT_MOUNTED + + 
report.getHealth(LightpushClientProtocol).health == HealthStatus.NOT_READY
+      report.getHealth(LightpushClientProtocol).desc ==
         some("No Lightpush service peer available yet")
-      response.data.protocolsHealth[11].protocol == "Legacy Lightpush Client"
-      response.data.protocolsHealth[11].health == HealthStatus.NOT_MOUNTED
-      response.data.protocolsHealth[12].protocol == "Store Client"
-      response.data.protocolsHealth[12].health == HealthStatus.NOT_MOUNTED
-      response.data.protocolsHealth[13].protocol == "Legacy Store Client"
-      response.data.protocolsHealth[13].health == HealthStatus.NOT_MOUNTED
-      response.data.protocolsHealth[14].protocol == "Filter Client"
-      response.data.protocolsHealth[14].health == HealthStatus.NOT_READY
-      response.data.protocolsHealth[14].desc ==
+
+      report.getHealth(LegacyLightpushClientProtocol).health == HealthStatus.NOT_MOUNTED
+      report.getHealth(StoreClientProtocol).health == HealthStatus.NOT_MOUNTED
+      report.getHealth(LegacyStoreClientProtocol).health == HealthStatus.NOT_MOUNTED
+
+      report.getHealth(FilterClientProtocol).health == HealthStatus.NOT_READY
+      report.getHealth(FilterClientProtocol).desc ==
+        some("No Filter service peer available yet")
 
   await restServer.stop()
diff --git a/waku/api/api.nim b/waku/api/api.nim
index 41f4fd240..7f13919b3 100644
--- a/waku/api/api.nim
+++ b/waku/api/api.nim
@@ -1,7 +1,7 @@
-import chronicles, chronos, results
+import chronicles, chronos, results, std/strutils
 import waku/factory/waku
-import waku/[requests/health_request, waku_core, waku_node]
+import waku/[requests/health_requests, waku_core, waku_node]
 import waku/node/delivery_service/send_service
 import waku/node/delivery_service/subscription_service
 import ./[api_conf, types]
@@ -25,16 +25,8 @@ proc checkApiAvailability(w: Waku): Result[void, string] =
   if w.isNil():
     return err("Waku node is not initialized")
 
-  # check if health is satisfactory
-  # If Node is not healthy, return err("Waku node is not healthy")
-  let healthStatus = RequestNodeHealth.request(w.brokerCtx)
-
-  if healthStatus.isErr():
-    warn "Failed to get Waku node health status: ", error = healthStatus.error
-    # Let's suppose the node is hesalthy enough, go ahead
-  else:
-    if healthStatus.get().healthStatus == NodeHealth.Unhealthy:
-      return err("Waku node is not healthy, has got no connections.")
+  # TODO: Reconcile request-gating health checks here with unit testing.
+  # (For now, better to just allow all sends and rely on retries.)
 
   return ok()
 
diff --git a/waku/api/types.nim b/waku/api/types.nim
index a0626e98c..9eae503c8 100644
--- a/waku/api/types.nim
+++ b/waku/api/types.nim
@@ -14,10 +14,10 @@ type
   RequestId* = distinct string
 
-  NodeHealth* {.pure.} = enum
-    Healthy
-    MinimallyHealthy
-    Unhealthy
+  ConnectionStatus* {.pure.} = enum
+    Disconnected
+    PartiallyConnected
+    Connected
 
 proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T =
   ## Generate a new RequestId using the provided RNG.
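Callers that consumed the old NodeHealth enum should note the ordering flip as well as the rename: the healthiest state now sorts last, so ordinal comparisons must be updated. A short migration sketch under that assumption (the helper fromLegacyNodeHealth is hypothetical, not part of this patch; the value mapping follows the health-monitor logic later in the diff):

    import waku/api/types

    proc fromLegacyNodeHealth(name: string): ConnectionStatus =
      # Hypothetical mapping: Healthy -> Connected, MinimallyHealthy ->
      # PartiallyConnected, anything else (Unhealthy) -> Disconnected.
      case name
      of "Healthy": ConnectionStatus.Connected
      of "MinimallyHealthy": ConnectionStatus.PartiallyConnected
      else: ConnectionStatus.Disconnected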
diff --git a/waku/common/waku_protocol.nim b/waku/common/waku_protocol.nim new file mode 100644 index 000000000..5063f4c98 --- /dev/null +++ b/waku/common/waku_protocol.nim @@ -0,0 +1,24 @@ +{.push raises: [].} + +type WakuProtocol* {.pure.} = enum + RelayProtocol = "Relay" + RlnRelayProtocol = "Rln Relay" + StoreProtocol = "Store" + LegacyStoreProtocol = "Legacy Store" + FilterProtocol = "Filter" + LightpushProtocol = "Lightpush" + LegacyLightpushProtocol = "Legacy Lightpush" + PeerExchangeProtocol = "Peer Exchange" + RendezvousProtocol = "Rendezvous" + MixProtocol = "Mix" + StoreClientProtocol = "Store Client" + LegacyStoreClientProtocol = "Legacy Store Client" + FilterClientProtocol = "Filter Client" + LightpushClientProtocol = "Lightpush Client" + LegacyLightpushClientProtocol = "Legacy Lightpush Client" + +const + RelayProtocols* = {RelayProtocol} + StoreClientProtocols* = {StoreClientProtocol, LegacyStoreClientProtocol} + LightpushClientProtocols* = {LightpushClientProtocol, LegacyLightpushClientProtocol} + FilterClientProtocols* = {FilterClientProtocol} diff --git a/waku/events/events.nim b/waku/events/events.nim index 2a0af8828..46dd4fdd3 100644 --- a/waku/events/events.nim +++ b/waku/events/events.nim @@ -1,3 +1,3 @@ -import ./[message_events, delivery_events] +import ./[message_events, delivery_events, health_events, peer_events] -export message_events, delivery_events +export message_events, delivery_events, health_events, peer_events diff --git a/waku/events/health_events.nim b/waku/events/health_events.nim new file mode 100644 index 000000000..1e6decedb --- /dev/null +++ b/waku/events/health_events.nim @@ -0,0 +1,27 @@ +import waku/common/broker/event_broker + +import waku/api/types +import waku/node/health_monitor/[protocol_health, topic_health] +import waku/waku_core/topics + +export protocol_health, topic_health + +# Notify health changes to node connectivity +EventBroker: + type EventConnectionStatusChange* = object + connectionStatus*: ConnectionStatus + +# Notify health changes to a subscribed topic +# TODO: emit content topic health change events when subscribe/unsubscribe +# from/to content topic is provided in the new API (so we know which +# content topics are of interest to the application) +EventBroker: + type EventContentTopicHealthChange* = object + contentTopic*: ContentTopic + health*: TopicHealth + +# Notify health changes to a shard (pubsub topic) +EventBroker: + type EventShardTopicHealthChange* = object + topic*: PubsubTopic + health*: TopicHealth diff --git a/waku/events/peer_events.nim b/waku/events/peer_events.nim new file mode 100644 index 000000000..49dfa9f9a --- /dev/null +++ b/waku/events/peer_events.nim @@ -0,0 +1,13 @@ +import waku/common/broker/event_broker +import libp2p/switch + +type WakuPeerEventKind* {.pure.} = enum + EventConnected + EventDisconnected + EventIdentified + EventMetadataUpdated + +EventBroker: + type EventWakuPeer* = object + peerId*: PeerId + kind*: WakuPeerEventKind diff --git a/waku/factory/app_callbacks.nim b/waku/factory/app_callbacks.nim index d28b9f2d1..f1d3369be 100644 --- a/waku/factory/app_callbacks.nim +++ b/waku/factory/app_callbacks.nim @@ -1,6 +1,7 @@ -import ../waku_relay, ../node/peer_manager +import ../waku_relay, ../node/peer_manager, ../node/health_monitor/connection_status type AppCallbacks* = ref object relayHandler*: WakuRelayHandler topicHealthChangeHandler*: TopicHealthChangeHandler connectionChangeHandler*: ConnectionChangeHandler + connectionStatusChangeHandler*: ConnectionStatusChangeHandler diff 
--git a/waku/factory/builder.nim b/waku/factory/builder.nim index f379f92bb..e0b643fc0 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -15,7 +15,8 @@ import ../waku_node, ../node/peer_manager, ../common/rate_limit/setting, - ../common/utils/parse_size_units + ../common/utils/parse_size_units, + ../common/broker/broker_context type WakuNodeBuilder* = object # General diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 3748847f1..dd253129c 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -17,35 +17,36 @@ import eth/p2p/discoveryv5/enr, presto, metrics, - metrics/chronos_httpserver -import - ../common/logging, - ../waku_core, - ../waku_node, - ../node/peer_manager, - ../node/health_monitor, - ../node/waku_metrics, - ../node/delivery_service/delivery_service, - ../rest_api/message_cache, - ../rest_api/endpoint/server, - ../rest_api/endpoint/builder as rest_server_builder, - ../waku_archive, - ../waku_relay/protocol, - ../discovery/waku_dnsdisc, - ../discovery/waku_discv5, - ../discovery/autonat_service, - ../waku_enr/sharding, - ../waku_rln_relay, - ../waku_store, - ../waku_filter_v2, - ../factory/node_factory, - ../factory/internal_config, - ../factory/app_callbacks, - ../waku_enr/multiaddr, - ./waku_conf, - ../common/broker/broker_context, - ../requests/health_request, - ../api/types + metrics/chronos_httpserver, + waku/[ + waku_core, + waku_node, + waku_archive, + waku_rln_relay, + waku_store, + waku_filter_v2, + waku_relay/protocol, + waku_enr/sharding, + waku_enr/multiaddr, + api/types, + common/logging, + common/broker/broker_context, + node/peer_manager, + node/health_monitor, + node/waku_metrics, + node/delivery_service/delivery_service, + rest_api/message_cache, + rest_api/endpoint/server, + rest_api/endpoint/builder as rest_server_builder, + discovery/waku_dnsdisc, + discovery/waku_discv5, + discovery/autonat_service, + requests/health_requests, + factory/node_factory, + factory/internal_config, + factory/app_callbacks, + ], + ./waku_conf logScope: topics = "wakunode waku" @@ -118,7 +119,10 @@ proc newCircuitRelay(isRelayClient: bool): Relay = return Relay.new() proc setupAppCallbacks( - node: WakuNode, conf: WakuConf, appCallbacks: AppCallbacks + node: WakuNode, + conf: WakuConf, + appCallbacks: AppCallbacks, + healthMonitor: NodeHealthMonitor, ): Result[void, string] = if appCallbacks.isNil(): info "No external callbacks to be set" @@ -159,6 +163,13 @@ proc setupAppCallbacks( err("Cannot configure connectionChangeHandler callback with empty peer manager") node.peerManager.onConnectionChange = appCallbacks.connectionChangeHandler + if not appCallbacks.connectionStatusChangeHandler.isNil(): + if healthMonitor.isNil(): + return + err("Cannot configure connectionStatusChangeHandler with empty health monitor") + + healthMonitor.onConnectionStatusChange = appCallbacks.connectionStatusChangeHandler + return ok() proc new*( @@ -192,7 +203,7 @@ proc new*( else: nil - node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr: + node.setupAppCallbacks(wakuConf, appCallbacks, healthMonitor).isOkOr: error "Failed setting up app callbacks", error = error return err("Failed setting up app callbacks: " & $error) @@ -409,60 +420,48 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: waku[].healthMonitor.startHealthMonitor().isOkOr: return err("failed to start health monitor: " & $error) - ## Setup RequestNodeHealth provider + ## Setup RequestConnectionStatus provider - RequestNodeHealth.setProvider( + 
RequestConnectionStatus.setProvider(
     globalBrokerContext(),
-    proc(): Result[RequestNodeHealth, string] =
-      let healthReportFut = waku[].healthMonitor.getNodeHealthReport()
-      if not healthReportFut.completed():
-        return err("Health report not available")
+    proc(): Result[RequestConnectionStatus, string] =
       try:
-        let healthReport = healthReportFut.read()
-
-        # Check if Relay or Lightpush Client is ready (MinimallyHealthy condition)
-        var relayReady = false
-        var lightpushClientReady = false
-        var storeClientReady = false
-        var filterClientReady = false
-
-        for protocolHealth in healthReport.protocolsHealth:
-          if protocolHealth.protocol == "Relay" and
-              protocolHealth.health == HealthStatus.READY:
-            relayReady = true
-          elif protocolHealth.protocol == "Lightpush Client" and
-              protocolHealth.health == HealthStatus.READY:
-            lightpushClientReady = true
-          elif protocolHealth.protocol == "Store Client" and
-              protocolHealth.health == HealthStatus.READY:
-            storeClientReady = true
-          elif protocolHealth.protocol == "Filter Client" and
-              protocolHealth.health == HealthStatus.READY:
-            filterClientReady = true
-
-        # Determine node health based on protocol states
-        let isMinimallyHealthy = relayReady or lightpushClientReady
-        let nodeHealth =
-          if isMinimallyHealthy and storeClientReady and filterClientReady:
-            NodeHealth.Healthy
-          elif isMinimallyHealthy:
-            NodeHealth.MinimallyHealthy
-          else:
-            NodeHealth.Unhealthy
-
-        debug "Providing health report",
-          nodeHealth = $nodeHealth,
-          relayReady = relayReady,
-          lightpushClientReady = lightpushClientReady,
-          storeClientReady = storeClientReady,
-          filterClientReady = filterClientReady,
-          details = $(healthReport)
-
-        return ok(RequestNodeHealth(healthStatus: nodeHealth))
-      except CatchableError as exc:
-        err("Failed to read health report: " & exc.msg),
+        let healthReport = waku[].healthMonitor.getSyncNodeHealthReport()
+        return
+          ok(RequestConnectionStatus(connectionStatus: healthReport.connectionStatus))
+      except CatchableError:
+        err("Failed to read health report: " & getCurrentExceptionMsg()),
   ).isOkOr:
-    error "Failed to set RequestNodeHealth provider", error = error
+    error "Failed to set RequestConnectionStatus provider", error = error
+
+  ## Setup RequestProtocolHealth provider
+
+  RequestProtocolHealth.setProvider(
+    globalBrokerContext(),
+    proc(
+        protocol: WakuProtocol
+    ): Future[Result[RequestProtocolHealth, string]] {.async.} =
+      try:
+        let protocolHealthStatus =
+          await waku[].healthMonitor.getProtocolHealthInfo(protocol)
+        return ok(RequestProtocolHealth(healthStatus: protocolHealthStatus))
+      except CatchableError:
+        return err("Failed to get protocol health: " & getCurrentExceptionMsg()),
+  ).isOkOr:
+    error "Failed to set RequestProtocolHealth provider", error = error
+
+  ## Setup RequestHealthReport provider
+
+  RequestHealthReport.setProvider(
+    globalBrokerContext(),
+    proc(): Future[Result[RequestHealthReport, string]] {.async.} =
+      try:
+        let report = await waku[].healthMonitor.getNodeHealthReport()
+        return ok(RequestHealthReport(healthReport: report))
+      except CatchableError:
+        return err("Failed to get health report: " & getCurrentExceptionMsg()),
+  ).isOkOr:
+    error "Failed to set RequestHealthReport provider", error = error
 
   if conf.restServerConf.isSome():
     rest_server_builder.startRestServerProtocolSupport(
@@ -521,8 +520,8 @@ proc stop*(waku: Waku): Future[Result[void, string]] {.async: (raises: []).} =
   if not waku.healthMonitor.isNil():
     await waku.healthMonitor.stopHealthMonitor()
 
-  ## Clear RequestNodeHealth
provider - RequestNodeHealth.clearProvider(waku.brokerCtx) + ## Clear RequestConnectionStatus provider + RequestConnectionStatus.clearProvider(waku.brokerCtx) if not waku.restServer.isNil(): await waku.restServer.stop() diff --git a/waku/node/delivery_service/send_service/relay_processor.nim b/waku/node/delivery_service/send_service/relay_processor.nim index 974c22f6c..833d15845 100644 --- a/waku/node/delivery_service/send_service/relay_processor.nim +++ b/waku/node/delivery_service/send_service/relay_processor.nim @@ -1,7 +1,7 @@ import std/options import chronos, chronicles import waku/[waku_core], waku/waku_lightpush/[common, rpc] -import waku/requests/health_request +import waku/requests/health_requests import waku/common/broker/broker_context import waku/api/types import ./[delivery_task, send_processor] @@ -32,7 +32,7 @@ proc new*( ) proc isTopicHealthy(self: RelaySendProcessor, topic: PubsubTopic): bool {.gcsafe.} = - let healthReport = RequestRelayTopicsHealth.request(self.brokerCtx, @[topic]).valueOr: + let healthReport = RequestShardTopicsHealth.request(self.brokerCtx, @[topic]).valueOr: error "isTopicHealthy: failed to get health report", topic = topic, error = error return false diff --git a/waku/node/health_monitor.nim b/waku/node/health_monitor.nim index 854a8bbc0..6e42352d4 100644 --- a/waku/node/health_monitor.nim +++ b/waku/node/health_monitor.nim @@ -1,4 +1,9 @@ import - health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status] + health_monitor/[ + node_health_monitor, protocol_health, online_monitor, health_status, + connection_status, health_report, + ] -export node_health_monitor, protocol_health, online_monitor, health_status +export + node_health_monitor, protocol_health, online_monitor, health_status, + connection_status, health_report diff --git a/waku/node/health_monitor/connection_status.nim b/waku/node/health_monitor/connection_status.nim new file mode 100644 index 000000000..77696130a --- /dev/null +++ b/waku/node/health_monitor/connection_status.nim @@ -0,0 +1,15 @@ +import chronos, results, std/strutils, ../../api/types + +export ConnectionStatus + +proc init*( + t: typedesc[ConnectionStatus], strRep: string +): Result[ConnectionStatus, string] = + try: + let status = parseEnum[ConnectionStatus](strRep) + return ok(status) + except ValueError: + return err("Invalid ConnectionStatus string representation: " & strRep) + +type ConnectionStatusChangeHandler* = + proc(status: ConnectionStatus): Future[void] {.gcsafe, raises: [Defect].} diff --git a/waku/node/health_monitor/health_report.nim b/waku/node/health_monitor/health_report.nim new file mode 100644 index 000000000..d6c23cd28 --- /dev/null +++ b/waku/node/health_monitor/health_report.nim @@ -0,0 +1,10 @@ +{.push raises: [].} + +import ./health_status, ./connection_status, ./protocol_health + +type HealthReport* = object + ## Rest API type returned for /health endpoint + ## + nodeHealth*: HealthStatus # legacy "READY" health indicator + connectionStatus*: ConnectionStatus # new "Connected" health indicator + protocolsHealth*: seq[ProtocolHealth] diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim index 4b13dfd3d..ba0518e61 100644 --- a/waku/node/health_monitor/node_health_monitor.nim +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -1,55 +1,89 @@ {.push raises: [].} import - std/[options, sets, random, sequtils], + std/[options, sets, random, sequtils, json, strutils, tables], chronos, chronicles, - 
libp2p/protocols/rendezvous - -import - ../waku_node, - ../kernel_api, - ../../waku_rln_relay, - ../../waku_relay, - ../peer_manager, - ./online_monitor, - ./health_status, - ./protocol_health + libp2p/protocols/rendezvous, + libp2p/protocols/pubsub, + libp2p/protocols/pubsub/rpc/messages, + waku/[ + waku_relay, + waku_rln_relay, + api/types, + events/health_events, + events/peer_events, + node/waku_node, + node/peer_manager, + node/kernel_api, + node/health_monitor/online_monitor, + node/health_monitor/health_status, + node/health_monitor/health_report, + node/health_monitor/connection_status, + node/health_monitor/protocol_health, + ] ## This module is aimed to check the state of the "self" Waku Node # randomize initializes sdt/random's random number generator # if not called, the outcome of randomization procedures will be the same in every run -randomize() +random.randomize() -type - HealthReport* = object - nodeHealth*: HealthStatus - protocolsHealth*: seq[ProtocolHealth] +const HealthyThreshold* = 2 + ## minimum peers required for all services for a Connected status, excluding Relay - NodeHealthMonitor* = ref object - nodeHealth: HealthStatus - node: WakuNode - onlineMonitor*: OnlineMonitor - keepAliveFut: Future[void] +type NodeHealthMonitor* = ref object + nodeHealth: HealthStatus + node: WakuNode + onlineMonitor*: OnlineMonitor + keepAliveFut: Future[void] + healthLoopFut: Future[void] + healthUpdateEvent: AsyncEvent + connectionStatus: ConnectionStatus + onConnectionStatusChange*: ConnectionStatusChangeHandler + cachedProtocols: seq[ProtocolHealth] + ## state of each protocol to report. + ## calculated on last event that can change any protocol's state so fetching a report is fast. + strength: Table[WakuProtocol, int] + ## latest known connectivity strength (e.g. connected peer count) metric for each protocol. + ## if it doesn't make sense for the protocol in question, this is set to zero. 
+ relayObserver: PubSubObserver + peerEventListener: EventWakuPeerListener + +func getHealth*(report: HealthReport, kind: WakuProtocol): ProtocolHealth = + for h in report.protocolsHealth: + if h.protocol == $kind: + return h + # Shouldn't happen, but if it does, then assume protocol is not mounted + return ProtocolHealth.init(kind) + +proc countCapablePeers(hm: NodeHealthMonitor, codec: string): int = + if isNil(hm.node.peerManager): + return 0 + + return hm.node.peerManager.getCapablePeersCount(codec) proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Relay") + var p = ProtocolHealth.init(WakuProtocol.RelayProtocol) - if hm.node.wakuRelay == nil: + if isNil(hm.node.wakuRelay): + hm.strength[WakuProtocol.RelayProtocol] = 0 return p.notMounted() let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr: + hm.strength[WakuProtocol.RelayProtocol] = 0 return p.notMounted() - if relayPeers.len() == 0: + let count = relayPeers.len + hm.strength[WakuProtocol.RelayProtocol] = count + if count == 0: return p.notReady("No connected peers") return p.ready() proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} = - var p = ProtocolHealth.init("Rln Relay") - if hm.node.wakuRlnRelay.isNil(): + var p = ProtocolHealth.init(WakuProtocol.RlnRelayProtocol) + if isNil(hm.node.wakuRlnRelay): return p.notMounted() const FutIsReadyTimout = 5.seconds @@ -72,121 +106,144 @@ proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} proc getLightpushHealth( hm: NodeHealthMonitor, relayHealth: HealthStatus ): ProtocolHealth = - var p = ProtocolHealth.init("Lightpush") + var p = ProtocolHealth.init(WakuProtocol.LightpushProtocol) - if hm.node.wakuLightPush == nil: + if isNil(hm.node.wakuLightPush): + hm.strength[WakuProtocol.LightpushProtocol] = 0 return p.notMounted() + let peerCount = countCapablePeers(hm, WakuLightPushCodec) + hm.strength[WakuProtocol.LightpushProtocol] = peerCount + if relayHealth == HealthStatus.READY: return p.ready() return p.notReady("Node has no relay peers to fullfill push requests") -proc getLightpushClientHealth( - hm: NodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Lightpush Client") - - if hm.node.wakuLightpushClient == nil: - return p.notMounted() - - let selfServiceAvailable = - hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY - let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome() - - if selfServiceAvailable or servicePeerAvailable: - return p.ready() - - return p.notReady("No Lightpush service peer available yet") - proc getLegacyLightpushHealth( hm: NodeHealthMonitor, relayHealth: HealthStatus ): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Lightpush") + var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushProtocol) - if hm.node.wakuLegacyLightPush == nil: + if isNil(hm.node.wakuLegacyLightPush): + hm.strength[WakuProtocol.LegacyLightpushProtocol] = 0 return p.notMounted() + let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec) + hm.strength[WakuProtocol.LegacyLightpushProtocol] = peerCount + if relayHealth == HealthStatus.READY: return p.ready() return p.notReady("Node has no relay peers to fullfill push requests") -proc getLegacyLightpushClientHealth( - hm: NodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Lightpush Client") - - if hm.node.wakuLegacyLightpushClient == nil: - return 
p.notMounted() - - if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or - hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome(): - return p.ready() - - return p.notReady("No Lightpush service peer available yet") - proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth = - var p = ProtocolHealth.init("Filter") + var p = ProtocolHealth.init(WakuProtocol.FilterProtocol) - if hm.node.wakuFilter == nil: + if isNil(hm.node.wakuFilter): + hm.strength[WakuProtocol.FilterProtocol] = 0 return p.notMounted() + let peerCount = countCapablePeers(hm, WakuFilterSubscribeCodec) + hm.strength[WakuProtocol.FilterProtocol] = peerCount + if relayHealth == HealthStatus.READY: return p.ready() return p.notReady("Relay is not ready, filter will not be able to sort out messages") -proc getFilterClientHealth( - hm: NodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Filter Client") - - if hm.node.wakuFilterClient == nil: - return p.notMounted() - - if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome(): - return p.ready() - - return p.notReady("No Filter service peer available yet") - proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Store") + var p = ProtocolHealth.init(WakuProtocol.StoreProtocol) - if hm.node.wakuStore == nil: + if isNil(hm.node.wakuStore): + hm.strength[WakuProtocol.StoreProtocol] = 0 return p.notMounted() + let peerCount = countCapablePeers(hm, WakuStoreCodec) + hm.strength[WakuProtocol.StoreProtocol] = peerCount return p.ready() -proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Store Client") +proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init(WakuProtocol.LegacyStoreProtocol) - if hm.node.wakuStoreClient == nil: + if isNil(hm.node.wakuLegacyStore): + hm.strength[WakuProtocol.LegacyStoreProtocol] = 0 return p.notMounted() - if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil: + let peerCount = hm.countCapablePeers(WakuLegacyStoreCodec) + hm.strength[WakuProtocol.LegacyStoreProtocol] = peerCount + return p.ready() + +proc getLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init(WakuProtocol.LightpushClientProtocol) + + if isNil(hm.node.wakuLightpushClient): + hm.strength[WakuProtocol.LightpushClientProtocol] = 0 + return p.notMounted() + + let peerCount = countCapablePeers(hm, WakuLightPushCodec) + hm.strength[WakuProtocol.LightpushClientProtocol] = peerCount + + if peerCount > 0: + return p.ready() + return p.notReady("No Lightpush service peer available yet") + +proc getLegacyLightpushClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init(WakuProtocol.LegacyLightpushClientProtocol) + + if isNil(hm.node.wakuLegacyLightpushClient): + hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = 0 + return p.notMounted() + + let peerCount = countCapablePeers(hm, WakuLegacyLightPushCodec) + hm.strength[WakuProtocol.LegacyLightpushClientProtocol] = peerCount + + if peerCount > 0: + return p.ready() + return p.notReady("No Lightpush service peer available yet") + +proc getFilterClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init(WakuProtocol.FilterClientProtocol) + + if isNil(hm.node.wakuFilterClient): + hm.strength[WakuProtocol.FilterClientProtocol] = 0 + return p.notMounted() + + let peerCount = 
countCapablePeers(hm, WakuFilterSubscribeCodec) + hm.strength[WakuProtocol.FilterClientProtocol] = peerCount + + if peerCount > 0: + return p.ready() + return p.notReady("No Filter service peer available yet") + +proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init(WakuProtocol.StoreClientProtocol) + + if isNil(hm.node.wakuStoreClient): + hm.strength[WakuProtocol.StoreClientProtocol] = 0 + return p.notMounted() + + let peerCount = countCapablePeers(hm, WakuStoreCodec) + hm.strength[WakuProtocol.StoreClientProtocol] = peerCount + + if peerCount > 0 or not isNil(hm.node.wakuStore): return p.ready() return p.notReady( "No Store service peer available yet, neither Store service set up for the node" ) -proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Store") - - if hm.node.wakuLegacyStore == nil: - return p.notMounted() - - return p.ready() - proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Store Client") + var p = ProtocolHealth.init(WakuProtocol.LegacyStoreClientProtocol) - if hm.node.wakuLegacyStoreClient == nil: + if isNil(hm.node.wakuLegacyStoreClient): + hm.strength[WakuProtocol.LegacyStoreClientProtocol] = 0 return p.notMounted() - if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or - hm.node.wakuLegacyStore != nil: + let peerCount = countCapablePeers(hm, WakuLegacyStoreCodec) + hm.strength[WakuProtocol.LegacyStoreClientProtocol] = peerCount + + if peerCount > 0 or not isNil(hm.node.wakuLegacyStore): return p.ready() return p.notReady( @@ -194,38 +251,305 @@ proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = ) proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Peer Exchange") + var p = ProtocolHealth.init(WakuProtocol.PeerExchangeProtocol) - if hm.node.wakuPeerExchange == nil: + if isNil(hm.node.wakuPeerExchange): + hm.strength[WakuProtocol.PeerExchangeProtocol] = 0 return p.notMounted() + let peerCount = countCapablePeers(hm, WakuPeerExchangeCodec) + hm.strength[WakuProtocol.PeerExchangeProtocol] = peerCount + return p.ready() proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Rendezvous") + var p = ProtocolHealth.init(WakuProtocol.RendezvousProtocol) - if hm.node.wakuRendezvous == nil: + if isNil(hm.node.wakuRendezvous): + hm.strength[WakuProtocol.RendezvousProtocol] = 0 return p.notMounted() - if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0: + let peerCount = countCapablePeers(hm, RendezVousCodec) + hm.strength[WakuProtocol.RendezvousProtocol] = peerCount + if peerCount == 0: return p.notReady("No Rendezvous peers are available yet") return p.ready() proc getMixHealth(hm: NodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Mix") + var p = ProtocolHealth.init(WakuProtocol.MixProtocol) - if hm.node.wakuMix.isNil(): + if isNil(hm.node.wakuMix): return p.notMounted() return p.ready() +proc getSyncProtocolHealthInfo*( + hm: NodeHealthMonitor, protocol: WakuProtocol +): ProtocolHealth = + ## Get ProtocolHealth for a given protocol that can provide it synchronously + ## + case protocol + of WakuProtocol.RelayProtocol: + return hm.getRelayHealth() + of WakuProtocol.StoreProtocol: + return hm.getStoreHealth() + of WakuProtocol.LegacyStoreProtocol: + return hm.getLegacyStoreHealth() + of WakuProtocol.FilterProtocol: + return 
hm.getFilterHealth(hm.getRelayHealth().health) + of WakuProtocol.LightpushProtocol: + return hm.getLightpushHealth(hm.getRelayHealth().health) + of WakuProtocol.LegacyLightpushProtocol: + return hm.getLegacyLightpushHealth(hm.getRelayHealth().health) + of WakuProtocol.PeerExchangeProtocol: + return hm.getPeerExchangeHealth() + of WakuProtocol.RendezvousProtocol: + return hm.getRendezvousHealth() + of WakuProtocol.MixProtocol: + return hm.getMixHealth() + of WakuProtocol.StoreClientProtocol: + return hm.getStoreClientHealth() + of WakuProtocol.LegacyStoreClientProtocol: + return hm.getLegacyStoreClientHealth() + of WakuProtocol.FilterClientProtocol: + return hm.getFilterClientHealth() + of WakuProtocol.LightpushClientProtocol: + return hm.getLightpushClientHealth() + of WakuProtocol.LegacyLightpushClientProtocol: + return hm.getLegacyLightpushClientHealth() + of WakuProtocol.RlnRelayProtocol: + # Could waitFor here but we don't want to block the main thread. + # Could also return a cached value from a previous check. + var p = ProtocolHealth.init(protocol) + return p.notReady("RLN Relay health check is async") + else: + var p = ProtocolHealth.init(protocol) + return p.notMounted() + +proc getProtocolHealthInfo*( + hm: NodeHealthMonitor, protocol: WakuProtocol +): Future[ProtocolHealth] {.async.} = + ## Get ProtocolHealth for a given protocol + ## + case protocol + of WakuProtocol.RlnRelayProtocol: + return await hm.getRlnRelayHealth() + else: + return hm.getSyncProtocolHealthInfo(protocol) + +proc getSyncAllProtocolHealthInfo(hm: NodeHealthMonitor): seq[ProtocolHealth] = + ## Get ProtocolHealth for the subset of protocols that can provide it synchronously + ## + var protocols: seq[ProtocolHealth] = @[] + let relayHealth = hm.getRelayHealth() + protocols.add(relayHealth) + + protocols.add(hm.getLightpushHealth(relayHealth.health)) + protocols.add(hm.getLegacyLightpushHealth(relayHealth.health)) + protocols.add(hm.getFilterHealth(relayHealth.health)) + protocols.add(hm.getStoreHealth()) + protocols.add(hm.getLegacyStoreHealth()) + protocols.add(hm.getPeerExchangeHealth()) + protocols.add(hm.getRendezvousHealth()) + protocols.add(hm.getMixHealth()) + + protocols.add(hm.getLightpushClientHealth()) + protocols.add(hm.getLegacyLightpushClientHealth()) + protocols.add(hm.getStoreClientHealth()) + protocols.add(hm.getLegacyStoreClientHealth()) + protocols.add(hm.getFilterClientHealth()) + return protocols + +proc getAllProtocolHealthInfo( + hm: NodeHealthMonitor +): Future[seq[ProtocolHealth]] {.async.} = + ## Get ProtocolHealth for all protocols + ## + var protocols = hm.getSyncAllProtocolHealthInfo() + + let rlnHealth = await hm.getRlnRelayHealth() + protocols.add(rlnHealth) + + return protocols + +proc calculateConnectionState*( + protocols: seq[ProtocolHealth], + strength: Table[WakuProtocol, int], ## latest connectivity strength (e.g. 
peer count) for a protocol + dLowOpt: Option[int], ## minimum relay peers for Connected status if in Core (Relay) mode +): ConnectionStatus = + var + relayCount = 0 + lightpushCount = 0 + filterCount = 0 + storeClientCount = 0 + + for p in protocols: + let kind = + try: + parseEnum[WakuProtocol](p.protocol) + except ValueError: + continue + + if p.health != HealthStatus.READY: + continue + + let strength = strength.getOrDefault(kind, 0) + + if kind in RelayProtocols: + relayCount = max(relayCount, strength) + elif kind in StoreClientProtocols: + storeClientCount = max(storeClientCount, strength) + elif kind in LightpushClientProtocols: + lightpushCount = max(lightpushCount, strength) + elif kind in FilterClientProtocols: + filterCount = max(filterCount, strength) + + debug "calculateConnectionState", + protocol = kind, + strength = strength, + relayCount = relayCount, + storeClientCount = storeClientCount, + lightpushCount = lightpushCount, + filterCount = filterCount + + # Relay connectivity should be a sufficient check in Core mode. + # "Store peers" are relay peers because incoming messages in + # the relay are input to the store server. + # But if Store server (or client, even) is not mounted as well, this logic assumes + # the user knows what they're doing. + + if dLowOpt.isSome(): + if relayCount >= dLowOpt.get(): + return ConnectionStatus.Connected + + if relayCount > 0: + return ConnectionStatus.PartiallyConnected + + # No relay connectivity. Relay might not be mounted, or may just have zero peers. + # Fall back to Edge check in any case to be sure. + + let canSend = lightpushCount > 0 + let canReceive = filterCount > 0 + let canStore = storeClientCount > 0 + + let meetsMinimum = canSend and canReceive and canStore + + if not meetsMinimum: + return ConnectionStatus.Disconnected + + let isEdgeRobust = + (lightpushCount >= HealthyThreshold) and (filterCount >= HealthyThreshold) and + (storeClientCount >= HealthyThreshold) + + if isEdgeRobust: + return ConnectionStatus.Connected + + return ConnectionStatus.PartiallyConnected + +proc calculateConnectionState*(hm: NodeHealthMonitor): ConnectionStatus = + let dLow = + if isNil(hm.node.wakuRelay): + none(int) + else: + some(hm.node.wakuRelay.parameters.dLow) + return calculateConnectionState(hm.cachedProtocols, hm.strength, dLow) + +proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} = + ## Get a HealthReport that includes all protocols + ## + var report: HealthReport + + if hm.nodeHealth == HealthStatus.INITIALIZING or + hm.nodeHealth == HealthStatus.SHUTTING_DOWN: + report.nodeHealth = hm.nodeHealth + report.connectionStatus = ConnectionStatus.Disconnected + return report + + if hm.cachedProtocols.len == 0: + hm.cachedProtocols = await hm.getAllProtocolHealthInfo() + hm.connectionStatus = hm.calculateConnectionState() + + report.nodeHealth = HealthStatus.READY + report.connectionStatus = hm.connectionStatus + report.protocolsHealth = hm.cachedProtocols + return report + +proc getSyncNodeHealthReport*(hm: NodeHealthMonitor): HealthReport = + ## Get a HealthReport that includes the subset of protocols that inform health synchronously + ## + var report: HealthReport + + if hm.nodeHealth == HealthStatus.INITIALIZING or + hm.nodeHealth == HealthStatus.SHUTTING_DOWN: + report.nodeHealth = hm.nodeHealth + report.connectionStatus = ConnectionStatus.Disconnected + return report + + if hm.cachedProtocols.len == 0: + hm.cachedProtocols = hm.getSyncAllProtocolHealthInfo() + hm.connectionStatus = 
hm.calculateConnectionState() + + report.nodeHealth = HealthStatus.READY + report.connectionStatus = hm.connectionStatus + report.protocolsHealth = hm.cachedProtocols + return report + +proc onRelayMsg( + hm: NodeHealthMonitor, peer: PubSubPeer, msg: var RPCMsg +) {.gcsafe, raises: [].} = + ## Inspect Relay events for health-update relevance in Core (Relay) mode. + ## + ## For Core (Relay) mode, the connectivity health state is mostly determined + ## by the relay protocol state (it is the dominant factor), and we know + ## that a peer Relay can only affect this Relay's health if there is a + ## subscription change or a mesh (GRAFT/PRUNE) change. + ## + + if msg.subscriptions.len == 0: + if msg.control.isNone(): + return + let ctrl = msg.control.get() + if ctrl.graft.len == 0 and ctrl.prune.len == 0: + return + + hm.healthUpdateEvent.fire() + +proc healthLoop(hm: NodeHealthMonitor) {.async.} = + ## Re-evaluate the global health state of the node when notified of a potential change, + ## and call back the application if an actual change from the last notified state happened. + info "Health monitor loop start" + while true: + try: + await hm.healthUpdateEvent.wait() + hm.healthUpdateEvent.clear() + + hm.cachedProtocols = await hm.getAllProtocolHealthInfo() + let newConnectionStatus = hm.calculateConnectionState() + + if newConnectionStatus != hm.connectionStatus: + hm.connectionStatus = newConnectionStatus + + EventConnectionStatusChange.emit(hm.node.brokerCtx, newConnectionStatus) + + if not isNil(hm.onConnectionStatusChange): + await hm.onConnectionStatusChange(newConnectionStatus) + except CancelledError: + break + except Exception as e: + error "HealthMonitor: error in update loop", error = e.msg + + # safety cooldown to protect from edge cases + await sleepAsync(100.milliseconds) + + info "Health monitor loop end" + proc selectRandomPeersForKeepalive( node: WakuNode, outPeers: seq[PeerId], numRandomPeers: int ): Future[seq[PeerId]] {.async.} = ## Select peers for random keepalive, prioritizing mesh peers - if node.wakuRelay.isNil(): + if isNil(node.wakuRelay): return selectRandomPeers(outPeers, numRandomPeers) let meshPeers = node.wakuRelay.getPeersInMesh().valueOr: @@ -359,45 +683,55 @@ proc startKeepalive*( hm.keepAliveFut = hm.node.keepAliveLoop(randomPeersKeepalive, allPeersKeepalive) return ok() -proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} = - var report: HealthReport - report.nodeHealth = hm.nodeHealth - - let relayHealth = hm.getRelayHealth() - report.protocolsHealth.add(relayHealth) - report.protocolsHealth.add(await hm.getRlnRelayHealth()) - report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getStoreHealth()) - report.protocolsHealth.add(hm.getLegacyStoreHealth()) - report.protocolsHealth.add(hm.getPeerExchangeHealth()) - report.protocolsHealth.add(hm.getRendezvousHealth()) - report.protocolsHealth.add(hm.getMixHealth()) - - report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getStoreClientHealth()) - report.protocolsHealth.add(hm.getLegacyStoreClientHealth()) - report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health)) - return report - proc setOverallHealth*(hm: NodeHealthMonitor, health: 
HealthStatus) = hm.nodeHealth = health proc startHealthMonitor*(hm: NodeHealthMonitor): Result[void, string] = hm.onlineMonitor.startOnlineMonitor() + + if isNil(hm.node.peerManager): + return err("startHealthMonitor: no node peerManager to monitor") + + if not isNil(hm.node.wakuRelay): + hm.relayObserver = PubSubObserver( + onRecv: proc(peer: PubSubPeer, msgs: var RPCMsg) {.gcsafe, raises: [].} = + hm.onRelayMsg(peer, msgs) + ) + hm.node.wakuRelay.addObserver(hm.relayObserver) + + hm.peerEventListener = EventWakuPeer.listen( + hm.node.brokerCtx, + proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} = + ## Recompute health on any peer changing anything (join, leave, identify, metadata update) + hm.healthUpdateEvent.fire(), + ).valueOr: + return err("Failed to subscribe to peer events: " & error) + + hm.healthUpdateEvent = newAsyncEvent() + hm.healthUpdateEvent.fire() + + hm.healthLoopFut = hm.healthLoop() + hm.startKeepalive().isOkOr: return err("startHealthMonitor: failed starting keep alive: " & error) return ok() proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = - if not hm.onlineMonitor.isNil(): + if not isNil(hm.onlineMonitor): await hm.onlineMonitor.stopOnlineMonitor() - if not hm.keepAliveFut.isNil(): + if not isNil(hm.keepAliveFut): await hm.keepAliveFut.cancelAndWait() + if not isNil(hm.healthLoopFut): + await hm.healthLoopFut.cancelAndWait() + + if hm.peerEventListener.id != 0: + EventWakuPeer.dropListener(hm.node.brokerCtx, hm.peerEventListener) + + if not isNil(hm.node.wakuRelay) and not isNil(hm.relayObserver): + hm.node.wakuRelay.removeObserver(hm.relayObserver) + proc new*( T: type NodeHealthMonitor, node: WakuNode, @@ -406,4 +740,10 @@ proc new*( let om = OnlineMonitor.init(dnsNameServers) om.setPeerStoreToOnlineMonitor(node.switch.peerStore) om.addOnlineStateObserver(node.peerManager.getOnlineStateObserver()) - T(nodeHealth: INITIALIZING, node: node, onlineMonitor: om) + T( + nodeHealth: INITIALIZING, + node: node, + onlineMonitor: om, + connectionStatus: ConnectionStatus.Disconnected, + strength: initTable[WakuProtocol, int](), + ) diff --git a/waku/node/health_monitor/protocol_health.nim b/waku/node/health_monitor/protocol_health.nim index 7bacea94b..4479888c8 100644 --- a/waku/node/health_monitor/protocol_health.nim +++ b/waku/node/health_monitor/protocol_health.nim @@ -1,5 +1,8 @@ import std/[options, strformat] import ./health_status +import waku/common/waku_protocol + +export waku_protocol type ProtocolHealth* = object protocol*: string @@ -39,8 +42,7 @@ proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth = proc `$`*(p: ProtocolHealth): string = return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}" -proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth = - let p = ProtocolHealth( - protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]() +proc init*(p: typedesc[ProtocolHealth], protocol: WakuProtocol): ProtocolHealth = + return ProtocolHealth( + protocol: $protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]() ) - return p diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index bdb68905e..834fb19cf 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -1,27 +1,31 @@ {.push raises: [].} import - std/[options, sets, sequtils, times, strformat, strutils, math, random, tables], + std/ + [ + options, sets, sequtils, times, strformat, strutils, math, random, tables, + 
algorithm, + ], chronos, chronicles, metrics, - libp2p/multistream, - libp2p/muxers/muxer, - libp2p/nameresolving/nameresolver, - libp2p/peerstore - -import - ../../common/nimchronos, - ../../common/enr, - ../../common/callbacks, - ../../common/utils/parse_size_units, - ../../waku_core, - ../../waku_relay, - ../../waku_relay/protocol, - ../../waku_enr/sharding, - ../../waku_enr/capabilities, - ../../waku_metadata, - ../health_monitor/online_monitor, + libp2p/[multistream, muxers/muxer, nameresolving/nameresolver, peerstore], + waku/[ + waku_core, + waku_relay, + waku_metadata, + waku_core/topics/sharding, + waku_relay/protocol, + waku_enr/sharding, + waku_enr/capabilities, + events/peer_events, + common/nimchronos, + common/enr, + common/callbacks, + common/utils/parse_size_units, + common/broker/broker_context, + node/health_monitor/online_monitor, + ], ./peer_store/peer_storage, ./waku_peer_store @@ -84,6 +88,7 @@ type ConnectionChangeHandler* = proc( ): Future[void] {.gcsafe, raises: [Defect].} type PeerManager* = ref object of RootObj + brokerCtx: BrokerContext switch*: Switch wakuMetadata*: WakuMetadata initialBackoffInSec*: int @@ -483,8 +488,9 @@ proc canBeConnected*(pm: PeerManager, peerId: PeerId): bool = proc connectedPeers*( pm: PeerManager, protocol: string = "" ): (seq[PeerId], seq[PeerId]) = - ## Returns the peerIds of physical connections (in and out) - ## If a protocol is specified, only returns peers with at least one stream of that protocol + ## Returns the PeerIds of peers with an active socket connection. + ## If a protocol is specified, it returns peers that currently have one + ## or more active logical streams for that protocol. var inPeers: seq[PeerId] var outPeers: seq[PeerId] @@ -500,6 +506,65 @@ proc connectedPeers*( return (inPeers, outPeers) +proc capablePeers*(pm: PeerManager, protocol: string): (seq[PeerId], seq[PeerId]) = + ## Returns the PeerIds of peers with an active socket connection. + ## If a protocol is specified, it returns peers that have identified + ## themselves as supporting the protocol. + + var inPeers: seq[PeerId] + var outPeers: seq[PeerId] + + for peerId, muxers in pm.switch.connManager.getConnections(): + # filter out peers that don't have the capability registered in the peer store + if pm.switch.peerStore.hasPeer(peerId, protocol): + for peerConn in muxers: + if peerConn.connection.transportDir == Direction.In: + inPeers.add(peerId) + elif peerConn.connection.transportDir == Direction.Out: + outPeers.add(peerId) + + return (inPeers, outPeers) + +proc getConnectedPeersCount*(pm: PeerManager, protocol: string): int = + ## Returns the total number of unique connected peers (inbound + outbound) + ## with active streams for a specific protocol. + let (inPeers, outPeers) = pm.connectedPeers(protocol) + var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len)) + for p in inPeers: + peers.incl(p) + for p in outPeers: + peers.incl(p) + return peers.len + +proc getCapablePeersCount*(pm: PeerManager, protocol: string): int = + ## Returns the total number of unique connected peers (inbound + outbound) + ## who have identified themselves as supporting the given protocol. 
+ let (inPeers, outPeers) = pm.capablePeers(protocol) + var peers = initHashSet[PeerId](nextPowerOfTwo(inPeers.len + outPeers.len)) + for p in inPeers: + peers.incl(p) + for p in outPeers: + peers.incl(p) + return peers.len + +proc getPeersForShard*(pm: PeerManager, protocolId: string, shard: PubsubTopic): int = + let (inPeers, outPeers) = pm.connectedPeers(protocolId) + let connectedProtocolPeers = inPeers & outPeers + if connectedProtocolPeers.len == 0: + return 0 + + let shardInfo = RelayShard.parse(shard).valueOr: + # count raw peers of the given protocol if for some reason we can't get + # a shard mapping out of the gossipsub topic string. + return connectedProtocolPeers.len + + var shardPeers = 0 + for peerId in connectedProtocolPeers: + if pm.switch.peerStore.hasShard(peerId, shardInfo.clusterId, shardInfo.shardId): + shardPeers.inc() + + return shardPeers + proc disconnectAllPeers*(pm: PeerManager) {.async.} = let (inPeerIds, outPeerIds) = pm.connectedPeers() let connectedPeers = concat(inPeerIds, outPeerIds) @@ -635,7 +700,7 @@ proc getPeerIp(pm: PeerManager, peerId: PeerId): Option[string] = # Event Handling # #~~~~~~~~~~~~~~~~~# -proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = +proc refreshPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = let res = catch: await pm.switch.dial(peerId, WakuMetadataCodec) @@ -664,6 +729,10 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = let shards = metadata.shards.mapIt(it.uint16) pm.switch.peerStore.setShardInfo(peerId, shards) + # TODO: should only trigger an event if metadata actually changed + # should include the shard subscription delta in the event when + # it is a MetadataUpdated event + EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventMetadataUpdated) return info "disconnecting from peer", peerId = peerId, reason = reason @@ -673,14 +742,14 @@ proc onPeerMetadata(pm: PeerManager, peerId: PeerId) {.async.} = # called when a peer i) first connects to us ii) disconnects all connections from us proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if not pm.wakuMetadata.isNil() and event.kind == PeerEventKind.Joined: - await pm.onPeerMetadata(peerId) + await pm.refreshPeerMetadata(peerId) var peerStore = pm.switch.peerStore var direction: PeerDirection var connectedness: Connectedness case event.kind - of Joined: + of PeerEventKind.Joined: direction = if event.initiator: Outbound else: Inbound connectedness = Connected @@ -708,10 +777,12 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = asyncSpawn(pm.switch.disconnect(peerId)) peerStore.delete(peerId) + EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventConnected) + if not pm.onConnectionChange.isNil(): # we don't want to await for the callback to finish asyncSpawn pm.onConnectionChange(peerId, Joined) - of Left: + of PeerEventKind.Left: direction = UnknownDirection connectedness = CanConnect @@ -723,12 +794,16 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = pm.ipTable.del(ip) break + EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventDisconnected) + if not pm.onConnectionChange.isNil(): # we don't want to await for the callback to finish asyncSpawn pm.onConnectionChange(peerId, Left) - of Identified: + of PeerEventKind.Identified: info "event identified", peerId = peerId + EventWakuPeer.emit(pm.brokerCtx, peerId, WakuPeerEventKind.EventIdentified) + peerStore[ConnectionBook][peerId] = connectedness 
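
# Editorial sketch: the two peer-count helpers above differ only in their
# predicate (active streams for `connectedPeers` vs. identify-advertised
# support for `capablePeers`), but both must deduplicate, since a peer that
# holds an inbound and an outbound connection appears in both direction
# lists. A minimal, self-contained illustration of that dedup step, assuming
# only the std lib (`SketchPeerId` is a hypothetical stand-in for libp2p's
# PeerId used by the real code):

import std/[sets, math]

type SketchPeerId = string # hypothetical stand-in for libp2p's PeerId

proc uniquePeerCount(inPeers, outPeers: seq[SketchPeerId]): int =
  # Pre-size the set the same way the helpers above do.
  var peers =
    initHashSet[SketchPeerId](nextPowerOfTwo(inPeers.len + outPeers.len))
  for p in inPeers:
    peers.incl(p)
  for p in outPeers:
    peers.incl(p)
  return peers.len

when isMainModule:
  # "peerB" shows up in both directions but contributes only once.
  doAssert uniquePeerCount(@["peerA", "peerB"], @["peerB", "peerC"]) == 3
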
peerStore[DirectionBook][peerId] = direction @@ -1085,8 +1160,11 @@ proc new*( error "Max backoff time can't be over 1 week", maxBackoff = backoff raise newException(Defect, "Max backoff time can't be over 1 week") + let brokerCtx = globalBrokerContext() + let pm = PeerManager( switch: switch, + brokerCtx: brokerCtx, wakuMetadata: wakuMetadata, storage: storage, initialBackoffInSec: initialBackoffInSec, diff --git a/waku/node/peer_manager/waku_peer_store.nim b/waku/node/peer_manager/waku_peer_store.nim index b7f2669e5..a03b5ae2e 100644 --- a/waku/node/peer_manager/waku_peer_store.nim +++ b/waku/node/peer_manager/waku_peer_store.nim @@ -162,7 +162,9 @@ proc connectedness*(peerStore: PeerStore, peerId: PeerId): Connectedness = peerStore[ConnectionBook].book.getOrDefault(peerId, NotConnected) proc hasShard*(peerStore: PeerStore, peerId: PeerID, cluster, shard: uint16): bool = - peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) + return + peerStore[ENRBook].book.getOrDefault(peerId).containsShard(cluster, shard) or + peerStore[ShardBook].book.getOrDefault(peerId, @[]).contains(shard) proc hasCapability*(peerStore: PeerStore, peerId: PeerID, cap: Capabilities): bool = peerStore[ENRBook].book.getOrDefault(peerId).supportsCapability(cap) @@ -219,7 +221,8 @@ proc getPeersByShard*( peerStore: PeerStore, cluster, shard: uint16 ): seq[RemotePeerInfo] = return peerStore.peers.filterIt( - it.enr.isSome() and it.enr.get().containsShard(cluster, shard) + (it.enr.isSome() and it.enr.get().containsShard(cluster, shard)) or + it.shards.contains(shard) ) proc getPeersByCapability*( diff --git a/waku/node/waku_node.nim b/waku/node/waku_node.nim index d556811ac..cb3d81c7c 100644 --- a/waku/node/waku_node.nim +++ b/waku/node/waku_node.nim @@ -42,6 +42,7 @@ import waku_store/resume, waku_store_sync, waku_filter_v2, + waku_filter_v2/common as filter_common, waku_filter_v2/client as filter_client, waku_metadata, waku_rendezvous/protocol, @@ -57,12 +58,18 @@ import common/rate_limit/setting, common/callbacks, common/nimchronos, + common/broker/broker_context, + common/broker/request_broker, waku_mix, requests/node_requests, - common/broker/broker_context, + requests/health_requests, + events/health_events, + events/peer_events, ], ./net_config, - ./peer_manager + ./peer_manager, + ./health_monitor/health_status, + ./health_monitor/topic_health declarePublicCounter waku_node_messages, "number of messages received", ["type"] @@ -91,6 +98,9 @@ const clientId* = "Nimbus Waku v2 node" const WakuNodeVersionString* = "version / git commit hash: " & git_version +const EdgeTopicHealthyThreshold = 2 + ## Lightpush server and filter server requirement for a healthy topic in edge mode + # key and crypto modules different type # TODO: Move to application instance (e.g., `WakuNode2`) @@ -135,6 +145,10 @@ type topicSubscriptionQueue*: AsyncEventQueue[SubscriptionEvent] rateLimitSettings*: ProtocolRateLimitSettings wakuMix*: WakuMix + edgeTopicsHealth*: Table[PubsubTopic, TopicHealth] + edgeHealthEvent*: AsyncEvent + edgeHealthLoop: Future[void] + peerEventListener*: EventWakuPeerListener proc deduceRelayShard( node: WakuNode, @@ -469,7 +483,52 @@ proc updateAnnouncedAddrWithPrimaryIpAddr*(node: WakuNode): Result[void, string] return ok() -proc startProvidersAndListeners(node: WakuNode) = +proc calculateEdgeTopicHealth(node: WakuNode, shard: PubsubTopic): TopicHealth = + let filterPeers = + node.peerManager.getPeersForShard(filter_common.WakuFilterSubscribeCodec, shard) + let lightpushPeers = + 
node.peerManager.getPeersForShard(lightpush_protocol.WakuLightPushCodec, shard) + + if filterPeers >= EdgeTopicHealthyThreshold and + lightpushPeers >= EdgeTopicHealthyThreshold: + return TopicHealth.SUFFICIENTLY_HEALTHY + elif filterPeers > 0 and lightpushPeers > 0: + return TopicHealth.MINIMALLY_HEALTHY + + return TopicHealth.UNHEALTHY + +proc loopEdgeHealth(node: WakuNode) {.async.} = + while node.started: + await node.edgeHealthEvent.wait() + node.edgeHealthEvent.clear() + + try: + for shard in node.edgeTopicsHealth.keys: + if not node.wakuRelay.isNil and node.wakuRelay.isSubscribed(shard): + continue + + let oldHealth = node.edgeTopicsHealth.getOrDefault(shard, TopicHealth.UNHEALTHY) + let newHealth = node.calculateEdgeTopicHealth(shard) + if newHealth != oldHealth: + node.edgeTopicsHealth[shard] = newHealth + EventShardTopicHealthChange.emit(node.brokerCtx, shard, newHealth) + except CancelledError: + break + except CatchableError as e: + warn "Error in edge health check", error = e.msg + + # safety cooldown to protect from edge cases + await sleepAsync(100.milliseconds) + +proc startProvidersAndListeners*(node: WakuNode) = + node.peerEventListener = EventWakuPeer.listen( + node.brokerCtx, + proc(evt: EventWakuPeer) {.async: (raises: []), gcsafe.} = + node.edgeHealthEvent.fire(), + ).valueOr: + error "Failed to listen to peer events", error = error + return + RequestRelayShard.setProvider( node.brokerCtx, proc( @@ -481,8 +540,60 @@ proc startProvidersAndListeners(node: WakuNode) = ).isOkOr: error "Can't set provider for RequestRelayShard", error = error -proc stopProvidersAndListeners(node: WakuNode) = + RequestShardTopicsHealth.setProvider( + node.brokerCtx, + proc(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string] = + var response: RequestShardTopicsHealth + + for shard in topics: + var healthStatus = TopicHealth.UNHEALTHY + + if not node.wakuRelay.isNil: + healthStatus = + node.wakuRelay.topicsHealth.getOrDefault(shard, TopicHealth.NOT_SUBSCRIBED) + + if healthStatus == TopicHealth.NOT_SUBSCRIBED: + healthStatus = node.calculateEdgeTopicHealth(shard) + + response.topicHealth.add((shard, healthStatus)) + + return ok(response), + ).isOkOr: + error "Can't set provider for RequestShardTopicsHealth", error = error + + RequestContentTopicsHealth.setProvider( + node.brokerCtx, + proc(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string] = + var response: RequestContentTopicsHealth + + for contentTopic in topics: + var topicHealth = TopicHealth.NOT_SUBSCRIBED + + let shardResult = node.deduceRelayShard(contentTopic, none[PubsubTopic]()) + + if shardResult.isOk(): + let shardObj = shardResult.get() + let pubsubTopic = $shardObj + if not isNil(node.wakuRelay): + topicHealth = node.wakuRelay.topicsHealth.getOrDefault( + pubsubTopic, TopicHealth.NOT_SUBSCRIBED + ) + + if topicHealth == TopicHealth.NOT_SUBSCRIBED and + pubsubTopic in node.edgeTopicsHealth: + topicHealth = node.calculateEdgeTopicHealth(pubsubTopic) + + response.contentTopicHealth.add((topic: contentTopic, health: topicHealth)) + + return ok(response), + ).isOkOr: + error "Can't set provider for RequestContentTopicsHealth", error = error + +proc stopProvidersAndListeners*(node: WakuNode) = + EventWakuPeer.dropListener(node.brokerCtx, node.peerEventListener) RequestRelayShard.clearProvider(node.brokerCtx) + RequestContentTopicsHealth.clearProvider(node.brokerCtx) + RequestShardTopicsHealth.clearProvider(node.brokerCtx) proc start*(node: WakuNode) {.async.} = ## Starts a created Waku Node and @@ 
-532,6 +643,9 @@ proc start*(node: WakuNode) {.async.} = ## The switch will update addresses after start using the addressMapper await node.switch.start() + node.edgeHealthEvent = newAsyncEvent() + node.edgeHealthLoop = loopEdgeHealth(node) + node.startProvidersAndListeners() node.started = true @@ -549,6 +663,10 @@ proc stop*(node: WakuNode) {.async.} = node.stopProvidersAndListeners() + if not node.edgeHealthLoop.isNil: + await node.edgeHealthLoop.cancelAndWait() + node.edgeHealthLoop = nil + await node.switch.stop() node.peerManager.stop() diff --git a/waku/requests/health_request.nim b/waku/requests/health_request.nim deleted file mode 100644 index 9f98eba67..000000000 --- a/waku/requests/health_request.nim +++ /dev/null @@ -1,21 +0,0 @@ -import waku/common/broker/[request_broker, multi_request_broker] - -import waku/api/types -import waku/node/health_monitor/[protocol_health, topic_health] -import waku/waku_core/topics - -export protocol_health, topic_health - -RequestBroker(sync): - type RequestNodeHealth* = object - healthStatus*: NodeHealth - -RequestBroker(sync): - type RequestRelayTopicsHealth* = object - topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]] - - proc signature(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] - -MultiRequestBroker: - type RequestProtocolHealth* = object - healthStatus*: ProtocolHealth diff --git a/waku/requests/health_requests.nim b/waku/requests/health_requests.nim new file mode 100644 index 000000000..3554922b3 --- /dev/null +++ b/waku/requests/health_requests.nim @@ -0,0 +1,39 @@ +import waku/common/broker/request_broker + +import waku/api/types +import waku/node/health_monitor/[protocol_health, topic_health, health_report] +import waku/waku_core/topics +import waku/common/waku_protocol + +export protocol_health, topic_health + +# Get the overall node connectivity status +RequestBroker(sync): + type RequestConnectionStatus* = object + connectionStatus*: ConnectionStatus + +# Get the health status of a set of content topics +RequestBroker(sync): + type RequestContentTopicsHealth* = object + contentTopicHealth*: seq[tuple[topic: ContentTopic, health: TopicHealth]] + + proc signature(topics: seq[ContentTopic]): Result[RequestContentTopicsHealth, string] + +# Get a consolidated node health report +RequestBroker: + type RequestHealthReport* = object + healthReport*: HealthReport + +# Get the health status of a set of shards (pubsub topics) +RequestBroker(sync): + type RequestShardTopicsHealth* = object + topicHealth*: seq[tuple[topic: PubsubTopic, health: TopicHealth]] + + proc signature(topics: seq[PubsubTopic]): Result[RequestShardTopicsHealth, string] + +# Get the health status of a mounted protocol +RequestBroker: + type RequestProtocolHealth* = object + healthStatus*: ProtocolHealth + + proc signature(protocol: WakuProtocol): Future[Result[RequestProtocolHealth, string]] diff --git a/waku/requests/requests.nim b/waku/requests/requests.nim index 03e10f882..9225c0f3e 100644 --- a/waku/requests/requests.nim +++ b/waku/requests/requests.nim @@ -1,3 +1,3 @@ -import ./[health_request, rln_requests, node_requests] +import ./[health_requests, rln_requests, node_requests] -export health_request, rln_requests, node_requests +export health_requests, rln_requests, node_requests diff --git a/waku/rest_api/endpoint/health/types.nim b/waku/rest_api/endpoint/health/types.nim index 57f8b284c..88fa736a8 100644 --- a/waku/rest_api/endpoint/health/types.nim +++ b/waku/rest_api/endpoint/health/types.nim @@ -2,7 +2,8 @@ import results 
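
# Editorial sketch: the REST serializer changed below adds a top-level
# `connectionStatus` field to the /health report, alongside the existing
# `nodeHealth` and `protocolsHealth` fields. Approximate response shape,
# rebuilt with std/json for illustration only (the enum spellings and the
# per-protocol fields are assumptions, not taken from this patch):

import std/json

let exampleHealthReport = %*{
  "nodeHealth": "Ready",           # HealthStatus, serialized via `$`
  "connectionStatus": "Connected", # new ConnectionStatus field
  "protocolsHealth": [
    {"protocol": "Relay", "health": "Ready"},
    {"protocol": "Store", "health": "NotMounted"}
  ]
}

when isMainModule:
  doAssert exampleHealthReport["connectionStatus"].getStr() == "Connected"
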
import chronicles, json_serialization, json_serialization/std/options -import ../../../waku_node, ../serdes +import ../serdes +import waku/[waku_node, api/types] #### Serialization and deserialization @@ -44,6 +45,7 @@ proc writeValue*( ) {.raises: [IOError].} = writer.beginRecord() writer.writeField("nodeHealth", $value.nodeHealth) + writer.writeField("connectionStatus", $value.connectionStatus) writer.writeField("protocolsHealth", value.protocolsHealth) writer.endRecord() @@ -52,6 +54,7 @@ proc readValue*( ) {.raises: [SerializationError, IOError].} = var nodeHealth: Option[HealthStatus] + connectionStatus: Option[ConnectionStatus] protocolsHealth: Option[seq[ProtocolHealth]] for fieldName in readObjectFields(reader): @@ -66,6 +69,16 @@ proc readValue*( reader.raiseUnexpectedValue("Invalid `health` value: " & $error) nodeHealth = some(health) + of "connectionStatus": + if connectionStatus.isSome(): + reader.raiseUnexpectedField( + "Multiple `connectionStatus` fields found", "HealthReport" + ) + + let state = ConnectionStatus.init(reader.readValue(string)).valueOr: + reader.raiseUnexpectedValue("Invalid `connectionStatus` value: " & $error) + + connectionStatus = some(state) of "protocolsHealth": if protocolsHealth.isSome(): reader.raiseUnexpectedField( @@ -79,5 +92,8 @@ proc readValue*( if nodeHealth.isNone(): reader.raiseUnexpectedValue("Field `nodeHealth` is missing") - value = - HealthReport(nodeHealth: nodeHealth.get, protocolsHealth: protocolsHealth.get(@[])) + value = HealthReport( + nodeHealth: nodeHealth.get, + connectionStatus: connectionStatus.get, + protocolsHealth: protocolsHealth.get(@[]), + ) diff --git a/waku/waku_relay/protocol.nim b/waku/waku_relay/protocol.nim index 3f343269a..17470af29 100644 --- a/waku/waku_relay/protocol.nim +++ b/waku/waku_relay/protocol.nim @@ -5,7 +5,7 @@ {.push raises: [].} import - std/[strformat, strutils], + std/[strformat, strutils, sets], stew/byteutils, results, sequtils, @@ -21,11 +21,13 @@ import import waku/waku_core, waku/node/health_monitor/topic_health, - waku/requests/health_request, + waku/requests/health_requests, + waku/events/health_events, ./message_id, - waku/common/broker/broker_context + waku/common/broker/broker_context, + waku/events/peer_events -from ../waku_core/codecs import WakuRelayCodec +from waku/waku_core/codecs import WakuRelayCodec export WakuRelayCodec type ShardMetrics = object @@ -154,6 +156,8 @@ type pubsubTopic: PubsubTopic, message: WakuMessage ): Future[ValidationResult] {.gcsafe, raises: [Defect].} WakuRelay* = ref object of GossipSub + brokerCtx: BrokerContext + peerEventListener: EventWakuPeerListener # seq of tuples: the first entry in the tuple contains the validators are called for every topic # the second entry contains the error messages to be returned when the validator fails wakuValidators: seq[tuple[handler: WakuValidatorHandler, errorMessage: string]] @@ -165,6 +169,11 @@ type topicsHealth*: Table[string, TopicHealth] onTopicHealthChange*: TopicHealthChangeHandler topicHealthLoopHandle*: Future[void] + topicHealthUpdateEvent: AsyncEvent + topicHealthDirty: HashSet[string] + # list of topics that need their health updated in the update event + topicHealthCheckAll: bool + # true if all topics need to have their health status refreshed in the update event msgMetricsPerShard*: Table[string, ShardMetrics] # predefinition for more detailed results from publishing new message @@ -287,6 +296,21 @@ proc initRelayObservers(w: WakuRelay) = ) proc onRecv(peer: PubSubPeer, msgs: var RPCMsg) = + if 
msgs.control.isSome(): + let ctrl = msgs.control.get() + var topicsChanged = false + + for graft in ctrl.graft: + w.topicHealthDirty.incl(graft.topicID) + topicsChanged = true + + for prune in ctrl.prune: + w.topicHealthDirty.incl(prune.topicID) + topicsChanged = true + + if topicsChanged: + w.topicHealthUpdateEvent.fire() + for msg in msgs.messages: let (msg_id_short, topic, wakuMessage, msgSize) = decodeRpcMessageInfo(peer, msg).valueOr: continue @@ -325,18 +349,6 @@ proc initRelayObservers(w: WakuRelay) = w.addObserver(administrativeObserver) -proc initRequestProviders(w: WakuRelay) = - RequestRelayTopicsHealth.setProvider( - globalBrokerContext(), - proc(topics: seq[PubsubTopic]): Result[RequestRelayTopicsHealth, string] = - var collectedRes: RequestRelayTopicsHealth - for topic in topics: - let health = w.topicsHealth.getOrDefault(topic, TopicHealth.NOT_SUBSCRIBED) - collectedRes.topicHealth.add((topic, health)) - return ok(collectedRes), - ).isOkOr: - error "Cannot set Relay Topics Health request provider", error = error - proc new*( T: type WakuRelay, switch: Switch, maxMessageSize = int(DefaultMaxWakuMessageSize) ): WakuRelayResult[T] = @@ -354,12 +366,25 @@ proc new*( maxMessageSize = maxMessageSize, parameters = GossipsubParameters, ) + w.brokerCtx = globalBrokerContext() procCall GossipSub(w).initPubSub() w.topicsHealth = initTable[string, TopicHealth]() + w.topicHealthUpdateEvent = newAsyncEvent() + w.topicHealthDirty = initHashSet[string]() + w.topicHealthCheckAll = false w.initProtocolHandler() w.initRelayObservers() - w.initRequestProviders() + + w.peerEventListener = EventWakuPeer.listen( + w.brokerCtx, + proc(evt: EventWakuPeer): Future[void] {.async: (raises: []), gcsafe.} = + if evt.kind == WakuPeerEventKind.EventDisconnected: + w.topicHealthCheckAll = true + w.topicHealthUpdateEvent.fire() + , + ).valueOr: + return err("Failed to subscribe to peer events: " & error) except InitializationError: return err("initialization error: " & getCurrentExceptionMsg()) @@ -437,38 +462,58 @@ proc calculateTopicHealth(wakuRelay: WakuRelay, topic: string): TopicHealth = return TopicHealth.MINIMALLY_HEALTHY return TopicHealth.SUFFICIENTLY_HEALTHY -proc updateTopicsHealth(wakuRelay: WakuRelay) {.async.} = - var futs = newSeq[Future[void]]() - for topic in toSeq(wakuRelay.topics.keys): - ## loop over all the topics I'm subscribed to - let - oldHealth = wakuRelay.topicsHealth.getOrDefault(topic) - currentHealth = wakuRelay.calculateTopicHealth(topic) +proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = + GossipSub(w).topics.hasKey(topic) - if oldHealth == currentHealth: - continue +proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] = + return toSeq(GossipSub(w).topics.keys()) - wakuRelay.topicsHealth[topic] = currentHealth - if not wakuRelay.onTopicHealthChange.isNil(): - let fut = wakuRelay.onTopicHealthChange(topic, currentHealth) - if not fut.completed(): # Fast path for successful sync handlers - futs.add(fut) +proc topicsHealthLoop(w: WakuRelay) {.async.} = + while true: + await w.topicHealthUpdateEvent.wait() + w.topicHealthUpdateEvent.clear() + + var topicsToCheck: seq[string] + + if w.topicHealthCheckAll: + topicsToCheck = toSeq(w.topics.keys) + else: + topicsToCheck = toSeq(w.topicHealthDirty) + + w.topicHealthCheckAll = false + w.topicHealthDirty.clear() + + var futs = newSeq[Future[void]]() + + for topic in topicsToCheck: + # guard against topic being unsubscribed since fire() + if not w.isSubscribed(topic): + continue + + let + oldHealth = 
w.topicsHealth.getOrDefault(topic, TopicHealth.UNHEALTHY) + currentHealth = w.calculateTopicHealth(topic) + + if oldHealth == currentHealth: + continue + + w.topicsHealth[topic] = currentHealth + + EventShardTopicHealthChange.emit(w.brokerCtx, topic, currentHealth) + + if not w.onTopicHealthChange.isNil(): + futs.add(w.onTopicHealthChange(topic, currentHealth)) if futs.len() > 0: - # slow path - we have to wait for the handlers to complete try: - futs = await allFinished(futs) + discard await allFinished(futs) except CancelledError: - # check for errors in futures - for fut in futs: - if fut.failed: - let err = fut.readError() - warn "Error in health change handler", description = err.msg + break + except CatchableError as e: + warn "Error in topic health callback", error = e.msg -proc topicsHealthLoop(wakuRelay: WakuRelay) {.async.} = - while true: - await wakuRelay.updateTopicsHealth() - await sleepAsync(10.seconds) + # safety cooldown to protect from edge cases + await sleepAsync(100.milliseconds) method start*(w: WakuRelay) {.async, base.} = info "start" @@ -478,15 +523,13 @@ method start*(w: WakuRelay) {.async, base.} = method stop*(w: WakuRelay) {.async, base.} = info "stop" await procCall GossipSub(w).stop() + + if w.peerEventListener.id != 0: + EventWakuPeer.dropListener(w.brokerCtx, w.peerEventListener) + if not w.topicHealthLoopHandle.isNil(): await w.topicHealthLoopHandle.cancelAndWait() -proc isSubscribed*(w: WakuRelay, topic: PubsubTopic): bool = - GossipSub(w).topics.hasKey(topic) - -proc subscribedTopics*(w: WakuRelay): seq[PubsubTopic] = - return toSeq(GossipSub(w).topics.keys()) - proc generateOrderedValidator(w: WakuRelay): ValidatorHandler {.gcsafe.} = # rejects messages that are not WakuMessage let wrappedValidator = proc( @@ -584,7 +627,8 @@ proc subscribe*(w: WakuRelay, pubsubTopic: PubsubTopic, handler: WakuRelayHandle procCall GossipSub(w).subscribe(pubsubTopic, topicHandler) w.topicHandlers[pubsubTopic] = topicHandler - asyncSpawn w.updateTopicsHealth() + w.topicHealthDirty.incl(pubsubTopic) + w.topicHealthUpdateEvent.fire() proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = ## Unsubscribe all handlers on this pubsub topic @@ -594,6 +638,8 @@ proc unsubscribeAll*(w: WakuRelay, pubsubTopic: PubsubTopic) = procCall GossipSub(w).unsubscribeAll(pubsubTopic) w.topicValidator.del(pubsubTopic) w.topicHandlers.del(pubsubTopic) + w.topicsHealth.del(pubsubTopic) + w.topicHealthDirty.excl(pubsubTopic) proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = if not w.topicValidator.hasKey(pubsubTopic): @@ -619,6 +665,8 @@ proc unsubscribe*(w: WakuRelay, pubsubTopic: PubsubTopic) = w.topicValidator.del(pubsubTopic) w.topicHandlers.del(pubsubTopic) + w.topicsHealth.del(pubsubTopic) + w.topicHealthDirty.excl(pubsubTopic) proc publish*( w: WakuRelay, pubsubTopic: PubsubTopic, wakuMessage: WakuMessage
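
# Editorial sketch: the loops introduced in this patch (healthLoop,
# loopEdgeHealth, topicsHealthLoop) all share one event-driven shape that
# replaces the old 10 s polling: producers mark work dirty and fire an
# AsyncEvent, a single consumer drains the dirty set, recomputes only what
# changed, and sleeps briefly so bursts (e.g. a flurry of GRAFT/PRUNE control
# messages) coalesce into a single recomputation. A minimal, self-contained
# version of that pattern, assuming only chronos and the std lib (all names
# here are hypothetical):

import std/sets
import chronos

type HealthTracker = ref object
  updateEvent: AsyncEvent
  dirty: HashSet[string]

proc markDirty(t: HealthTracker, topic: string) =
  # Producer side: record what changed and wake the consumer.
  t.dirty.incl(topic)
  t.updateEvent.fire()

proc healthLoop(t: HealthTracker) {.async.} =
  while true:
    await t.updateEvent.wait()
    t.updateEvent.clear()

    # Drain the dirty set; later marks fire the event again.
    let topics = t.dirty
    t.dirty.clear()
    for topic in topics:
      echo "recomputing health for ", topic # stands in for calculateTopicHealth

    # Safety cooldown, mirroring the 100 ms sleep used throughout this patch.
    await sleepAsync(100.milliseconds)

when isMainModule:
  let tracker =
    HealthTracker(updateEvent: newAsyncEvent(), dirty: initHashSet[string]())
  let loop = healthLoop(tracker)
  tracker.markDirty("/waku/2/rs/1/0")
  tracker.markDirty("/waku/2/rs/1/1") # both marks coalesce into one wakeup
  waitFor sleepAsync(200.milliseconds)
  waitFor loop.cancelAndWait()
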