chore: refactor to unify online and health monitors (#3456)

parent 3f3c594885
commit 2e40f2971f
@@ -14,13 +14,11 @@ import
   waku/[
     common/enr,
     common/logging,
-    factory/waku,
+    factory/waku as waku_factory,
     factory/external_config,
     waku_node,
-    node/health_monitor,
-    node/waku_metrics,
     node/peer_manager,
     waku_api/rest/builder as rest_server_builder,
     waku_lightpush/common,
     waku_filter_v2,
     waku_peer_exchange/protocol,
@@ -49,7 +47,7 @@ when isMainModule:
   ## 5. Start monitoring tools and external interfaces
   ## 6. Setup graceful shutdown hooks

-  const versionString = "version / git commit hash: " & waku.git_version
+  const versionString = "version / git commit hash: " & waku_factory.git_version

   let confRes = LiteProtocolTesterConf.load(version = versionString)
   if confRes.isErr():
@@ -61,7 +59,7 @@ when isMainModule:
   ## Logging setup
   logging.setupLog(conf.logLevel, conf.logFormat)

-  info "Running Lite Protocol Tester node", version = waku.git_version
+  info "Running Lite Protocol Tester node", version = waku_factory.git_version
   logConfig(conf)

   ##Prepare Waku configuration
@@ -69,13 +67,13 @@ when isMainModule:
   ## - override according to tester functionality
   ##

-  var wConf: WakuNodeConf
+  var wakuNodeConf: WakuNodeConf

   if conf.configFile.isSome():
     try:
       var configFile {.threadvar.}: InputFile
      configFile = conf.configFile.get()
-      wConf = WakuNodeConf.load(
+      wakuNodeConf = WakuNodeConf.load(
        version = versionString,
        printUsage = false,
        secondarySources = proc(
@@ -88,101 +86,54 @@ when isMainModule:
      error "Loading Waku configuration failed", error = getCurrentExceptionMsg()
      quit(QuitFailure)

-  wConf.logLevel = conf.logLevel
-  wConf.logFormat = conf.logFormat
-  wConf.nat = conf.nat
-  wConf.maxConnections = 500
-  wConf.restAddress = conf.restAddress
-  wConf.restPort = conf.restPort
-  wConf.restAllowOrigin = conf.restAllowOrigin
+  wakuNodeConf.logLevel = conf.logLevel
+  wakuNodeConf.logFormat = conf.logFormat
+  wakuNodeConf.nat = conf.nat
+  wakuNodeConf.maxConnections = 500
+  wakuNodeConf.restAddress = conf.restAddress
+  wakuNodeConf.restPort = conf.restPort
+  wakuNodeConf.restAllowOrigin = conf.restAllowOrigin

-  wConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
+  wakuNodeConf.dnsAddrsNameServers =
+    @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]

-  wConf.shards = @[conf.shard]
-  wConf.contentTopics = conf.contentTopics
-  wConf.clusterId = conf.clusterId
+  wakuNodeConf.shards = @[conf.shard]
+  wakuNodeConf.contentTopics = conf.contentTopics
+  wakuNodeConf.clusterId = conf.clusterId
   ## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc...

-  wConf.metricsServer = true
-  wConf.metricsServerAddress = parseIpAddress("0.0.0.0")
-  wConf.metricsServerPort = conf.metricsPort
+  wakuNodeConf.metricsServer = true
+  wakuNodeConf.metricsServerAddress = parseIpAddress("0.0.0.0")
+  wakuNodeConf.metricsServerPort = conf.metricsPort

   # If bootstrap option is chosen we expect our clients will not mounted
   # so we will mount PeerExchange manually to gather possible service peers,
   # if got some we will mount the client protocols afterward.
-  wConf.peerExchange = false
-  wConf.relay = false
-  wConf.filter = false
-  wConf.lightpush = false
-  wConf.store = false
+  wakuNodeConf.peerExchange = false
+  wakuNodeConf.relay = false
+  wakuNodeConf.filter = false
+  wakuNodeConf.lightpush = false
+  wakuNodeConf.store = false

-  wConf.rest = false
-  wConf.relayServiceRatio = "40:60"
+  wakuNodeConf.rest = false
+  wakuNodeConf.relayServiceRatio = "40:60"

-  # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
-  # It will always be called from main thread anyway.
-  # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
-  var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
-  nodeHealthMonitor = WakuNodeHealthMonitor()
-  nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
-
-  let wakuConf = wConf.toWakuConf().valueOr:
-    error "Waku configuration failed", error = error
+  let wakuConf = wakuNodeConf.toWakuConf().valueOr:
+    error "Issue converting toWakuConf", error = $error
     quit(QuitFailure)

-  let restServer: WakuRestServerRef =
-    if wakuConf.restServerConf.isSome():
-      rest_server_builder.startRestServerEssentials(
-        nodeHealthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
-      ).valueOr:
-        error "Starting essential REST server failed.", error = $error
-        quit(QuitFailure)
-    else:
-      nil
-
-  var wakuApp = Waku.new(wakuConf).valueOr:
+  var waku = Waku.new(wakuConf).valueOr:
     error "Waku initialization failed", error = error
     quit(QuitFailure)

-  wakuApp.restServer = restServer
-
-  nodeHealthMonitor.setNode(wakuApp.node)
-
-  (waitFor startWaku(addr wakuApp)).isOkOr:
+  (waitFor startWaku(addr waku)).isOkOr:
     error "Starting waku failed", error = error
     quit(QuitFailure)

-  if wakuConf.restServerConf.isSome():
-    rest_server_builder.startRestServerProtocolSupport(
-      restServer,
-      wakuApp.node,
-      wakuApp.wakuDiscv5,
-      wakuConf.restServerConf.get(),
-      wakuConf.relay,
-      wakuConf.lightPush,
-      wakuConf.clusterId,
-      wakuConf.shards,
-      wakuConf.contentTopics,
-    ).isOkOr:
-      error "Starting protocols support REST server failed.", error = $error
-      quit(QuitFailure)
-
-  if wakuConf.metricsServerConf.isSome():
-    wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(
-      wakuConf.metricsServerConf.get(), wakuConf.portsShift
-    ).valueOr:
-      error "Starting monitoring and external interfaces failed", error = error
-      quit(QuitFailure)
-
-  nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
-
   debug "Setting up shutdown hooks"
   ## Setup shutdown hooks for this process.
   ## Stop node gracefully on shutdown.

-  proc asyncStopper(wakuApp: Waku) {.async: (raises: [Exception]).} =
-    nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
-    await wakuApp.stop()
+  proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} =
+    await waku.stop()
     quit(QuitSuccess)

   # Handle Ctrl-C SIGINT
@@ -191,7 +142,7 @@ when isMainModule:
       # workaround for https://github.com/nim-lang/Nim/issues/4057
       setupForeignThreadGc()
     notice "Shutting down after receiving SIGINT"
-    asyncSpawn asyncStopper(wakuApp)
+    asyncSpawn asyncStopper(waku)

   setControlCHook(handleCtrlC)

@@ -199,7 +150,7 @@ when isMainModule:
   when defined(posix):
     proc handleSigterm(signal: cint) {.noconv.} =
       notice "Shutting down after receiving SIGTERM"
-      asyncSpawn asyncStopper(wakuApp)
+      asyncSpawn asyncStopper(waku)

     c_signal(ansi_c.SIGTERM, handleSigterm)

@@ -212,22 +163,26 @@ when isMainModule:
       # Not available in -d:release mode
       writeStackTrace()

-      waitFor wakuApp.stop()
+      waitFor waku.stop()
       quit(QuitFailure)

     c_signal(ansi_c.SIGSEGV, handleSigsegv)

   info "Node setup complete"

-  let codec = conf.getCodec()
+  var codec = WakuLightPushCodec
+  # mounting relevant client, for PX filter client must be mounted ahead
+  if conf.testFunc == TesterFunctionality.SENDER:
+    codec = WakuLightPushCodec
+  else:
+    codec = WakuFilterSubscribeCodec

   var lookForServiceNode = false
   var serviceNodePeerInfo: RemotePeerInfo
   if conf.serviceNode.len == 0:
     if conf.bootstrapNode.len > 0:
       info "Bootstrapping with PeerExchange to gather random service node"
-      let futForServiceNode = pxLookupServiceNode(wakuApp.node, conf)
+      let futForServiceNode = pxLookupServiceNode(waku.node, conf)
       if not (waitFor futForServiceNode.withTimeout(20.minutes)):
         error "Service node not found in time via PX"
         quit(QuitFailure)
@@ -237,7 +192,7 @@ when isMainModule:
       quit(QuitFailure)

   serviceNodePeerInfo = selectRandomServicePeer(
-    wakuApp.node.peerManager, none(RemotePeerInfo), codec
+    waku.node.peerManager, none(RemotePeerInfo), codec
   ).valueOr:
     error "Service node selection failed"
     quit(QuitFailure)
@@ -252,11 +207,11 @@ when isMainModule:

   info "Service node to be used", serviceNode = $serviceNodePeerInfo

-  logSelfPeers(wakuApp.node.peerManager)
+  logSelfPeers(waku.node.peerManager)

   if conf.testFunc == TesterFunctionality.SENDER:
-    setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo)
+    setupAndPublish(waku.node, conf, serviceNodePeerInfo)
   else:
-    setupAndListen(wakuApp.node, conf, serviceNodePeerInfo)
+    setupAndListen(waku.node, conf, serviceNodePeerInfo)

   runForever()
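Worth noting about the boilerplate removed above: the {.threadvar.} annotation existed only to satisfy Nim's GC-safety check for closures that touch a global ref, as the deleted NOTE explains. A minimal self-contained illustration of that pattern (hypothetical names, not part of this change):

  type Registry = ref object
    count: int

  # A global `ref` accessed from a {.gcsafe.} proc is rejected by the compiler;
  # a {.threadvar.} is accepted because each thread owns its own copy.
  var registry {.threadvar.}: Registry

  proc bump() {.gcsafe.} =
    if registry.isNil:
      registry = Registry()
    inc registry.count

With the monitor now built inside Waku.new, the apps no longer need this workaround.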
@@ -16,7 +16,6 @@ import
     factory/external_config,
     factory/waku,
     node/health_monitor,
-    node/waku_metrics,
     waku_api/rest/builder as rest_server_builder,
   ]

@@ -53,69 +52,21 @@ when isMainModule:
     let conf = wakuNodeConf.toInspectRlnDbConf()
     doInspectRlnDb(conf)
   of noCommand:
-    # NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
-    # It will always be called from main thread anyway.
-    # Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
-    var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
-    nodeHealthMonitor = WakuNodeHealthMonitor()
-    nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
-
     let conf = wakuNodeConf.toWakuConf().valueOr:
       error "Waku configuration failed", error = error
       quit(QuitFailure)

-    var restServer: WakuRestServerRef = nil
-
-    if conf.restServerConf.isSome():
-      restServer = rest_server_builder.startRestServerEssentials(
-        nodeHealthMonitor, conf.restServerConf.get(), conf.portsShift
-      ).valueOr:
-        error "Starting essential REST server failed.", error = $error
-        quit(QuitFailure)
-
     var waku = Waku.new(conf).valueOr:
       error "Waku initialization failed", error = error
       quit(QuitFailure)

-    waku.restServer = restServer
-
-    nodeHealthMonitor.setNode(waku.node)
-
     (waitFor startWaku(addr waku)).isOkOr:
       error "Starting waku failed", error = error
       quit(QuitFailure)

-    if conf.restServerConf.isSome():
-      rest_server_builder.startRestServerProtocolSupport(
-        restServer,
-        waku.node,
-        waku.wakuDiscv5,
-        conf.restServerConf.get(),
-        conf.relay,
-        conf.lightPush,
-        conf.clusterId,
-        conf.shards,
-        conf.contentTopics,
-      ).isOkOr:
-        error "Starting protocols support REST server failed.", error = $error
-        quit(QuitFailure)
-
-    if conf.metricsServerConf.isSome():
-      waku.metricsServer = waku_metrics.startMetricsServerAndLogging(
-        conf.metricsServerConf.get(), conf.portsShift
-      ).valueOr:
-        error "Starting monitoring and external interfaces failed", error = error
-        quit(QuitFailure)
-
-    nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
-
     debug "Setting up shutdown hooks"
     ## Setup shutdown hooks for this process.
     ## Stop node gracefully on shutdown.

-    proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
-      nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
-      await node.stop()
+    proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} =
+      await waku.stop()
       quit(QuitSuccess)

     # Handle Ctrl-C SIGINT
@@ -878,8 +878,8 @@ proc waku_is_online(

   handleRequest(
     ctx,
-    RequestType.PEER_MANAGER,
-    PeerManagementRequest.createShared(PeerManagementMsgType.IS_ONLINE),
+    RequestType.DEBUG,
+    DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_ONLINE_STATE),
     callback,
     userData,
   )
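Side note: with this change waku_is_online no longer consults the peer manager directly; the answer comes from the unified health monitor. The matching handler, added in the debug-request hunk below, boils down to:

  of RETRIEVE_ONLINE_STATE:
    return ok($waku.healthMonitor.onlineMonitor.amIOnline())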
@@ -7,13 +7,17 @@ import
   strutils,
   libp2p/peerid,
   metrics
-import ../../../../waku/factory/waku, ../../../../waku/node/waku_node
+import
+  ../../../../waku/factory/waku,
+  ../../../../waku/node/waku_node,
+  ../../../../waku/node/health_monitor

 type DebugNodeMsgType* = enum
   RETRIEVE_LISTENING_ADDRESSES
   RETRIEVE_MY_ENR
   RETRIEVE_MY_PEER_ID
   RETRIEVE_METRICS
+  RETRIEVE_ONLINE_STATE

 type DebugNodeRequest* = object
   operation: DebugNodeMsgType

@@ -49,6 +53,8 @@ proc process*(
     return ok($waku.node.peerId())
   of RETRIEVE_METRICS:
     return ok(getMetrics())
+  of RETRIEVE_ONLINE_STATE:
+    return ok($waku.healthMonitor.onlineMonitor.amIOnline())

   error "unsupported operation in DebugNodeRequest"
   return err("unsupported operation in DebugNodeRequest")
@@ -8,6 +8,7 @@ import
   ../../../../waku/factory/node_factory,
   ../../../../waku/factory/networks_config,
   ../../../../waku/factory/app_callbacks,
+  ../../../../waku/waku_api/rest/builder,
   ../../../alloc

 type NodeLifecycleMsgType* = enum

@@ -73,9 +74,11 @@ proc createWaku(
     appCallbacks.topicHealthChangeHandler = nil

   # TODO: Convert `confJson` directly to `WakuConf`
-  let wakuConf = conf.toWakuConf().valueOr:
+  var wakuConf = conf.toWakuConf().valueOr:
     return err("Configuration error: " & $error)

+  wakuConf.restServerConf = none(RestServerConf) ## don't want REST in libwaku
+
   let wakuRes = Waku.new(wakuConf, appCallbacks).valueOr:
     error "waku initialization failed", error = error
     return err("Failed setting up Waku: " & $error)
@@ -16,7 +16,6 @@ type PeerManagementMsgType* {.pure.} = enum
   DIAL_PEER
   DIAL_PEER_BY_ID
   GET_CONNECTED_PEERS
-  IS_ONLINE

 type PeerManagementRequest* = object
   operation: PeerManagementMsgType

@@ -156,7 +155,5 @@ proc process*(
       (inPeerIds, outPeerIds) = waku.node.peerManager.connectedPeers()
       connectedPeerids = concat(inPeerIds, outPeerIds)
     return ok(connectedPeerids.mapIt($it).join(","))
-  of IS_ONLINE:
-    return ok($waku.node.peerManager.isOnline())

   return ok("")
@@ -39,7 +39,7 @@ suite "Waku v2 REST API - health":
   asyncTest "Get node health info - GET /health":
     # Given
     let node = testWakuNode()
-    let healthMonitor = WakuNodeHealthMonitor()
+    let healthMonitor = NodeHealthMonitor()
     await node.start()
     (await node.mountRelay()).isOkOr:
       assert false, "Failed to mount relay"

@@ -78,7 +78,7 @@ suite "Waku v2 REST API - health":
     node.mountLightPushClient()
     await node.mountFilterClient()

-    healthMonitor.setNode(node)
+    healthMonitor.setNodeToHealthMonitor(node)
     healthMonitor.setOverallHealth(HealthStatus.READY)
     # When
     response = await client.healthCheck()
@@ -209,7 +209,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
     maxServicePeers = some(builder.maxServicePeers),
     colocationLimit = builder.colocationLimit,
     shardedPeerManagement = builder.shardAware,
-    dnsNameServers = netConfig.dnsNameServers,
   )

   var node: WakuNode

@@ -426,7 +426,7 @@ proc startNode*(
   ## Connect to static nodes and start
   ## keep-alive, if configured.

   # Start Waku v2 node
-  info "Running nwaku node", version = git_version
   try:
     await node.start()
   except CatchableError:
@@ -26,9 +26,11 @@ import
   ../waku_node,
   ../node/peer_manager,
+  ../node/health_monitor,
   ../node/waku_metrics,
   ../node/delivery_monitor/delivery_monitor,
   ../waku_api/message_cache,
   ../waku_api/rest/server,
+  ../waku_api/rest/builder as rest_server_builder,
   ../waku_archive,
   ../waku_relay/protocol,
   ../discovery/waku_dnsdisc,

@@ -66,6 +68,8 @@ type Waku* = ref object

   node*: WakuNode

+  healthMonitor*: NodeHealthMonitor
+
   deliveryMonitor: DeliveryMonitor

   restServer*: WakuRestServerRef

@@ -159,19 +163,33 @@ proc new*(
   logging.setupLog(wakuConf.logLevel, wakuConf.logFormat)

   ?wakuConf.validate()

   wakuConf.logConf()

+  info "Running nwaku node", version = git_version
+
+  let healthMonitor = NodeHealthMonitor.new(wakuConf.dnsAddrsNameServers)
+
+  let restServer: WakuRestServerRef =
+    if wakuConf.restServerConf.isSome():
+      let restServer = startRestServerEssentials(
+        healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
+      ).valueOr:
+        error "Starting essential REST server failed", error = $error
+        return err("Failed to start essential REST server in Waku.new: " & $error)
+
+      restServer
+    else:
+      nil
+
   var relay = newCircuitRelay(wakuConf.circuitRelayClient)

-  let nodeRes = setupNode(wakuConf, rng, relay)
-  if nodeRes.isErr():
-    error "Failed setting up node", error = nodeRes.error
-    return err("Failed setting up node: " & nodeRes.error)
-
-  let node = nodeRes.get()
+  let node = setupNode(wakuConf, rng, relay).valueOr:
+    error "Failed setting up node", error = $error
+    return err("Failed setting up node: " & $error)
+
+  healthMonitor.setNodeToHealthMonitor(node)
+  healthMonitor.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore)
+  healthMonitor.onlineMonitor.addOnlineStateObserver(
+    node.peerManager.getOnlineStateObserver()
+  )

   node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
     error "Failed setting up app callbacks", error = error

@@ -197,8 +215,10 @@ proc new*(
     rng: rng,
     key: wakuConf.nodeKey,
     node: node,
+    healthMonitor: healthMonitor,
     deliveryMonitor: deliveryMonitor,
     appCallbacks: appCallbacks,
+    restServer: restServer,
   )

   waku.setupSwitchServices(wakuConf, relay, rng)

@@ -334,15 +354,6 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
       error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
     return

-# The network connectivity loop checks periodically whether the node is online or not
-# and triggers any change that depends on the network connectivity state
-proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} =
-  while true:
-    await sleepAsync(15.seconds)
-
-    # Update online state
-    await waku.node.peerManager.updateOnlineState()
-
 proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
   debug "Retrieve dynamic bootstrap nodes"
   let conf = waku[].conf

@@ -369,7 +380,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
       return err("Error in updateApp: " & $error)

   ## Discv5
-  if conf.discv5Conf.isSome:
+  if conf.discv5Conf.isSome():
     waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5(
       waku.node.enr,
       waku.node.peerManager,

@@ -389,23 +400,41 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
   if not waku[].deliveryMonitor.isNil():
     waku[].deliveryMonitor.startDeliveryMonitor()

-  # Start network connectivity check loop
-  waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop()
+  ## Health Monitor
+  waku[].healthMonitor.startHealthMonitor()
+
+  if conf.restServerConf.isSome():
+    rest_server_builder.startRestServerProtocolSupport(
+      waku[].restServer,
+      waku[].node,
+      waku[].wakuDiscv5,
+      conf.restServerConf.get(),
+      conf.relay,
+      conf.lightPush,
+      conf.clusterId,
+      conf.shards,
+      conf.contentTopics,
+    ).isOkOr:
+      return err ("Starting protocols support REST server failed: " & $error)
+
+  if conf.metricsServerConf.isSome():
+    waku[].metricsServer = waku_metrics.startMetricsServerAndLogging(
+      conf.metricsServerConf.get(), conf.portsShift
+    ).valueOr:
+      return err("Starting monitoring and external interfaces failed: " & error)
+
+  waku[].healthMonitor.setOverallHealth(HealthStatus.READY)

   return ok()

-# Waku shutdown
-
 proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
-  if not waku.restServer.isNil():
-    await waku.restServer.stop()
+  ## Waku shutdown
+
+  waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)

   if not waku.metricsServer.isNil():
     await waku.metricsServer.stop()

-  if not waku.networkConnLoopHandle.isNil():
-    await waku.networkConnLoopHandle.cancelAndWait()
-
   if not waku.wakuDiscv5.isNil():
     await waku.wakuDiscv5.stop()

@@ -414,3 +443,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =

   if not waku.dnsRetryLoopHandle.isNil():
     await waku.dnsRetryLoopHandle.cancelAndWait()
+
+  if not waku.healthMonitor.isNil():
+    await waku.healthMonitor.stopHealthMonitor()
+
+  if not waku.restServer.isNil():
+    await waku.restServer.stop()
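Condensed, the monitor wiring that Waku.new now owns (assembled from the hunks above; error handling shortened to a sketch):

  let healthMonitor = NodeHealthMonitor.new(wakuConf.dnsAddrsNameServers)
  # when REST is configured, /health can answer INITIALIZING before the node exists
  let restServer =
    if wakuConf.restServerConf.isSome():
      startRestServerEssentials(
        healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
      ).expect("essential REST server")
    else:
      nil
  let node = setupNode(wakuConf, rng, relay).valueOr:
    return err("Failed setting up node: " & $error)
  healthMonitor.setNodeToHealthMonitor(node)
  healthMonitor.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore)
  healthMonitor.onlineMonitor.addOnlineStateObserver(
    node.peerManager.getOnlineStateObserver()
  )

startWaku then starts the monitor loop and flips overall health to READY once REST protocol support and metrics are up; stop() mirrors this by setting SHUTTING_DOWN first and stopping the REST server last.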
@@ -1,293 +1,4 @@
 {.push raises: [].}
+import
+  health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status]

-import std/[options, sets], chronos, libp2p/protocols/rendezvous
-
-import waku_node, ../waku_rln_relay, ../waku_relay, ./peer_manager
-
-type
-  HealthStatus* = enum
-    INITIALIZING
-    SYNCHRONIZING
-    READY
-    NOT_READY
-    NOT_MOUNTED
-    SHUTTING_DOWN
-
-  ProtocolHealth* = object
-    protocol*: string
-    health*: HealthStatus
-    desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY`
-
-  HealthReport* = object
-    nodeHealth*: HealthStatus
-    protocolsHealth*: seq[ProtocolHealth]
-
-  WakuNodeHealthMonitor* = ref object
-    nodeHealth: HealthStatus
-    node: Option[WakuNode]
-
-proc `$`*(t: HealthStatus): string =
-  result =
-    case t
-    of INITIALIZING: "Initializing"
-    of SYNCHRONIZING: "Synchronizing"
-    of READY: "Ready"
-    of NOT_READY: "Not Ready"
-    of NOT_MOUNTED: "Not Mounted"
-    of SHUTTING_DOWN: "Shutting Down"
-
-proc init*(
-    t: typedesc[HealthStatus], strRep: string
-): HealthStatus {.raises: [ValueError].} =
-  case strRep
-  of "Initializing":
-    return HealthStatus.INITIALIZING
-  of "Synchronizing":
-    return HealthStatus.SYNCHRONIZING
-  of "Ready":
-    return HealthStatus.READY
-  of "Not Ready":
-    return HealthStatus.NOT_READY
-  of "Not Mounted":
-    return HealthStatus.NOT_MOUNTED
-  of "Shutting Down":
-    return HealthStatus.SHUTTING_DOWN
-  else:
-    raise newException(ValueError, "Invalid HealthStatus string representation")
-
-proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
-  let p = ProtocolHealth(
-    protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
-  )
-  return p
-
-proc notReady(p: var ProtocolHealth, desc: string): ProtocolHealth =
-  p.health = HealthStatus.NOT_READY
-  p.desc = some(desc)
-  return p
-
-proc ready(p: var ProtocolHealth): ProtocolHealth =
-  p.health = HealthStatus.READY
-  p.desc = none[string]()
-  return p
-
-proc notMounted(p: var ProtocolHealth): ProtocolHealth =
-  p.health = HealthStatus.NOT_MOUNTED
-  p.desc = none[string]()
-  return p
-
-proc synchronizing(p: var ProtocolHealth): ProtocolHealth =
-  p.health = HealthStatus.SYNCHRONIZING
-  p.desc = none[string]()
-  return p
-
-proc initializing(p: var ProtocolHealth): ProtocolHealth =
-  p.health = HealthStatus.INITIALIZING
-  p.desc = none[string]()
-  return p
-
-proc shuttingDown(p: var ProtocolHealth): ProtocolHealth =
-  p.health = HealthStatus.SHUTTING_DOWN
-  p.desc = none[string]()
-  return p
-
-const FutIsReadyTimout = 5.seconds
-
-proc getRelayHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Relay")
-  if hm.node.get().wakuRelay == nil:
-    return p.notMounted()
-
-  let relayPeers = hm.node
-    .get().wakuRelay
-    .getConnectedPubSubPeers(pubsubTopic = "").valueOr:
-      return p.notMounted()
-
-  if relayPeers.len() == 0:
-    return p.notReady("No connected peers")
-
-  return p.ready()
-
-proc getRlnRelayHealth(hm: WakuNodeHealthMonitor): Future[ProtocolHealth] {.async.} =
-  var p = ProtocolHealth.init("Rln Relay")
-  if hm.node.get().wakuRlnRelay.isNil():
-    return p.notMounted()
-
-  let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
-  if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
-    return p.notReady("Ready state check timed out")
-
-  try:
-    if not isReadyStateFut.completed():
-      return p.notReady("Ready state check timed out")
-    elif isReadyStateFut.read():
-      return p.ready()
-
-    return p.synchronizing()
-  except:
-    error "exception reading state: " & getCurrentExceptionMsg()
-    return p.notReady("State cannot be determined")
-
-proc getLightpushHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Lightpush")
-  if hm.node.get().wakuLightPush == nil:
-    return p.notMounted()
-
-  if relayHealth == HealthStatus.READY:
-    return p.ready()
-
-  return p.notReady("Node has no relay peers to fullfill push requests")
-
-proc getLightpushClientHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Lightpush Client")
-  if hm.node.get().wakuLightpushClient == nil:
-    return p.notMounted()
-
-  let selfServiceAvailable =
-    hm.node.get().wakuLightPush != nil and relayHealth == HealthStatus.READY
-  let servicePeerAvailable =
-    hm.node.get().peerManager.selectPeer(WakuLightPushCodec).isSome()
-
-  if selfServiceAvailable or servicePeerAvailable:
-    return p.ready()
-
-  return p.notReady("No Lightpush service peer available yet")
-
-proc getLegacyLightpushHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Legacy Lightpush")
-  if hm.node.get().wakuLegacyLightPush == nil:
-    return p.notMounted()
-
-  if relayHealth == HealthStatus.READY:
-    return p.ready()
-
-  return p.notReady("Node has no relay peers to fullfill push requests")
-
-proc getLegacyLightpushClientHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Legacy Lightpush Client")
-  if hm.node.get().wakuLegacyLightpushClient == nil:
-    return p.notMounted()
-
-  if (hm.node.get().wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
-      hm.node.get().peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
-    return p.ready()
-
-  return p.notReady("No Lightpush service peer available yet")
-
-proc getFilterHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Filter")
-  if hm.node.get().wakuFilter == nil:
-    return p.notMounted()
-
-  if relayHealth == HealthStatus.READY:
-    return p.ready()
-
-  return p.notReady("Relay is not ready, filter will not be able to sort out messages")
-
-proc getFilterClientHealth(
-    hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
-): ProtocolHealth =
-  var p = ProtocolHealth.init("Filter Client")
-  if hm.node.get().wakuFilterClient == nil:
-    return p.notMounted()
-
-  if hm.node.get().peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
-    return p.ready()
-
-  return p.notReady("No Filter service peer available yet")
-
-proc getStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Store")
-  if hm.node.get().wakuStore == nil:
-    return p.notMounted()
-
-  return p.ready()
-
-proc getStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Store Client")
-  if hm.node.get().wakuStoreClient == nil:
-    return p.notMounted()
-
-  if hm.node.get().peerManager.selectPeer(WakuStoreCodec).isSome() or
-      hm.node.get().wakuStore != nil:
-    return p.ready()
-
-  return p.notReady(
-    "No Store service peer available yet, neither Store service set up for the node"
-  )
-
-proc getLegacyStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Legacy Store")
-  if hm.node.get().wakuLegacyStore == nil:
-    return p.notMounted()
-
-  return p.ready()
-
-proc getLegacyStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Legacy Store Client")
-  if hm.node.get().wakuLegacyStoreClient == nil:
-    return p.notMounted()
-
-  if hm.node.get().peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
-      hm.node.get().wakuLegacyStore != nil:
-    return p.ready()
-
-  return p.notReady(
-    "No Legacy Store service peers are available yet, neither Store service set up for the node"
-  )
-
-proc getPeerExchangeHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Peer Exchange")
-  if hm.node.get().wakuPeerExchange == nil:
-    return p.notMounted()
-
-  return p.ready()
-
-proc getRendezvousHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
-  var p = ProtocolHealth.init("Rendezvous")
-  if hm.node.get().wakuRendezvous == nil:
-    return p.notMounted()
-
-  if hm.node.get().peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
-    return p.notReady("No Rendezvous peers are available yet")
-
-  return p.ready()
-
-proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealthReport] {.async.} =
-  var report: HealthReport
-  report.nodeHealth = hm.nodeHealth
-
-  if hm.node.isSome():
-    let relayHealth = hm.getRelayHealth()
-    report.protocolsHealth.add(relayHealth)
-    report.protocolsHealth.add(await hm.getRlnRelayHealth())
-    report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
-    report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
-    report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
-    report.protocolsHealth.add(hm.getStoreHealth())
-    report.protocolsHealth.add(hm.getLegacyStoreHealth())
-    report.protocolsHealth.add(hm.getPeerExchangeHealth())
-    report.protocolsHealth.add(hm.getRendezvousHealth())
-
-    report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
-    report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
-    report.protocolsHealth.add(hm.getStoreClientHealth())
-    report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
-    report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
-  return report
-
-proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) =
-  hm.node = some(node)
-
-proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) =
-  hm.nodeHealth = health
+export node_health_monitor, protocol_health, online_monitor, health_status
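Design note: the old waku/node/health_monitor.nim is kept as a thin facade, so existing import paths keep compiling; only the monitor type name changes for callers. A sketch of what a downstream module sees (hypothetical snippet, relying only on the re-exports above):

  import waku/node/health_monitor   # old path, now resolves to the split modules

  var hm = NodeHealthMonitor.new()  # via the node_health_monitor re-export
  hm.setOverallHealth(HealthStatus.INITIALIZING)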
waku/node/health_monitor/health_status.nim (new file, 16 lines)
@@ -0,0 +1,16 @@
+import results, std/strutils
+
+type HealthStatus* {.pure.} = enum
+  INITIALIZING
+  SYNCHRONIZING
+  READY
+  NOT_READY
+  NOT_MOUNTED
+  SHUTTING_DOWN
+
+proc init*(t: typedesc[HealthStatus], strRep: string): Result[HealthStatus, string] =
+  try:
+    let status = parseEnum[HealthStatus](strRep)
+    return ok(status)
+  except ValueError:
+    return err("Invalid HealthStatus string representation: " & strRep)
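Callers of the new parser branch on a Result instead of catching ValueError; a minimal hypothetical usage:

  let status = HealthStatus.init("READY").valueOr:
    HealthStatus.NOT_READY # fall back on unparsable input
  doAssert status == HealthStatus.READY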
waku/node/health_monitor/node_health_monitor.nim (new file, 270 lines)
@@ -0,0 +1,270 @@
+{.push raises: [].}
+
+import std/[options, sets, strformat], chronos, chronicles, libp2p/protocols/rendezvous
+
+import
+  ../waku_node,
+  ../../waku_rln_relay,
+  ../../waku_relay,
+  ../peer_manager,
+  ./online_monitor,
+  ./health_status,
+  ./protocol_health
+
+## This module is aimed to check the state of the "self" Waku Node
+
+type
+  HealthReport* = object
+    nodeHealth*: HealthStatus
+    protocolsHealth*: seq[ProtocolHealth]
+
+  NodeHealthMonitor* = ref object
+    nodeHealth: HealthStatus
+    node: WakuNode
+    onlineMonitor*: OnlineMonitor
+
+template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
+  if node.isNil():
+    warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
+    return p.notMounted()
+
+proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Relay")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuRelay == nil:
+    return p.notMounted()
+
+  let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
+    return p.notMounted()
+
+  if relayPeers.len() == 0:
+    return p.notReady("No connected peers")
+
+  return p.ready()
+
+proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} =
+  var p = ProtocolHealth.init("Rln Relay")
+  if hm.node.isNil():
+    warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
+    return p.notMounted()
+
+  if hm.node.wakuRlnRelay.isNil():
+    return p.notMounted()
+
+  const FutIsReadyTimout = 5.seconds
+
+  let isReadyStateFut = hm.node.wakuRlnRelay.isReady()
+  if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
+    return p.notReady("Ready state check timed out")
+
+  try:
+    if not isReadyStateFut.completed():
+      return p.notReady("Ready state check timed out")
+    elif isReadyStateFut.read():
+      return p.ready()
+
+    return p.synchronizing()
+  except:
+    error "exception reading state: " & getCurrentExceptionMsg()
+    return p.notReady("State cannot be determined")
+
+proc getLightpushHealth(
+    hm: NodeHealthMonitor, relayHealth: HealthStatus
+): ProtocolHealth =
+  var p = ProtocolHealth.init("Lightpush")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLightPush == nil:
+    return p.notMounted()
+
+  if relayHealth == HealthStatus.READY:
+    return p.ready()
+
+  return p.notReady("Node has no relay peers to fullfill push requests")
+
+proc getLightpushClientHealth(
+    hm: NodeHealthMonitor, relayHealth: HealthStatus
+): ProtocolHealth =
+  var p = ProtocolHealth.init("Lightpush Client")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLightpushClient == nil:
+    return p.notMounted()
+
+  let selfServiceAvailable =
+    hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY
+  let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome()
+
+  if selfServiceAvailable or servicePeerAvailable:
+    return p.ready()
+
+  return p.notReady("No Lightpush service peer available yet")
+
+proc getLegacyLightpushHealth(
+    hm: NodeHealthMonitor, relayHealth: HealthStatus
+): ProtocolHealth =
+  var p = ProtocolHealth.init("Legacy Lightpush")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLegacyLightPush == nil:
+    return p.notMounted()
+
+  if relayHealth == HealthStatus.READY:
+    return p.ready()
+
+  return p.notReady("Node has no relay peers to fullfill push requests")
+
+proc getLegacyLightpushClientHealth(
+    hm: NodeHealthMonitor, relayHealth: HealthStatus
+): ProtocolHealth =
+  var p = ProtocolHealth.init("Legacy Lightpush Client")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLegacyLightpushClient == nil:
+    return p.notMounted()
+
+  if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
+      hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
+    return p.ready()
+
+  return p.notReady("No Lightpush service peer available yet")
+
+proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth =
+  var p = ProtocolHealth.init("Filter")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuFilter == nil:
+    return p.notMounted()
+
+  if relayHealth == HealthStatus.READY:
+    return p.ready()
+
+  return p.notReady("Relay is not ready, filter will not be able to sort out messages")
+
+proc getFilterClientHealth(
+    hm: NodeHealthMonitor, relayHealth: HealthStatus
+): ProtocolHealth =
+  var p = ProtocolHealth.init("Filter Client")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuFilterClient == nil:
+    return p.notMounted()
+
+  if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
+    return p.ready()
+
+  return p.notReady("No Filter service peer available yet")
+
+proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Store")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuStore == nil:
+    return p.notMounted()
+
+  return p.ready()
+
+proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Store Client")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuStoreClient == nil:
+    return p.notMounted()
+
+  if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil:
+    return p.ready()
+
+  return p.notReady(
+    "No Store service peer available yet, neither Store service set up for the node"
+  )
+
+proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Legacy Store")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLegacyStore == nil:
+    return p.notMounted()
+
+  return p.ready()
+
+proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Legacy Store Client")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuLegacyStoreClient == nil:
+    return p.notMounted()
+
+  if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
+      hm.node.wakuLegacyStore != nil:
+    return p.ready()
+
+  return p.notReady(
+    "No Legacy Store service peers are available yet, neither Store service set up for the node"
+  )
+
+proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Peer Exchange")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuPeerExchange == nil:
+    return p.notMounted()
+
+  return p.ready()
+
+proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
+  var p = ProtocolHealth.init("Rendezvous")
+  checkWakuNodeNotNil(hm.node, p)
+
+  if hm.node.wakuRendezvous == nil:
+    return p.notMounted()
+
+  if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
+    return p.notReady("No Rendezvous peers are available yet")
+
+  return p.ready()
+
+proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
+  var report: HealthReport
+  report.nodeHealth = hm.nodeHealth
+
+  if not hm.node.isNil():
+    let relayHealth = hm.getRelayHealth()
+    report.protocolsHealth.add(relayHealth)
+    report.protocolsHealth.add(await hm.getRlnRelayHealth())
+    report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
+    report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
+    report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
+    report.protocolsHealth.add(hm.getStoreHealth())
+    report.protocolsHealth.add(hm.getLegacyStoreHealth())
+    report.protocolsHealth.add(hm.getPeerExchangeHealth())
+    report.protocolsHealth.add(hm.getRendezvousHealth())
+
+    report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
+    report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
+    report.protocolsHealth.add(hm.getStoreClientHealth())
+    report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
+    report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
+  return report
+
+proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) =
+  hm.node = node
+
+proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
+  hm.nodeHealth = health
+
+proc startHealthMonitor*(hm: NodeHealthMonitor) =
+  hm.onlineMonitor.startOnlineMonitor()
+
+proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
+  await hm.onlineMonitor.stopOnlineMonitor()
+
+proc new*(
+    T: type NodeHealthMonitor,
+    dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
+): T =
+  T(
+    nodeHealth: INITIALIZING,
+    node: nil,
+    onlineMonitor: OnlineMonitor.init(dnsNameServers),
+  )
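Taken together, a standalone usage sketch of the new monitor, mirroring what Waku.new and startWaku do (assumes `node` is a running WakuNode and a chronos dispatcher):

  let hm = NodeHealthMonitor.new()   # defaults to the 1.1.1.1 / 1.0.0.1 name servers
  hm.setNodeToHealthMonitor(node)
  hm.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore)
  hm.startHealthMonitor()            # spawns the online-check loop

  let report = await hm.getNodeHealthReport()
  echo report.nodeHealth, " / ", report.protocolsHealth.len, " protocols checked"

  await hm.stopHealthMonitor()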
waku/node/health_monitor/online_monitor.nim (new file, 77 lines)
@@ -0,0 +1,77 @@
+import std/sequtils
+import chronos, chronicles, libp2p/nameresolving/dnsresolver, libp2p/peerstore
+
+import ../peer_manager/waku_peer_store, waku/waku_core/peers
+
+type
+  OnOnlineStateChange* = proc(online: bool) {.gcsafe, raises: [].}
+
+  OnlineMonitor* = ref object
+    onOnlineStateChange: OnOnlineStateChange
+    dnsNameServers*: seq[IpAddress]
+    onlineStateObservers: seq[OnOnlineStateChange]
+    networkConnLoopHandle: Future[void] # node: WakuNode
+    peerStore: PeerStore
+    online: bool
+
+proc checkInternetConnectivity(
+    nameServerIps: seq[IpAddress], timeout = 2.seconds
+): Future[bool] {.async.} =
+  const DNSCheckDomain = "one.one.one.one"
+  let nameServers = nameServerIps.mapIt(initTAddress(it, Port(53)))
+  let dnsResolver = DnsResolver.new(nameServers)
+
+  # Resolve domain IP
+  let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
+  if resolved.len > 0:
+    return true
+  else:
+    return false
+
+proc updateOnlineState(self: OnlineMonitor) {.async.} =
+  if self.onlineStateObservers.len == 0:
+    trace "No online state observers registered, cannot notify about online state change"
+    return
+
+  let numConnectedPeers =
+    if self.peerStore.isNil():
+      0
+    else:
+      self.peerStore.peers().countIt(it.connectedness == Connected)
+
+  self.online =
+    if numConnectedPeers > 0:
+      true
+    else:
+      await checkInternetConnectivity(self.dnsNameServers)
+
+  for onlineStateObserver in self.onlineStateObservers:
+    onlineStateObserver(self.online)
+
+proc networkConnectivityLoop(self: OnlineMonitor): Future[void] {.async.} =
+  ## Checks periodically whether the node is online or not
+  ## and triggers any change that depends on the network connectivity state
+  while true:
+    await self.updateOnlineState()
+    await sleepAsync(15.seconds)
+
+proc startOnlineMonitor*(self: OnlineMonitor) =
+  self.networkConnLoopHandle = self.networkConnectivityLoop()
+
+proc stopOnlineMonitor*(self: OnlineMonitor) {.async.} =
+  if not self.networkConnLoopHandle.isNil():
+    await self.networkConnLoopHandle.cancelAndWait()
+
+proc setPeerStoreToOnlineMonitor*(self: OnlineMonitor, peerStore: PeerStore) =
+  self.peerStore = peerStore
+
+proc addOnlineStateObserver*(self: OnlineMonitor, observer: OnOnlineStateChange) =
+  ## Adds an observer that will be called when the online state changes
+  if observer notin self.onlineStateObservers:
+    self.onlineStateObservers.add(observer)
+
+proc amIOnline*(self: OnlineMonitor): bool =
+  return self.online
+
+proc init*(T: type OnlineMonitor, dnsNameServers: seq[IpAddress]): OnlineMonitor =
+  T(dnsNameServers: dnsNameServers, onlineStateObservers: @[])
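The observer list is how other components react to connectivity changes without the monitor depending on them; registering a custom observer is a one-liner (sketch, assumes parseIpAddress from std/net and a chronos dispatcher):

  let monitor = OnlineMonitor.init(@[parseIpAddress("1.1.1.1")])
  monitor.addOnlineStateObserver(
    proc(online: bool) {.gcsafe, raises: [].} =
      echo "online state is now: ", online
  )
  monitor.startOnlineMonitor()   # observers are invoked on every 15 s check
  # later:
  waitFor monitor.stopOnlineMonitor()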
waku/node/health_monitor/protocol_health.nim (new file, 46 lines)
@@ -0,0 +1,46 @@
+import std/[options, strformat]
+import ./health_status
+
+type ProtocolHealth* = object
+  protocol*: string
+  health*: HealthStatus
+  desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY`
+
+proc notReady*(p: var ProtocolHealth, desc: string): ProtocolHealth =
+  p.health = HealthStatus.NOT_READY
+  p.desc = some(desc)
+  return p
+
+proc ready*(p: var ProtocolHealth): ProtocolHealth =
+  p.health = HealthStatus.READY
+  p.desc = none[string]()
+  return p
+
+proc notMounted*(p: var ProtocolHealth): ProtocolHealth =
+  p.health = HealthStatus.NOT_MOUNTED
+  p.desc = none[string]()
+  return p
+
+proc synchronizing*(p: var ProtocolHealth): ProtocolHealth =
+  p.health = HealthStatus.SYNCHRONIZING
+  p.desc = none[string]()
+  return p
+
+proc initializing*(p: var ProtocolHealth): ProtocolHealth =
+  p.health = HealthStatus.INITIALIZING
+  p.desc = none[string]()
+  return p
+
+proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth =
+  p.health = HealthStatus.SHUTTING_DOWN
+  p.desc = none[string]()
+  return p
+
+proc `$`*(p: ProtocolHealth): string =
+  return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}"
+
+proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
+  let p = ProtocolHealth(
+    protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
+  )
+  return p
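These builder-style helpers are what the health checks in node_health_monitor chain onto ProtocolHealth.init; a tiny hypothetical example:

  var p = ProtocolHealth.init("Relay")    # starts out as NOT_MOUNTED
  echo p.notReady("No connected peers")   # health becomes NOT_READY, desc is set
  echo p.ready()                          # back to READY, desc cleared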
@@ -8,7 +8,6 @@ import
   libp2p/multistream,
   libp2p/muxers/muxer,
   libp2p/nameresolving/nameresolver,
-  libp2p/nameresolving/dnsresolver,
   libp2p/peerstore

 import
@@ -21,6 +20,7 @@ import
   ../../waku_enr/sharding,
   ../../waku_enr/capabilities,
   ../../waku_metadata,
+  ../health_monitor/online_monitor,
   ./peer_store/peer_storage,
   ./waku_peer_store

@@ -74,8 +74,6 @@ const
   # Max peers that we allow from the same IP
   DefaultColocationLimit* = 5

-  DNSCheckDomain = "one.one.one.one"
-
 type ConnectionChangeHandler* = proc(
   peerId: PeerId, peerEvent: PeerEventKind
 ): Future[void] {.gcsafe, raises: [Defect].}
@@ -98,16 +96,12 @@ type PeerManager* = ref object of RootObj
   started: bool
   shardedPeerManagement: bool # temp feature flag
   onConnectionChange*: ConnectionChangeHandler
-  dnsNameServers*: seq[IpAddress]
-  online: bool
+  online: bool ## state managed by online_monitor module

 #~~~~~~~~~~~~~~~~~~~#
 # Helper Functions  #
 #~~~~~~~~~~~~~~~~~~~#

-template isOnline*(self: PeerManager): bool =
-  self.online
-
 proc calculateBackoff(
     initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
 ): timer.Duration =
@@ -543,35 +537,9 @@ proc getStreamByPeerIdAndProtocol*(

   return ok(streamRes.get())

-proc checkInternetConnectivity(
-    nameServerIps: seq[IpAddress], timeout = 2.seconds
-): Future[bool] {.async.} =
-  var nameServers: seq[TransportAddress]
-  for ip in nameServerIps:
-    nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
-
-  let dnsResolver = DnsResolver.new(nameServers)
-
-  # Resolve domain IP
-  let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
-
-  if resolved.len > 0:
-    return true
-  else:
-    return false
-
-proc updateOnlineState*(pm: PeerManager) {.async.} =
-  let numConnectedPeers =
-    pm.switch.peerStore.peers().countIt(it.connectedness == Connected)
-
-  if numConnectedPeers > 0:
-    pm.online = true
-  else:
-    pm.online = await checkInternetConnectivity(pm.dnsNameServers)
-
 proc connectToRelayPeers*(pm: PeerManager) {.async.} =
   # only attempt if current node is online
-  if not pm.isOnline():
+  if not pm.online:
     error "connectToRelayPeers: won't attempt new connections - node is offline"
     return
@@ -739,6 +707,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
         debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
         asyncSpawn(pm.switch.disconnect(peerId))
         peerStore.delete(peerId)
+
     if not pm.onConnectionChange.isNil():
       # we don't want to await for the callback to finish
       asyncSpawn pm.onConnectionChange(peerId, Joined)
@@ -753,6 +722,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
       if pm.ipTable[ip].len == 0:
         pm.ipTable.del(ip)
         break
+
     if not pm.onConnectionChange.isNil():
       # we don't want to await for the callback to finish
       asyncSpawn pm.onConnectionChange(peerId, Left)
@@ -809,6 +779,10 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
         protoStreamsOut.float64, labelValues = [$Direction.Out, proto]
       )

+proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
+  return proc(online: bool) {.gcsafe, raises: [].} =
+    pm.online = online
+
 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
 #   Pruning and Maintenance (Stale Peers Management)   #
 #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
@@ -817,7 +791,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
   if pm.wakuMetadata.shards.len == 0:
     return

-  if not pm.isOnline():
+  if not pm.online:
     error "manageRelayPeers: won't attempt new connections - node is offline"
     return
@@ -1048,7 +1022,6 @@ proc new*(
     maxFailedAttempts = MaxFailedAttempts,
     colocationLimit = DefaultColocationLimit,
     shardedPeerManagement = false,
-    dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
 ): PeerManager {.gcsafe.} =
   let capacity = switch.peerStore.capacity
   let maxConnections = switch.connManager.inSema.size
@@ -1099,7 +1072,6 @@ proc new*(
     maxFailedAttempts: maxFailedAttempts,
     colocationLimit: colocationLimit,
     shardedPeerManagement: shardedPeerManagement,
-    dnsNameServers: dnsNameServers,
     online: true,
   )
@@ -40,7 +40,7 @@ type RestServerConf* = object
   relayCacheCapacity*: uint32

 proc startRestServerEssentials*(
-    nodeHealthMonitor: WakuNodeHealthMonitor, conf: RestServerConf, portsShift: uint16
+    nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16
 ): Result[WakuRestServerRef, string] =
   let requestErrorHandler: RestRequestErrorHandler = proc(
       error: RestRequestError, request: HttpRequestRef
@@ -11,7 +11,7 @@ const ROUTE_HEALTH* = "/health"
 const FutHealthReportTimeout = 5.seconds

 proc installHealthApiHandler*(
-    router: var RestRouter, nodeHealthMonitor: WakuNodeHealthMonitor
+    router: var RestRouter, nodeHealthMonitor: NodeHealthMonitor
 ) =
   router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse:
     let healthReportFut = nodeHealthMonitor.getNodeHealthReport()
@@ -1,5 +1,6 @@
 {.push raises: [].}

+import results
 import chronicles, json_serialization, json_serialization/std/options
 import ../../../waku_node, ../serdes

@@ -31,13 +32,10 @@ proc readValue*(
       )

     let fieldValue = reader.readValue(string)
-    try:
-      health = some(HealthStatus.init(fieldValue))
-      protocol = some(fieldName)
-    except ValueError:
-      reader.raiseUnexpectedValue(
-        "Invalid `health` value: " & getCurrentExceptionMsg()
-      )
+    let h = HealthStatus.init(fieldValue).valueOr:
+      reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
+    health = some(h)
+    protocol = some(fieldName)

   value = ProtocolHealth(protocol: protocol.get(), health: health.get(), desc: desc)

@@ -63,10 +61,11 @@ proc readValue*(
         reader.raiseUnexpectedField(
           "Multiple `nodeHealth` fields found", "HealthReport"
         )
-      try:
-        nodeHealth = some(HealthStatus.init(reader.readValue(string)))
-      except ValueError:
-        reader.raiseUnexpectedValue("Invalid `health` value")
+
+      let health = HealthStatus.init(reader.readValue(string)).valueOr:
+        reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
+
+      nodeHealth = some(health)
     of "protocolsHealth":
       if protocolsHealth.isSome():
         reader.raiseUnexpectedField(