From 2e40f2971f97ecf6bc63db101c8edf6a28737cdc Mon Sep 17 00:00:00 2001 From: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> Date: Mon, 16 Jun 2025 18:44:21 +0200 Subject: [PATCH] chore: refactor to unify online and health monitors (#3456) --- .../liteprotocoltester/liteprotocoltester.nim | 137 +++----- apps/wakunode2/wakunode2.nim | 53 +--- library/libwaku.nim | 4 +- .../requests/debug_node_request.nim | 8 +- .../requests/node_lifecycle_request.nim | 5 +- .../requests/peer_manager_request.nim | 3 - tests/wakunode_rest/test_rest_health.nim | 4 +- waku/factory/builder.nim | 1 - waku/factory/node_factory.nim | 2 +- waku/factory/waku.nim | 87 ++++-- waku/node/health_monitor.nim | 295 +----------------- waku/node/health_monitor/health_status.nim | 16 + .../health_monitor/node_health_monitor.nim | 270 ++++++++++++++++ waku/node/health_monitor/online_monitor.nim | 77 +++++ waku/node/health_monitor/protocol_health.nim | 46 +++ waku/node/peer_manager/peer_manager.nim | 48 +-- waku/waku_api/rest/builder.nim | 2 +- waku/waku_api/rest/health/handlers.nim | 2 +- waku/waku_api/rest/health/types.nim | 21 +- 19 files changed, 559 insertions(+), 522 deletions(-) create mode 100644 waku/node/health_monitor/health_status.nim create mode 100644 waku/node/health_monitor/node_health_monitor.nim create mode 100644 waku/node/health_monitor/online_monitor.nim create mode 100644 waku/node/health_monitor/protocol_health.nim diff --git a/apps/liteprotocoltester/liteprotocoltester.nim b/apps/liteprotocoltester/liteprotocoltester.nim index 598d1a7ec..939332cff 100644 --- a/apps/liteprotocoltester/liteprotocoltester.nim +++ b/apps/liteprotocoltester/liteprotocoltester.nim @@ -14,13 +14,11 @@ import waku/[ common/enr, common/logging, - factory/waku, + factory/waku as waku_factory, factory/external_config, waku_node, - node/health_monitor, node/waku_metrics, node/peer_manager, - waku_api/rest/builder as rest_server_builder, waku_lightpush/common, waku_filter_v2, waku_peer_exchange/protocol, @@ -49,7 +47,7 @@ when isMainModule: ## 5. Start monitoring tools and external interfaces ## 6. Setup graceful shutdown hooks - const versionString = "version / git commit hash: " & waku.git_version + const versionString = "version / git commit hash: " & waku_factory.git_version let confRes = LiteProtocolTesterConf.load(version = versionString) if confRes.isErr(): @@ -61,7 +59,7 @@ when isMainModule: ## Logging setup logging.setupLog(conf.logLevel, conf.logFormat) - info "Running Lite Protocol Tester node", version = waku.git_version + info "Running Lite Protocol Tester node", version = waku_factory.git_version logConfig(conf) ##Prepare Waku configuration @@ -69,13 +67,13 @@ when isMainModule: ## - override according to tester functionality ## - var wConf: WakuNodeConf + var wakuNodeConf: WakuNodeConf if conf.configFile.isSome(): try: var configFile {.threadvar.}: InputFile configFile = conf.configFile.get() - wConf = WakuNodeConf.load( + wakuNodeConf = WakuNodeConf.load( version = versionString, printUsage = false, secondarySources = proc( @@ -88,101 +86,54 @@ when isMainModule: error "Loading Waku configuration failed", error = getCurrentExceptionMsg() quit(QuitFailure) - wConf.logLevel = conf.logLevel - wConf.logFormat = conf.logFormat - wConf.nat = conf.nat - wConf.maxConnections = 500 - wConf.restAddress = conf.restAddress - wConf.restPort = conf.restPort - wConf.restAllowOrigin = conf.restAllowOrigin + wakuNodeConf.logLevel = conf.logLevel + wakuNodeConf.logFormat = conf.logFormat + wakuNodeConf.nat = conf.nat + wakuNodeConf.maxConnections = 500 + wakuNodeConf.restAddress = conf.restAddress + wakuNodeConf.restPort = conf.restPort + wakuNodeConf.restAllowOrigin = conf.restAllowOrigin - wConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] + wakuNodeConf.dnsAddrsNameServers = + @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")] - wConf.shards = @[conf.shard] - wConf.contentTopics = conf.contentTopics - wConf.clusterId = conf.clusterId + wakuNodeConf.shards = @[conf.shard] + wakuNodeConf.contentTopics = conf.contentTopics + wakuNodeConf.clusterId = conf.clusterId ## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc... - wConf.metricsServer = true - wConf.metricsServerAddress = parseIpAddress("0.0.0.0") - wConf.metricsServerPort = conf.metricsPort + wakuNodeConf.metricsServer = true + wakuNodeConf.metricsServerAddress = parseIpAddress("0.0.0.0") + wakuNodeConf.metricsServerPort = conf.metricsPort # If bootstrap option is chosen we expect our clients will not mounted # so we will mount PeerExchange manually to gather possible service peers, # if got some we will mount the client protocols afterward. - wConf.peerExchange = false - wConf.relay = false - wConf.filter = false - wConf.lightpush = false - wConf.store = false + wakuNodeConf.peerExchange = false + wakuNodeConf.relay = false + wakuNodeConf.filter = false + wakuNodeConf.lightpush = false + wakuNodeConf.store = false - wConf.rest = false - wConf.relayServiceRatio = "40:60" + wakuNodeConf.rest = false + wakuNodeConf.relayServiceRatio = "40:60" - # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it - # It will always be called from main thread anyway. - # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety - var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor - nodeHealthMonitor = WakuNodeHealthMonitor() - nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING) - - let wakuConf = wConf.toWakuConf().valueOr: - error "Waku configuration failed", error = error + let wakuConf = wakuNodeConf.toWakuConf().valueOr: + error "Issue converting toWakuConf", error = $error quit(QuitFailure) - let restServer: WakuRestServerRef = - if wakuConf.restServerConf.isSome(): - rest_server_builder.startRestServerEssentials( - nodeHealthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift - ).valueOr: - error "Starting essential REST server failed.", error = $error - quit(QuitFailure) - else: - nil - - var wakuApp = Waku.new(wakuConf).valueOr: + var waku = Waku.new(wakuConf).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) - wakuApp.restServer = restServer - - nodeHealthMonitor.setNode(wakuApp.node) - - (waitFor startWaku(addr wakuApp)).isOkOr: + (waitFor startWaku(addr waku)).isOkOr: error "Starting waku failed", error = error quit(QuitFailure) - if wakuConf.restServerConf.isSome(): - rest_server_builder.startRestServerProtocolSupport( - restServer, - wakuApp.node, - wakuApp.wakuDiscv5, - wakuConf.restServerConf.get(), - wakuConf.relay, - wakuConf.lightPush, - wakuConf.clusterId, - wakuConf.shards, - wakuConf.contentTopics, - ).isOkOr: - error "Starting protocols support REST server failed.", error = $error - quit(QuitFailure) - - if wakuConf.metricsServerConf.isSome(): - wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging( - wakuConf.metricsServerConf.get(), wakuConf.portsShift - ).valueOr: - error "Starting monitoring and external interfaces failed", error = error - quit(QuitFailure) - - nodeHealthMonitor.setOverallHealth(HealthStatus.READY) - debug "Setting up shutdown hooks" - ## Setup shutdown hooks for this process. - ## Stop node gracefully on shutdown. - proc asyncStopper(wakuApp: Waku) {.async: (raises: [Exception]).} = - nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) - await wakuApp.stop() + proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} = + await waku.stop() quit(QuitSuccess) # Handle Ctrl-C SIGINT @@ -191,7 +142,7 @@ when isMainModule: # workaround for https://github.com/nim-lang/Nim/issues/4057 setupForeignThreadGc() notice "Shutting down after receiving SIGINT" - asyncSpawn asyncStopper(wakuApp) + asyncSpawn asyncStopper(waku) setControlCHook(handleCtrlC) @@ -199,7 +150,7 @@ when isMainModule: when defined(posix): proc handleSigterm(signal: cint) {.noconv.} = notice "Shutting down after receiving SIGTERM" - asyncSpawn asyncStopper(wakuApp) + asyncSpawn asyncStopper(waku) c_signal(ansi_c.SIGTERM, handleSigterm) @@ -212,22 +163,26 @@ when isMainModule: # Not available in -d:release mode writeStackTrace() - waitFor wakuApp.stop() + waitFor waku.stop() quit(QuitFailure) c_signal(ansi_c.SIGSEGV, handleSigsegv) info "Node setup complete" - let codec = conf.getCodec() + var codec = WakuLightPushCodec # mounting relevant client, for PX filter client must be mounted ahead + if conf.testFunc == TesterFunctionality.SENDER: + codec = WakuLightPushCodec + else: + codec = WakuFilterSubscribeCodec var lookForServiceNode = false var serviceNodePeerInfo: RemotePeerInfo if conf.serviceNode.len == 0: if conf.bootstrapNode.len > 0: info "Bootstrapping with PeerExchange to gather random service node" - let futForServiceNode = pxLookupServiceNode(wakuApp.node, conf) + let futForServiceNode = pxLookupServiceNode(waku.node, conf) if not (waitFor futForServiceNode.withTimeout(20.minutes)): error "Service node not found in time via PX" quit(QuitFailure) @@ -237,7 +192,7 @@ when isMainModule: quit(QuitFailure) serviceNodePeerInfo = selectRandomServicePeer( - wakuApp.node.peerManager, none(RemotePeerInfo), codec + waku.node.peerManager, none(RemotePeerInfo), codec ).valueOr: error "Service node selection failed" quit(QuitFailure) @@ -252,11 +207,11 @@ when isMainModule: info "Service node to be used", serviceNode = $serviceNodePeerInfo - logSelfPeers(wakuApp.node.peerManager) + logSelfPeers(waku.node.peerManager) if conf.testFunc == TesterFunctionality.SENDER: - setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo) + setupAndPublish(waku.node, conf, serviceNodePeerInfo) else: - setupAndListen(wakuApp.node, conf, serviceNodePeerInfo) + setupAndListen(waku.node, conf, serviceNodePeerInfo) runForever() diff --git a/apps/wakunode2/wakunode2.nim b/apps/wakunode2/wakunode2.nim index a99cfcb52..5e6cbb700 100644 --- a/apps/wakunode2/wakunode2.nim +++ b/apps/wakunode2/wakunode2.nim @@ -16,7 +16,6 @@ import factory/external_config, factory/waku, node/health_monitor, - node/waku_metrics, waku_api/rest/builder as rest_server_builder, ] @@ -53,69 +52,21 @@ when isMainModule: let conf = wakuNodeConf.toInspectRlnDbConf() doInspectRlnDb(conf) of noCommand: - # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it - # It will always be called from main thread anyway. - # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety - var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor - nodeHealthMonitor = WakuNodeHealthMonitor() - nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING) - let conf = wakuNodeConf.toWakuConf().valueOr: error "Waku configuration failed", error = error quit(QuitFailure) - var restServer: WakuRestServerRef = nil - - if conf.restServerConf.isSome(): - restServer = rest_server_builder.startRestServerEssentials( - nodeHealthMonitor, conf.restServerConf.get(), conf.portsShift - ).valueOr: - error "Starting essential REST server failed.", error = $error - quit(QuitFailure) - var waku = Waku.new(conf).valueOr: error "Waku initialization failed", error = error quit(QuitFailure) - waku.restServer = restServer - - nodeHealthMonitor.setNode(waku.node) - (waitFor startWaku(addr waku)).isOkOr: error "Starting waku failed", error = error quit(QuitFailure) - if conf.restServerConf.isSome(): - rest_server_builder.startRestServerProtocolSupport( - restServer, - waku.node, - waku.wakuDiscv5, - conf.restServerConf.get(), - conf.relay, - conf.lightPush, - conf.clusterId, - conf.shards, - conf.contentTopics, - ).isOkOr: - error "Starting protocols support REST server failed.", error = $error - quit(QuitFailure) - - if conf.metricsServerConf.isSome(): - waku.metricsServer = waku_metrics.startMetricsServerAndLogging( - conf.metricsServerConf.get(), conf.portsShift - ).valueOr: - error "Starting monitoring and external interfaces failed", error = error - quit(QuitFailure) - - nodeHealthMonitor.setOverallHealth(HealthStatus.READY) - debug "Setting up shutdown hooks" - ## Setup shutdown hooks for this process. - ## Stop node gracefully on shutdown. - - proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} = - nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) - await node.stop() + proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} = + await waku.stop() quit(QuitSuccess) # Handle Ctrl-C SIGINT diff --git a/library/libwaku.nim b/library/libwaku.nim index 3774ad0a8..3e4431411 100644 --- a/library/libwaku.nim +++ b/library/libwaku.nim @@ -878,8 +878,8 @@ proc waku_is_online( handleRequest( ctx, - RequestType.PEER_MANAGER, - PeerManagementRequest.createShared(PeerManagementMsgType.IS_ONLINE), + RequestType.DEBUG, + DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_ONLINE_STATE), callback, userData, ) diff --git a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim index dc0bc72f5..4ab8914ee 100644 --- a/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/debug_node_request.nim @@ -7,13 +7,17 @@ import strutils, libp2p/peerid, metrics -import ../../../../waku/factory/waku, ../../../../waku/node/waku_node +import + ../../../../waku/factory/waku, + ../../../../waku/node/waku_node, + ../../../../waku/node/health_monitor type DebugNodeMsgType* = enum RETRIEVE_LISTENING_ADDRESSES RETRIEVE_MY_ENR RETRIEVE_MY_PEER_ID RETRIEVE_METRICS + RETRIEVE_ONLINE_STATE type DebugNodeRequest* = object operation: DebugNodeMsgType @@ -49,6 +53,8 @@ proc process*( return ok($waku.node.peerId()) of RETRIEVE_METRICS: return ok(getMetrics()) + of RETRIEVE_ONLINE_STATE: + return ok($waku.healthMonitor.onlineMonitor.amIOnline()) error "unsupported operation in DebugNodeRequest" return err("unsupported operation in DebugNodeRequest") diff --git a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim index 8d504df89..0f912aaa3 100644 --- a/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/node_lifecycle_request.nim @@ -8,6 +8,7 @@ import ../../../../waku/factory/node_factory, ../../../../waku/factory/networks_config, ../../../../waku/factory/app_callbacks, + ../../../../waku/waku_api/rest/builder, ../../../alloc type NodeLifecycleMsgType* = enum @@ -73,9 +74,11 @@ proc createWaku( appCallbacks.topicHealthChangeHandler = nil # TODO: Convert `confJson` directly to `WakuConf` - let wakuConf = conf.toWakuConf().valueOr: + var wakuConf = conf.toWakuConf().valueOr: return err("Configuration error: " & $error) + wakuConf.restServerConf = none(RestServerConf) ## don't want REST in libwaku + let wakuRes = Waku.new(wakuConf, appCallbacks).valueOr: error "waku initialization failed", error = error return err("Failed setting up Waku: " & $error) diff --git a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim index 55728780f..deb520366 100644 --- a/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim +++ b/library/waku_thread/inter_thread_communication/requests/peer_manager_request.nim @@ -16,7 +16,6 @@ type PeerManagementMsgType* {.pure.} = enum DIAL_PEER DIAL_PEER_BY_ID GET_CONNECTED_PEERS - IS_ONLINE type PeerManagementRequest* = object operation: PeerManagementMsgType @@ -156,7 +155,5 @@ proc process*( (inPeerIds, outPeerIds) = waku.node.peerManager.connectedPeers() connectedPeerids = concat(inPeerIds, outPeerIds) return ok(connectedPeerids.mapIt($it).join(",")) - of IS_ONLINE: - return ok($waku.node.peerManager.isOnline()) return ok("") diff --git a/tests/wakunode_rest/test_rest_health.nim b/tests/wakunode_rest/test_rest_health.nim index 93838b4fe..964e09c5b 100644 --- a/tests/wakunode_rest/test_rest_health.nim +++ b/tests/wakunode_rest/test_rest_health.nim @@ -39,7 +39,7 @@ suite "Waku v2 REST API - health": asyncTest "Get node health info - GET /health": # Given let node = testWakuNode() - let healthMonitor = WakuNodeHealthMonitor() + let healthMonitor = NodeHealthMonitor() await node.start() (await node.mountRelay()).isOkOr: assert false, "Failed to mount relay" @@ -78,7 +78,7 @@ suite "Waku v2 REST API - health": node.mountLightPushClient() await node.mountFilterClient() - healthMonitor.setNode(node) + healthMonitor.setNodeToHealthMonitor(node) healthMonitor.setOverallHealth(HealthStatus.READY) # When response = await client.healthCheck() diff --git a/waku/factory/builder.nim b/waku/factory/builder.nim index 18f1535ea..b05d5d054 100644 --- a/waku/factory/builder.nim +++ b/waku/factory/builder.nim @@ -209,7 +209,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] = maxServicePeers = some(builder.maxServicePeers), colocationLimit = builder.colocationLimit, shardedPeerManagement = builder.shardAware, - dnsNameServers = netConfig.dnsNameServers, ) var node: WakuNode diff --git a/waku/factory/node_factory.nim b/waku/factory/node_factory.nim index 7063f8476..2c363c6c4 100644 --- a/waku/factory/node_factory.nim +++ b/waku/factory/node_factory.nim @@ -426,7 +426,7 @@ proc startNode*( ## Connect to static nodes and start ## keep-alive, if configured. - # Start Waku v2 node + info "Running nwaku node", version = git_version try: await node.start() except CatchableError: diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index 006093648..2602120d8 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -26,9 +26,11 @@ import ../waku_node, ../node/peer_manager, ../node/health_monitor, + ../node/waku_metrics, ../node/delivery_monitor/delivery_monitor, ../waku_api/message_cache, ../waku_api/rest/server, + ../waku_api/rest/builder as rest_server_builder, ../waku_archive, ../waku_relay/protocol, ../discovery/waku_dnsdisc, @@ -66,6 +68,8 @@ type Waku* = ref object node*: WakuNode + healthMonitor*: NodeHealthMonitor + deliveryMonitor: DeliveryMonitor restServer*: WakuRestServerRef @@ -159,19 +163,33 @@ proc new*( logging.setupLog(wakuConf.logLevel, wakuConf.logFormat) ?wakuConf.validate() - wakuConf.logConf() - info "Running nwaku node", version = git_version + let healthMonitor = NodeHealthMonitor.new(wakuConf.dnsAddrsNameServers) + + let restServer: WakuRestServerRef = + if wakuConf.restServerConf.isSome(): + let restServer = startRestServerEssentials( + healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift + ).valueOr: + error "Starting essential REST server failed", error = $error + return err("Failed to start essential REST server in Waku.new: " & $error) + + restServer + else: + nil var relay = newCircuitRelay(wakuConf.circuitRelayClient) - let nodeRes = setupNode(wakuConf, rng, relay) - if nodeRes.isErr(): - error "Failed setting up node", error = nodeRes.error - return err("Failed setting up node: " & nodeRes.error) + let node = setupNode(wakuConf, rng, relay).valueOr: + error "Failed setting up node", error = $error + return err("Failed setting up node: " & $error) - let node = nodeRes.get() + healthMonitor.setNodeToHealthMonitor(node) + healthMonitor.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore) + healthMonitor.onlineMonitor.addOnlineStateObserver( + node.peerManager.getOnlineStateObserver() + ) node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr: error "Failed setting up app callbacks", error = error @@ -197,8 +215,10 @@ proc new*( rng: rng, key: wakuConf.nodeKey, node: node, + healthMonitor: healthMonitor, deliveryMonitor: deliveryMonitor, appCallbacks: appCallbacks, + restServer: restServer, ) waku.setupSwitchServices(wakuConf, relay, rng) @@ -334,15 +354,6 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} = error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg() return -# The network connectivity loop checks periodically whether the node is online or not -# and triggers any change that depends on the network connectivity state -proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} = - while true: - await sleepAsync(15.seconds) - - # Update online state - await waku.node.peerManager.updateOnlineState() - proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = debug "Retrieve dynamic bootstrap nodes" let conf = waku[].conf @@ -369,7 +380,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = return err("Error in updateApp: " & $error) ## Discv5 - if conf.discv5Conf.isSome: + if conf.discv5Conf.isSome(): waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5( waku.node.enr, waku.node.peerManager, @@ -389,23 +400,41 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} = if not waku[].deliveryMonitor.isNil(): waku[].deliveryMonitor.startDeliveryMonitor() - # Start network connectivity check loop - waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop() + ## Health Monitor + waku[].healthMonitor.startHealthMonitor() + + if conf.restServerConf.isSome(): + rest_server_builder.startRestServerProtocolSupport( + waku[].restServer, + waku[].node, + waku[].wakuDiscv5, + conf.restServerConf.get(), + conf.relay, + conf.lightPush, + conf.clusterId, + conf.shards, + conf.contentTopics, + ).isOkOr: + return err ("Starting protocols support REST server failed: " & $error) + + if conf.metricsServerConf.isSome(): + waku[].metricsServer = waku_metrics.startMetricsServerAndLogging( + conf.metricsServerConf.get(), conf.portsShift + ).valueOr: + return err("Starting monitoring and external interfaces failed: " & error) + + waku[].healthMonitor.setOverallHealth(HealthStatus.READY) return ok() -# Waku shutdown - proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = - if not waku.restServer.isNil(): - await waku.restServer.stop() + ## Waku shutdown + + waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN) if not waku.metricsServer.isNil(): await waku.metricsServer.stop() - if not waku.networkConnLoopHandle.isNil(): - await waku.networkConnLoopHandle.cancelAndWait() - if not waku.wakuDiscv5.isNil(): await waku.wakuDiscv5.stop() @@ -414,3 +443,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} = if not waku.dnsRetryLoopHandle.isNil(): await waku.dnsRetryLoopHandle.cancelAndWait() + + if not waku.healthMonitor.isNil(): + await waku.healthMonitor.stopHealthMonitor() + + if not waku.restServer.isNil(): + await waku.restServer.stop() diff --git a/waku/node/health_monitor.nim b/waku/node/health_monitor.nim index aa9082ec6..854a8bbc0 100644 --- a/waku/node/health_monitor.nim +++ b/waku/node/health_monitor.nim @@ -1,293 +1,4 @@ -{.push raises: [].} +import + health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status] -import std/[options, sets], chronos, libp2p/protocols/rendezvous - -import waku_node, ../waku_rln_relay, ../waku_relay, ./peer_manager - -type - HealthStatus* = enum - INITIALIZING - SYNCHRONIZING - READY - NOT_READY - NOT_MOUNTED - SHUTTING_DOWN - - ProtocolHealth* = object - protocol*: string - health*: HealthStatus - desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY` - - HealthReport* = object - nodeHealth*: HealthStatus - protocolsHealth*: seq[ProtocolHealth] - - WakuNodeHealthMonitor* = ref object - nodeHealth: HealthStatus - node: Option[WakuNode] - -proc `$`*(t: HealthStatus): string = - result = - case t - of INITIALIZING: "Initializing" - of SYNCHRONIZING: "Synchronizing" - of READY: "Ready" - of NOT_READY: "Not Ready" - of NOT_MOUNTED: "Not Mounted" - of SHUTTING_DOWN: "Shutting Down" - -proc init*( - t: typedesc[HealthStatus], strRep: string -): HealthStatus {.raises: [ValueError].} = - case strRep - of "Initializing": - return HealthStatus.INITIALIZING - of "Synchronizing": - return HealthStatus.SYNCHRONIZING - of "Ready": - return HealthStatus.READY - of "Not Ready": - return HealthStatus.NOT_READY - of "Not Mounted": - return HealthStatus.NOT_MOUNTED - of "Shutting Down": - return HealthStatus.SHUTTING_DOWN - else: - raise newException(ValueError, "Invalid HealthStatus string representation") - -proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth = - let p = ProtocolHealth( - protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]() - ) - return p - -proc notReady(p: var ProtocolHealth, desc: string): ProtocolHealth = - p.health = HealthStatus.NOT_READY - p.desc = some(desc) - return p - -proc ready(p: var ProtocolHealth): ProtocolHealth = - p.health = HealthStatus.READY - p.desc = none[string]() - return p - -proc notMounted(p: var ProtocolHealth): ProtocolHealth = - p.health = HealthStatus.NOT_MOUNTED - p.desc = none[string]() - return p - -proc synchronizing(p: var ProtocolHealth): ProtocolHealth = - p.health = HealthStatus.SYNCHRONIZING - p.desc = none[string]() - return p - -proc initializing(p: var ProtocolHealth): ProtocolHealth = - p.health = HealthStatus.INITIALIZING - p.desc = none[string]() - return p - -proc shuttingDown(p: var ProtocolHealth): ProtocolHealth = - p.health = HealthStatus.SHUTTING_DOWN - p.desc = none[string]() - return p - -const FutIsReadyTimout = 5.seconds - -proc getRelayHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Relay") - if hm.node.get().wakuRelay == nil: - return p.notMounted() - - let relayPeers = hm.node - .get().wakuRelay - .getConnectedPubSubPeers(pubsubTopic = "").valueOr: - return p.notMounted() - - if relayPeers.len() == 0: - return p.notReady("No connected peers") - - return p.ready() - -proc getRlnRelayHealth(hm: WakuNodeHealthMonitor): Future[ProtocolHealth] {.async.} = - var p = ProtocolHealth.init("Rln Relay") - if hm.node.get().wakuRlnRelay.isNil(): - return p.notMounted() - - let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady() - if not await isReadyStateFut.withTimeout(FutIsReadyTimout): - return p.notReady("Ready state check timed out") - - try: - if not isReadyStateFut.completed(): - return p.notReady("Ready state check timed out") - elif isReadyStateFut.read(): - return p.ready() - - return p.synchronizing() - except: - error "exception reading state: " & getCurrentExceptionMsg() - return p.notReady("State cannot be determined") - -proc getLightpushHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Lightpush") - if hm.node.get().wakuLightPush == nil: - return p.notMounted() - - if relayHealth == HealthStatus.READY: - return p.ready() - - return p.notReady("Node has no relay peers to fullfill push requests") - -proc getLightpushClientHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Lightpush Client") - if hm.node.get().wakuLightpushClient == nil: - return p.notMounted() - - let selfServiceAvailable = - hm.node.get().wakuLightPush != nil and relayHealth == HealthStatus.READY - let servicePeerAvailable = - hm.node.get().peerManager.selectPeer(WakuLightPushCodec).isSome() - - if selfServiceAvailable or servicePeerAvailable: - return p.ready() - - return p.notReady("No Lightpush service peer available yet") - -proc getLegacyLightpushHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Lightpush") - if hm.node.get().wakuLegacyLightPush == nil: - return p.notMounted() - - if relayHealth == HealthStatus.READY: - return p.ready() - - return p.notReady("Node has no relay peers to fullfill push requests") - -proc getLegacyLightpushClientHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Lightpush Client") - if hm.node.get().wakuLegacyLightpushClient == nil: - return p.notMounted() - - if (hm.node.get().wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or - hm.node.get().peerManager.selectPeer(WakuLegacyLightPushCodec).isSome(): - return p.ready() - - return p.notReady("No Lightpush service peer available yet") - -proc getFilterHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Filter") - if hm.node.get().wakuFilter == nil: - return p.notMounted() - - if relayHealth == HealthStatus.READY: - return p.ready() - - return p.notReady("Relay is not ready, filter will not be able to sort out messages") - -proc getFilterClientHealth( - hm: WakuNodeHealthMonitor, relayHealth: HealthStatus -): ProtocolHealth = - var p = ProtocolHealth.init("Filter Client") - if hm.node.get().wakuFilterClient == nil: - return p.notMounted() - - if hm.node.get().peerManager.selectPeer(WakuFilterSubscribeCodec).isSome(): - return p.ready() - - return p.notReady("No Filter service peer available yet") - -proc getStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Store") - if hm.node.get().wakuStore == nil: - return p.notMounted() - - return p.ready() - -proc getStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Store Client") - if hm.node.get().wakuStoreClient == nil: - return p.notMounted() - - if hm.node.get().peerManager.selectPeer(WakuStoreCodec).isSome() or - hm.node.get().wakuStore != nil: - return p.ready() - - return p.notReady( - "No Store service peer available yet, neither Store service set up for the node" - ) - -proc getLegacyStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Store") - if hm.node.get().wakuLegacyStore == nil: - return p.notMounted() - - return p.ready() - -proc getLegacyStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Legacy Store Client") - if hm.node.get().wakuLegacyStoreClient == nil: - return p.notMounted() - - if hm.node.get().peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or - hm.node.get().wakuLegacyStore != nil: - return p.ready() - - return p.notReady( - "No Legacy Store service peers are available yet, neither Store service set up for the node" - ) - -proc getPeerExchangeHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Peer Exchange") - if hm.node.get().wakuPeerExchange == nil: - return p.notMounted() - - return p.ready() - -proc getRendezvousHealth(hm: WakuNodeHealthMonitor): ProtocolHealth = - var p = ProtocolHealth.init("Rendezvous") - if hm.node.get().wakuRendezvous == nil: - return p.notMounted() - - if hm.node.get().peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0: - return p.notReady("No Rendezvous peers are available yet") - - return p.ready() - -proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealthReport] {.async.} = - var report: HealthReport - report.nodeHealth = hm.nodeHealth - - if hm.node.isSome(): - let relayHealth = hm.getRelayHealth() - report.protocolsHealth.add(relayHealth) - report.protocolsHealth.add(await hm.getRlnRelayHealth()) - report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getStoreHealth()) - report.protocolsHealth.add(hm.getLegacyStoreHealth()) - report.protocolsHealth.add(hm.getPeerExchangeHealth()) - report.protocolsHealth.add(hm.getRendezvousHealth()) - - report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health)) - report.protocolsHealth.add(hm.getStoreClientHealth()) - report.protocolsHealth.add(hm.getLegacyStoreClientHealth()) - report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health)) - return report - -proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) = - hm.node = some(node) - -proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) = - hm.nodeHealth = health +export node_health_monitor, protocol_health, online_monitor, health_status diff --git a/waku/node/health_monitor/health_status.nim b/waku/node/health_monitor/health_status.nim new file mode 100644 index 000000000..4dd2bdd9a --- /dev/null +++ b/waku/node/health_monitor/health_status.nim @@ -0,0 +1,16 @@ +import results, std/strutils + +type HealthStatus* {.pure.} = enum + INITIALIZING + SYNCHRONIZING + READY + NOT_READY + NOT_MOUNTED + SHUTTING_DOWN + +proc init*(t: typedesc[HealthStatus], strRep: string): Result[HealthStatus, string] = + try: + let status = parseEnum[HealthStatus](strRep) + return ok(status) + except ValueError: + return err("Invalid HealthStatus string representation: " & strRep) diff --git a/waku/node/health_monitor/node_health_monitor.nim b/waku/node/health_monitor/node_health_monitor.nim new file mode 100644 index 000000000..b13925d66 --- /dev/null +++ b/waku/node/health_monitor/node_health_monitor.nim @@ -0,0 +1,270 @@ +{.push raises: [].} + +import std/[options, sets, strformat], chronos, chronicles, libp2p/protocols/rendezvous + +import + ../waku_node, + ../../waku_rln_relay, + ../../waku_relay, + ../peer_manager, + ./online_monitor, + ./health_status, + ./protocol_health + +## This module is aimed to check the state of the "self" Waku Node + +type + HealthReport* = object + nodeHealth*: HealthStatus + protocolsHealth*: seq[ProtocolHealth] + + NodeHealthMonitor* = ref object + nodeHealth: HealthStatus + node: WakuNode + onlineMonitor*: OnlineMonitor + +template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped = + if node.isNil(): + warn "WakuNode is not set, cannot check health", protocol_health_instance = $p + return p.notMounted() + +proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Relay") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuRelay == nil: + return p.notMounted() + + let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr: + return p.notMounted() + + if relayPeers.len() == 0: + return p.notReady("No connected peers") + + return p.ready() + +proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} = + var p = ProtocolHealth.init("Rln Relay") + if hm.node.isNil(): + warn "WakuNode is not set, cannot check health", protocol_health_instance = $p + return p.notMounted() + + if hm.node.wakuRlnRelay.isNil(): + return p.notMounted() + + const FutIsReadyTimout = 5.seconds + + let isReadyStateFut = hm.node.wakuRlnRelay.isReady() + if not await isReadyStateFut.withTimeout(FutIsReadyTimout): + return p.notReady("Ready state check timed out") + + try: + if not isReadyStateFut.completed(): + return p.notReady("Ready state check timed out") + elif isReadyStateFut.read(): + return p.ready() + + return p.synchronizing() + except: + error "exception reading state: " & getCurrentExceptionMsg() + return p.notReady("State cannot be determined") + +proc getLightpushHealth( + hm: NodeHealthMonitor, relayHealth: HealthStatus +): ProtocolHealth = + var p = ProtocolHealth.init("Lightpush") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLightPush == nil: + return p.notMounted() + + if relayHealth == HealthStatus.READY: + return p.ready() + + return p.notReady("Node has no relay peers to fullfill push requests") + +proc getLightpushClientHealth( + hm: NodeHealthMonitor, relayHealth: HealthStatus +): ProtocolHealth = + var p = ProtocolHealth.init("Lightpush Client") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLightpushClient == nil: + return p.notMounted() + + let selfServiceAvailable = + hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY + let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome() + + if selfServiceAvailable or servicePeerAvailable: + return p.ready() + + return p.notReady("No Lightpush service peer available yet") + +proc getLegacyLightpushHealth( + hm: NodeHealthMonitor, relayHealth: HealthStatus +): ProtocolHealth = + var p = ProtocolHealth.init("Legacy Lightpush") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLegacyLightPush == nil: + return p.notMounted() + + if relayHealth == HealthStatus.READY: + return p.ready() + + return p.notReady("Node has no relay peers to fullfill push requests") + +proc getLegacyLightpushClientHealth( + hm: NodeHealthMonitor, relayHealth: HealthStatus +): ProtocolHealth = + var p = ProtocolHealth.init("Legacy Lightpush Client") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLegacyLightpushClient == nil: + return p.notMounted() + + if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or + hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome(): + return p.ready() + + return p.notReady("No Lightpush service peer available yet") + +proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth = + var p = ProtocolHealth.init("Filter") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuFilter == nil: + return p.notMounted() + + if relayHealth == HealthStatus.READY: + return p.ready() + + return p.notReady("Relay is not ready, filter will not be able to sort out messages") + +proc getFilterClientHealth( + hm: NodeHealthMonitor, relayHealth: HealthStatus +): ProtocolHealth = + var p = ProtocolHealth.init("Filter Client") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuFilterClient == nil: + return p.notMounted() + + if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome(): + return p.ready() + + return p.notReady("No Filter service peer available yet") + +proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Store") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuStore == nil: + return p.notMounted() + + return p.ready() + +proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Store Client") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuStoreClient == nil: + return p.notMounted() + + if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil: + return p.ready() + + return p.notReady( + "No Store service peer available yet, neither Store service set up for the node" + ) + +proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Legacy Store") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLegacyStore == nil: + return p.notMounted() + + return p.ready() + +proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Legacy Store Client") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuLegacyStoreClient == nil: + return p.notMounted() + + if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or + hm.node.wakuLegacyStore != nil: + return p.ready() + + return p.notReady( + "No Legacy Store service peers are available yet, neither Store service set up for the node" + ) + +proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Peer Exchange") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuPeerExchange == nil: + return p.notMounted() + + return p.ready() + +proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth = + var p = ProtocolHealth.init("Rendezvous") + checkWakuNodeNotNil(hm.node, p) + + if hm.node.wakuRendezvous == nil: + return p.notMounted() + + if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0: + return p.notReady("No Rendezvous peers are available yet") + + return p.ready() + +proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} = + var report: HealthReport + report.nodeHealth = hm.nodeHealth + + if not hm.node.isNil(): + let relayHealth = hm.getRelayHealth() + report.protocolsHealth.add(relayHealth) + report.protocolsHealth.add(await hm.getRlnRelayHealth()) + report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health)) + report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health)) + report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health)) + report.protocolsHealth.add(hm.getStoreHealth()) + report.protocolsHealth.add(hm.getLegacyStoreHealth()) + report.protocolsHealth.add(hm.getPeerExchangeHealth()) + report.protocolsHealth.add(hm.getRendezvousHealth()) + + report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health)) + report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health)) + report.protocolsHealth.add(hm.getStoreClientHealth()) + report.protocolsHealth.add(hm.getLegacyStoreClientHealth()) + report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health)) + return report + +proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) = + hm.node = node + +proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) = + hm.nodeHealth = health + +proc startHealthMonitor*(hm: NodeHealthMonitor) = + hm.onlineMonitor.startOnlineMonitor() + +proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} = + await hm.onlineMonitor.stopOnlineMonitor() + +proc new*( + T: type NodeHealthMonitor, + dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], +): T = + T( + nodeHealth: INITIALIZING, + node: nil, + onlineMonitor: OnlineMonitor.init(dnsNameServers), + ) diff --git a/waku/node/health_monitor/online_monitor.nim b/waku/node/health_monitor/online_monitor.nim new file mode 100644 index 000000000..f3a3013e2 --- /dev/null +++ b/waku/node/health_monitor/online_monitor.nim @@ -0,0 +1,77 @@ +import std/sequtils +import chronos, chronicles, libp2p/nameresolving/dnsresolver, libp2p/peerstore + +import ../peer_manager/waku_peer_store, waku/waku_core/peers + +type + OnOnlineStateChange* = proc(online: bool) {.gcsafe, raises: [].} + + OnlineMonitor* = ref object + onOnlineStateChange: OnOnlineStateChange + dnsNameServers*: seq[IpAddress] + onlineStateObservers: seq[OnOnlineStateChange] + networkConnLoopHandle: Future[void] # node: WakuNode + peerStore: PeerStore + online: bool + +proc checkInternetConnectivity( + nameServerIps: seq[IpAddress], timeout = 2.seconds +): Future[bool] {.async.} = + const DNSCheckDomain = "one.one.one.one" + let nameServers = nameServerIps.mapIt(initTAddress(it, Port(53))) + let dnsResolver = DnsResolver.new(nameServers) + + # Resolve domain IP + let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC) + if resolved.len > 0: + return true + else: + return false + +proc updateOnlineState(self: OnlineMonitor) {.async.} = + if self.onlineStateObservers.len == 0: + trace "No online state observers registered, cannot notify about online state change" + return + + let numConnectedPeers = + if self.peerStore.isNil(): + 0 + else: + self.peerStore.peers().countIt(it.connectedness == Connected) + + self.online = + if numConnectedPeers > 0: + true + else: + await checkInternetConnectivity(self.dnsNameServers) + + for onlineStateObserver in self.onlineStateObservers: + onlineStateObserver(self.online) + +proc networkConnectivityLoop(self: OnlineMonitor): Future[void] {.async.} = + ## Checks periodically whether the node is online or not + ## and triggers any change that depends on the network connectivity state + while true: + await self.updateOnlineState() + await sleepAsync(15.seconds) + +proc startOnlineMonitor*(self: OnlineMonitor) = + self.networkConnLoopHandle = self.networkConnectivityLoop() + +proc stopOnlineMonitor*(self: OnlineMonitor) {.async.} = + if not self.networkConnLoopHandle.isNil(): + await self.networkConnLoopHandle.cancelAndWait() + +proc setPeerStoreToOnlineMonitor*(self: OnlineMonitor, peerStore: PeerStore) = + self.peerStore = peerStore + +proc addOnlineStateObserver*(self: OnlineMonitor, observer: OnOnlineStateChange) = + ## Adds an observer that will be called when the online state changes + if observer notin self.onlineStateObservers: + self.onlineStateObservers.add(observer) + +proc amIOnline*(self: OnlineMonitor): bool = + return self.online + +proc init*(T: type OnlineMonitor, dnsNameServers: seq[IpAddress]): OnlineMonitor = + T(dnsNameServers: dnsNameServers, onlineStateObservers: @[]) diff --git a/waku/node/health_monitor/protocol_health.nim b/waku/node/health_monitor/protocol_health.nim new file mode 100644 index 000000000..7bacea94b --- /dev/null +++ b/waku/node/health_monitor/protocol_health.nim @@ -0,0 +1,46 @@ +import std/[options, strformat] +import ./health_status + +type ProtocolHealth* = object + protocol*: string + health*: HealthStatus + desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY` + +proc notReady*(p: var ProtocolHealth, desc: string): ProtocolHealth = + p.health = HealthStatus.NOT_READY + p.desc = some(desc) + return p + +proc ready*(p: var ProtocolHealth): ProtocolHealth = + p.health = HealthStatus.READY + p.desc = none[string]() + return p + +proc notMounted*(p: var ProtocolHealth): ProtocolHealth = + p.health = HealthStatus.NOT_MOUNTED + p.desc = none[string]() + return p + +proc synchronizing*(p: var ProtocolHealth): ProtocolHealth = + p.health = HealthStatus.SYNCHRONIZING + p.desc = none[string]() + return p + +proc initializing*(p: var ProtocolHealth): ProtocolHealth = + p.health = HealthStatus.INITIALIZING + p.desc = none[string]() + return p + +proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth = + p.health = HealthStatus.SHUTTING_DOWN + p.desc = none[string]() + return p + +proc `$`*(p: ProtocolHealth): string = + return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}" + +proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth = + let p = ProtocolHealth( + protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]() + ) + return p diff --git a/waku/node/peer_manager/peer_manager.nim b/waku/node/peer_manager/peer_manager.nim index 707738e5f..0a19d5b2c 100644 --- a/waku/node/peer_manager/peer_manager.nim +++ b/waku/node/peer_manager/peer_manager.nim @@ -8,7 +8,6 @@ import libp2p/multistream, libp2p/muxers/muxer, libp2p/nameresolving/nameresolver, - libp2p/nameresolving/dnsresolver, libp2p/peerstore import @@ -21,6 +20,7 @@ import ../../waku_enr/sharding, ../../waku_enr/capabilities, ../../waku_metadata, + ../health_monitor/online_monitor, ./peer_store/peer_storage, ./waku_peer_store @@ -74,8 +74,6 @@ const # Max peers that we allow from the same IP DefaultColocationLimit* = 5 - DNSCheckDomain = "one.one.one.one" - type ConnectionChangeHandler* = proc( peerId: PeerId, peerEvent: PeerEventKind ): Future[void] {.gcsafe, raises: [Defect].} @@ -98,16 +96,12 @@ type PeerManager* = ref object of RootObj started: bool shardedPeerManagement: bool # temp feature flag onConnectionChange*: ConnectionChangeHandler - dnsNameServers*: seq[IpAddress] - online: bool + online: bool ## state managed by online_monitor module #~~~~~~~~~~~~~~~~~~~# # Helper Functions # #~~~~~~~~~~~~~~~~~~~# -template isOnline*(self: PeerManager): bool = - self.online - proc calculateBackoff( initialBackoffInSec: int, backoffFactor: int, failedAttempts: int ): timer.Duration = @@ -543,35 +537,9 @@ proc getStreamByPeerIdAndProtocol*( return ok(streamRes.get()) -proc checkInternetConnectivity( - nameServerIps: seq[IpAddress], timeout = 2.seconds -): Future[bool] {.async.} = - var nameServers: seq[TransportAddress] - for ip in nameServerIps: - nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53 - - let dnsResolver = DnsResolver.new(nameServers) - - # Resolve domain IP - let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC) - - if resolved.len > 0: - return true - else: - return false - -proc updateOnlineState*(pm: PeerManager) {.async.} = - let numConnectedPeers = - pm.switch.peerStore.peers().countIt(it.connectedness == Connected) - - if numConnectedPeers > 0: - pm.online = true - else: - pm.online = await checkInternetConnectivity(pm.dnsNameServers) - proc connectToRelayPeers*(pm: PeerManager) {.async.} = # only attempt if current node is online - if not pm.isOnline(): + if not pm.online: error "connectToRelayPeers: won't attempt new connections - node is offline" return @@ -739,6 +707,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip asyncSpawn(pm.switch.disconnect(peerId)) peerStore.delete(peerId) + if not pm.onConnectionChange.isNil(): # we don't want to await for the callback to finish asyncSpawn pm.onConnectionChange(peerId, Joined) @@ -753,6 +722,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} = if pm.ipTable[ip].len == 0: pm.ipTable.del(ip) break + if not pm.onConnectionChange.isNil(): # we don't want to await for the callback to finish asyncSpawn pm.onConnectionChange(peerId, Left) @@ -809,6 +779,10 @@ proc logAndMetrics(pm: PeerManager) {.async.} = protoStreamsOut.float64, labelValues = [$Direction.Out, proto] ) +proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange = + return proc(online: bool) {.gcsafe, raises: [].} = + pm.online = online + #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# # Pruning and Maintenance (Stale Peers Management) # #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~# @@ -817,7 +791,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} = if pm.wakuMetadata.shards.len == 0: return - if not pm.isOnline(): + if not pm.online: error "manageRelayPeers: won't attempt new connections - node is offline" return @@ -1048,7 +1022,6 @@ proc new*( maxFailedAttempts = MaxFailedAttempts, colocationLimit = DefaultColocationLimit, shardedPeerManagement = false, - dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")], ): PeerManager {.gcsafe.} = let capacity = switch.peerStore.capacity let maxConnections = switch.connManager.inSema.size @@ -1099,7 +1072,6 @@ proc new*( maxFailedAttempts: maxFailedAttempts, colocationLimit: colocationLimit, shardedPeerManagement: shardedPeerManagement, - dnsNameServers: dnsNameServers, online: true, ) diff --git a/waku/waku_api/rest/builder.nim b/waku/waku_api/rest/builder.nim index 1b5d9af70..6725aaeec 100644 --- a/waku/waku_api/rest/builder.nim +++ b/waku/waku_api/rest/builder.nim @@ -40,7 +40,7 @@ type RestServerConf* = object relayCacheCapacity*: uint32 proc startRestServerEssentials*( - nodeHealthMonitor: WakuNodeHealthMonitor, conf: RestServerConf, portsShift: uint16 + nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16 ): Result[WakuRestServerRef, string] = let requestErrorHandler: RestRequestErrorHandler = proc( error: RestRequestError, request: HttpRequestRef diff --git a/waku/waku_api/rest/health/handlers.nim b/waku/waku_api/rest/health/handlers.nim index 48dad9276..aa6b1e925 100644 --- a/waku/waku_api/rest/health/handlers.nim +++ b/waku/waku_api/rest/health/handlers.nim @@ -11,7 +11,7 @@ const ROUTE_HEALTH* = "/health" const FutHealthReportTimeout = 5.seconds proc installHealthApiHandler*( - router: var RestRouter, nodeHealthMonitor: WakuNodeHealthMonitor + router: var RestRouter, nodeHealthMonitor: NodeHealthMonitor ) = router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse: let healthReportFut = nodeHealthMonitor.getNodeHealthReport() diff --git a/waku/waku_api/rest/health/types.nim b/waku/waku_api/rest/health/types.nim index e457ebea5..57f8b284c 100644 --- a/waku/waku_api/rest/health/types.nim +++ b/waku/waku_api/rest/health/types.nim @@ -1,5 +1,6 @@ {.push raises: [].} +import results import chronicles, json_serialization, json_serialization/std/options import ../../../waku_node, ../serdes @@ -31,13 +32,10 @@ proc readValue*( ) let fieldValue = reader.readValue(string) - try: - health = some(HealthStatus.init(fieldValue)) - protocol = some(fieldName) - except ValueError: - reader.raiseUnexpectedValue( - "Invalid `health` value: " & getCurrentExceptionMsg() - ) + let h = HealthStatus.init(fieldValue).valueOr: + reader.raiseUnexpectedValue("Invalid `health` value: " & $error) + health = some(h) + protocol = some(fieldName) value = ProtocolHealth(protocol: protocol.get(), health: health.get(), desc: desc) @@ -63,10 +61,11 @@ proc readValue*( reader.raiseUnexpectedField( "Multiple `nodeHealth` fields found", "HealthReport" ) - try: - nodeHealth = some(HealthStatus.init(reader.readValue(string))) - except ValueError: - reader.raiseUnexpectedValue("Invalid `health` value") + + let health = HealthStatus.init(reader.readValue(string)).valueOr: + reader.raiseUnexpectedValue("Invalid `health` value: " & $error) + + nodeHealth = some(health) of "protocolsHealth": if protocolsHealth.isSome(): reader.raiseUnexpectedField(