mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-04 06:53:12 +00:00
chore: Separation of node health and initialization state from rln_relay (#2612)
* Separation of node health and initialization state from rln_relay status. Make (only) health endpoint avail early and install others in the last stage of node setup. * Proper json report from /health, adjusted and fixed test, added convenient script for checking node health * Stop wakunode2 if configured rest server cannot be started * Fix wakuRlnRelay protocol existence check * Fix typo * Removed unused imports from touched files. * Added missing /health test for all
This commit is contained in:
parent
8a2b0dcf7e
commit
daa88019d0
@ -18,7 +18,8 @@ import
|
|||||||
../../waku/common/logging,
|
../../waku/common/logging,
|
||||||
../../waku/factory/external_config,
|
../../waku/factory/external_config,
|
||||||
../../waku/factory/networks_config,
|
../../waku/factory/networks_config,
|
||||||
../../waku/factory/app
|
../../waku/factory/app,
|
||||||
|
../../waku/node/health_monitor
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "wakunode main"
|
topics = "wakunode main"
|
||||||
@ -88,54 +89,74 @@ when isMainModule:
|
|||||||
doInspectRlnDb(conf)
|
doInspectRlnDb(conf)
|
||||||
of noCommand:
|
of noCommand:
|
||||||
case conf.clusterId
|
case conf.clusterId
|
||||||
# cluster-id=0
|
# cluster-id=0
|
||||||
of 0:
|
of 0:
|
||||||
let clusterZeroConf = ClusterConf.ClusterZeroConf()
|
let clusterZeroConf = ClusterConf.ClusterZeroConf()
|
||||||
conf.pubsubTopics = clusterZeroConf.pubsubTopics
|
conf.pubsubTopics = clusterZeroConf.pubsubTopics
|
||||||
# TODO: Write some template to "merge" the configs
|
# TODO: Write some template to "merge" the configs
|
||||||
# cluster-id=1 (aka The Waku Network)
|
# cluster-id=1 (aka The Waku Network)
|
||||||
of 1:
|
of 1:
|
||||||
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
|
let twnClusterConf = ClusterConf.TheWakuNetworkConf()
|
||||||
if len(conf.shards) != 0:
|
if len(conf.shards) != 0:
|
||||||
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
|
conf.pubsubTopics = conf.shards.mapIt(twnClusterConf.pubsubTopics[it.uint16])
|
||||||
else:
|
|
||||||
conf.pubsubTopics = twnClusterConf.pubsubTopics
|
|
||||||
|
|
||||||
# Override configuration
|
|
||||||
conf.maxMessageSize = twnClusterConf.maxMessageSize
|
|
||||||
conf.clusterId = twnClusterConf.clusterId
|
|
||||||
conf.rlnRelay = twnClusterConf.rlnRelay
|
|
||||||
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
|
|
||||||
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
|
|
||||||
conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
|
|
||||||
conf.discv5Discovery = twnClusterConf.discv5Discovery
|
|
||||||
conf.discv5BootstrapNodes =
|
|
||||||
conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
|
|
||||||
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
|
|
||||||
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
|
|
||||||
else:
|
else:
|
||||||
discard
|
conf.pubsubTopics = twnClusterConf.pubsubTopics
|
||||||
|
|
||||||
|
# Override configuration
|
||||||
|
conf.maxMessageSize = twnClusterConf.maxMessageSize
|
||||||
|
conf.clusterId = twnClusterConf.clusterId
|
||||||
|
conf.rlnRelay = twnClusterConf.rlnRelay
|
||||||
|
conf.rlnRelayEthContractAddress = twnClusterConf.rlnRelayEthContractAddress
|
||||||
|
conf.rlnRelayDynamic = twnClusterConf.rlnRelayDynamic
|
||||||
|
conf.rlnRelayBandwidthThreshold = twnClusterConf.rlnRelayBandwidthThreshold
|
||||||
|
conf.discv5Discovery = twnClusterConf.discv5Discovery
|
||||||
|
conf.discv5BootstrapNodes =
|
||||||
|
conf.discv5BootstrapNodes & twnClusterConf.discv5BootstrapNodes
|
||||||
|
conf.rlnEpochSizeSec = twnClusterConf.rlnEpochSizeSec
|
||||||
|
conf.rlnRelayUserMessageLimit = twnClusterConf.rlnRelayUserMessageLimit
|
||||||
|
else:
|
||||||
|
discard
|
||||||
|
|
||||||
info "Running nwaku node", version = app.git_version
|
info "Running nwaku node", version = app.git_version
|
||||||
logConfig(conf)
|
logConfig(conf)
|
||||||
|
|
||||||
|
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||||
|
# It will always be called from main thread anyway.
|
||||||
|
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
||||||
|
var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
|
||||||
|
nodeHealthMonitor = WakuNodeHealthMonitor()
|
||||||
|
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
|
||||||
|
|
||||||
|
let restServerRes = startRestServerEsentials(nodeHealthMonitor, conf)
|
||||||
|
if restServerRes.isErr():
|
||||||
|
error "Starting REST server failed.", error = $restServerRes.error()
|
||||||
|
quit(QuitFailure)
|
||||||
|
|
||||||
var wakunode2 = App.init(conf).valueOr:
|
var wakunode2 = App.init(conf).valueOr:
|
||||||
error "App initialization failed", error = error
|
error "App initialization failed", error = error
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
nodeHealthMonitor.setNode(wakunode2.node)
|
||||||
|
|
||||||
wakunode2.startApp().isOkOr:
|
wakunode2.startApp().isOkOr:
|
||||||
error "Starting app failed", error = error
|
error "Starting app failed", error = error
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
if conf.rest and not restServerRes.isErr():
|
||||||
|
wakunode2.restServer = restServerRes.value
|
||||||
|
|
||||||
wakunode2.setupMonitoringAndExternalInterfaces().isOkOr:
|
wakunode2.setupMonitoringAndExternalInterfaces().isOkOr:
|
||||||
error "Starting monitoring and external interfaces failed", error = error
|
error "Starting monitoring and external interfaces failed", error = error
|
||||||
quit(QuitFailure)
|
quit(QuitFailure)
|
||||||
|
|
||||||
|
nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
|
||||||
|
|
||||||
debug "Setting up shutdown hooks"
|
debug "Setting up shutdown hooks"
|
||||||
## Setup shutdown hooks for this process.
|
## Setup shutdown hooks for this process.
|
||||||
## Stop node gracefully on shutdown.
|
## Stop node gracefully on shutdown.
|
||||||
|
|
||||||
proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
|
proc asyncStopper(node: App) {.async: (raises: [Exception]).} =
|
||||||
|
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
|
||||||
await node.stop()
|
await node.stop()
|
||||||
quit(QuitSuccess)
|
quit(QuitSuccess)
|
||||||
|
|
||||||
|
|||||||
55
scripts/chkhealth.sh
Executable file
55
scripts/chkhealth.sh
Executable file
@ -0,0 +1,55 @@
|
|||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
# optional argument to specgify the ip address
|
||||||
|
ip_address=$1
|
||||||
|
plain_text_out=false
|
||||||
|
|
||||||
|
# Parse command line arguments
|
||||||
|
POSITIONAL_ARGS=()
|
||||||
|
|
||||||
|
while [[ $# -gt 0 ]]; do
|
||||||
|
case $1 in
|
||||||
|
-p|--plain)
|
||||||
|
plain_text_out=true
|
||||||
|
shift # past argument
|
||||||
|
;;
|
||||||
|
-*|--*)
|
||||||
|
echo "Unknown option $1"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
*)
|
||||||
|
POSITIONAL_ARGS+=("$1") # save positional arg
|
||||||
|
shift # past argument
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
set -- "${POSITIONAL_ARGS[@]}" # restore positional parameters
|
||||||
|
|
||||||
|
# Check if an IP address is provided as an argument
|
||||||
|
if [[ -n "$1" ]]; then
|
||||||
|
ip_address="$1"
|
||||||
|
else
|
||||||
|
ip_address="localhost:8645"
|
||||||
|
fi
|
||||||
|
|
||||||
|
# check if curl is available
|
||||||
|
if ! command -v curl &> /dev/null
|
||||||
|
then
|
||||||
|
echo "curl could not be found"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
response=$(curl -s GET http://${ip_address}/health)
|
||||||
|
|
||||||
|
if [[ -z "${response}" ]]; then
|
||||||
|
echo -e "$(date +'%H:%M:%S')\tnode health status is: unknown\n"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if ! command -v jq &> /dev/null || [[ "$plain_text_out" = true ]]; then
|
||||||
|
echo -e "$(date +'%H:%M:%S')\tnode health status is: ${response}\n"
|
||||||
|
else
|
||||||
|
echo -e "$(date +'%H:%M:%S')\tnode health status is:\n"
|
||||||
|
echo "${response}" | jq .
|
||||||
|
fi
|
||||||
@ -77,7 +77,8 @@ import
|
|||||||
./wakunode_rest/test_rest_filter,
|
./wakunode_rest/test_rest_filter,
|
||||||
./wakunode_rest/test_rest_lightpush,
|
./wakunode_rest/test_rest_lightpush,
|
||||||
./wakunode_rest/test_rest_admin,
|
./wakunode_rest/test_rest_admin,
|
||||||
./wakunode_rest/test_rest_cors
|
./wakunode_rest/test_rest_cors,
|
||||||
|
./wakunode_rest/test_rest_health
|
||||||
|
|
||||||
import ./waku_rln_relay/test_all
|
import ./waku_rln_relay/test_all
|
||||||
|
|
||||||
|
|||||||
@ -19,6 +19,7 @@ import
|
|||||||
../../waku/waku_api/rest/health/handlers as health_api,
|
../../waku/waku_api/rest/health/handlers as health_api,
|
||||||
../../waku/waku_api/rest/health/client as health_api_client,
|
../../waku/waku_api/rest/health/client as health_api_client,
|
||||||
../../waku/waku_rln_relay,
|
../../waku/waku_rln_relay,
|
||||||
|
../../waku/node/health_monitor,
|
||||||
../testlib/common,
|
../testlib/common,
|
||||||
../testlib/testutils,
|
../testlib/testutils,
|
||||||
../testlib/wakucore,
|
../testlib/wakucore,
|
||||||
@ -35,17 +36,20 @@ proc testWakuNode(): WakuNode =
|
|||||||
|
|
||||||
suite "Waku v2 REST API - health":
|
suite "Waku v2 REST API - health":
|
||||||
# TODO: better test for health
|
# TODO: better test for health
|
||||||
xasyncTest "Get node health info - GET /health":
|
asyncTest "Get node health info - GET /health":
|
||||||
# Given
|
# Given
|
||||||
let node = testWakuNode()
|
let node = testWakuNode()
|
||||||
|
let healthMonitor = WakuNodeHealthMonitor()
|
||||||
await node.start()
|
await node.start()
|
||||||
await node.mountRelay()
|
await node.mountRelay()
|
||||||
|
|
||||||
|
healthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
|
||||||
|
|
||||||
let restPort = Port(58001)
|
let restPort = Port(58001)
|
||||||
let restAddress = parseIpAddress("0.0.0.0")
|
let restAddress = parseIpAddress("0.0.0.0")
|
||||||
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
let restServer = WakuRestServerRef.init(restAddress, restPort).tryGet()
|
||||||
|
|
||||||
installHealthApiHandler(restServer.router, node)
|
installHealthApiHandler(restServer.router, healthMonitor)
|
||||||
restServer.start()
|
restServer.start()
|
||||||
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
let client = newRestHttpClient(initTAddress(restAddress, restPort))
|
||||||
|
|
||||||
@ -54,9 +58,10 @@ suite "Waku v2 REST API - health":
|
|||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
response.status == 503
|
response.status == 200
|
||||||
$response.contentType == $MIMETYPE_TEXT
|
$response.contentType == $MIMETYPE_JSON
|
||||||
response.data == "Node is not ready"
|
response.data ==
|
||||||
|
HealthReport(nodeHealth: HealthStatus.INITIALIZING, protocolsHealth: @[])
|
||||||
|
|
||||||
# now kick in rln (currently the only check for health)
|
# now kick in rln (currently the only check for health)
|
||||||
await node.mountRlnRelay(
|
await node.mountRlnRelay(
|
||||||
@ -67,15 +72,19 @@ suite "Waku v2 REST API - health":
|
|||||||
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
|
rlnRelayTreePath: genTempPath("rln_tree", "wakunode"),
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
healthMonitor.setNode(node)
|
||||||
|
healthMonitor.setOverallHealth(HealthStatus.READY)
|
||||||
# When
|
# When
|
||||||
response = await client.healthCheck()
|
response = await client.healthCheck()
|
||||||
|
|
||||||
# Then
|
# Then
|
||||||
check:
|
check:
|
||||||
response.status == 200
|
response.status == 200
|
||||||
$response.contentType == $MIMETYPE_TEXT
|
$response.contentType == $MIMETYPE_JSON
|
||||||
response.data == "Node is healthy"
|
response.data.nodeHealth == HealthStatus.READY
|
||||||
|
response.data.protocolsHealth.len() == 1
|
||||||
|
response.data.protocolsHealth[0].protocol == "Rln Relay"
|
||||||
|
response.data.protocolsHealth[0].health == HealthStatus.READY
|
||||||
|
|
||||||
await restServer.stop()
|
await restServer.stop()
|
||||||
await restServer.closeWait()
|
await restServer.closeWait()
|
||||||
|
|||||||
@ -19,16 +19,11 @@ import
|
|||||||
metrics,
|
metrics,
|
||||||
metrics/chronos_httpserver
|
metrics/chronos_httpserver
|
||||||
import
|
import
|
||||||
../../waku/common/utils/nat,
|
|
||||||
../../waku/common/utils/parse_size_units,
|
|
||||||
../../waku/common/databases/db_sqlite,
|
|
||||||
../../waku/waku_archive/driver/builder,
|
|
||||||
../../waku/waku_archive/retention_policy/builder,
|
|
||||||
../../waku/waku_core,
|
../../waku/waku_core,
|
||||||
../../waku/waku_node,
|
../../waku/waku_node,
|
||||||
../../waku/node/waku_metrics,
|
../../waku/node/waku_metrics,
|
||||||
../../waku/node/peer_manager,
|
../../waku/node/peer_manager,
|
||||||
../../waku/node/peer_manager/peer_store/waku_peer_storage,
|
../../waku/node/health_monitor,
|
||||||
../../waku/waku_api/message_cache,
|
../../waku/waku_api/message_cache,
|
||||||
../../waku/waku_api/handlers,
|
../../waku/waku_api/handlers,
|
||||||
../../waku/waku_api/rest/server,
|
../../waku/waku_api/rest/server,
|
||||||
@ -43,10 +38,8 @@ import
|
|||||||
../../waku/discovery/waku_dnsdisc,
|
../../waku/discovery/waku_dnsdisc,
|
||||||
../../waku/discovery/waku_discv5,
|
../../waku/discovery/waku_discv5,
|
||||||
../../waku/waku_enr/sharding,
|
../../waku/waku_enr/sharding,
|
||||||
../../waku/waku_peer_exchange,
|
|
||||||
../../waku/waku_rln_relay,
|
../../waku/waku_rln_relay,
|
||||||
../../waku/waku_store,
|
../../waku/waku_store,
|
||||||
../../waku/waku_lightpush/common,
|
|
||||||
../../waku/waku_filter_v2,
|
../../waku/waku_filter_v2,
|
||||||
../../waku/factory/node_factory,
|
../../waku/factory/node_factory,
|
||||||
../../waku/factory/internal_config,
|
../../waku/factory/internal_config,
|
||||||
@ -70,7 +63,7 @@ type
|
|||||||
|
|
||||||
node: WakuNode
|
node: WakuNode
|
||||||
|
|
||||||
restServer: Option[WakuRestServerRef]
|
restServer*: Option[WakuRestServerRef]
|
||||||
metricsServer: Option[MetricsHttpServerRef]
|
metricsServer: Option[MetricsHttpServerRef]
|
||||||
|
|
||||||
AppResult*[T] = Result[T, string]
|
AppResult*[T] = Result[T, string]
|
||||||
@ -155,7 +148,7 @@ proc init*(T: type App, conf: WakuNodeConf): Result[App, string] =
|
|||||||
|
|
||||||
## Setup DiscoveryV5
|
## Setup DiscoveryV5
|
||||||
|
|
||||||
proc setupDiscoveryV5(app: App): WakuDiscoveryV5 =
|
proc setupDiscoveryV5*(app: App): WakuDiscoveryV5 =
|
||||||
let dynamicBootstrapEnrs =
|
let dynamicBootstrapEnrs =
|
||||||
app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
|
app.dynamicBootstrapNodes.filterIt(it.hasUdpPort()).mapIt(it.enr.get())
|
||||||
|
|
||||||
@ -281,12 +274,19 @@ proc startApp*(app: var App): AppResult[void] =
|
|||||||
|
|
||||||
## Monitoring and external interfaces
|
## Monitoring and external interfaces
|
||||||
|
|
||||||
proc startRestServer(
|
# Used to register api endpoints that are not currently installed as keys,
|
||||||
app: App, address: IpAddress, port: Port, conf: WakuNodeConf
|
# values are holding error messages to be returned to the client
|
||||||
): AppResult[WakuRestServerRef] =
|
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
|
||||||
# Used to register api endpoints that are not currently installed as keys,
|
# It will always be called from main thread anyway.
|
||||||
# values are holding error messages to be returned to the client
|
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
|
||||||
var notInstalledTab: Table[string, string] = initTable[string, string]()
|
var restServerNotInstalledTab {.threadvar.}: TableRef[string, string]
|
||||||
|
restServerNotInstalledTab = newTable[string, string]()
|
||||||
|
|
||||||
|
proc startRestServerEsentials*(
|
||||||
|
nodeHealthMonitor: WakuNodeHealthMonitor, conf: WakuNodeConf
|
||||||
|
): AppResult[Option[WakuRestServerRef]] =
|
||||||
|
if not conf.rest:
|
||||||
|
return ok(none(WakuRestServerRef))
|
||||||
|
|
||||||
let requestErrorHandler: RestRequestErrorHandler = proc(
|
let requestErrorHandler: RestRequestErrorHandler = proc(
|
||||||
error: RestRequestError, request: HttpRequestRef
|
error: RestRequestError, request: HttpRequestRef
|
||||||
@ -302,7 +302,7 @@ proc startRestServer(
|
|||||||
paths[1]
|
paths[1]
|
||||||
else:
|
else:
|
||||||
""
|
""
|
||||||
notInstalledTab.withValue(rootPath, errMsg):
|
restServerNotInstalledTab[].withValue(rootPath, errMsg):
|
||||||
return await request.respond(Http404, errMsg[], HttpTable.init())
|
return await request.respond(Http404, errMsg[], HttpTable.init())
|
||||||
do:
|
do:
|
||||||
return await request.respond(
|
return await request.respond(
|
||||||
@ -328,6 +328,8 @@ proc startRestServer(
|
|||||||
else:
|
else:
|
||||||
none(string)
|
none(string)
|
||||||
|
|
||||||
|
let address = conf.restAddress
|
||||||
|
let port = Port(conf.restPort + conf.portsShift)
|
||||||
let server =
|
let server =
|
||||||
?newRestHttpServer(
|
?newRestHttpServer(
|
||||||
address,
|
address,
|
||||||
@ -336,37 +338,64 @@ proc startRestServer(
|
|||||||
requestErrorHandler = requestErrorHandler,
|
requestErrorHandler = requestErrorHandler,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
## Health REST API
|
||||||
|
installHealthApiHandler(server.router, nodeHealthMonitor)
|
||||||
|
|
||||||
|
restServerNotInstalledTab["admin"] =
|
||||||
|
"/admin endpoints are not available while initializing."
|
||||||
|
restServerNotInstalledTab["debug"] =
|
||||||
|
"/debug endpoints are not available while initializing."
|
||||||
|
restServerNotInstalledTab["relay"] =
|
||||||
|
"/relay endpoints are not available while initializing."
|
||||||
|
restServerNotInstalledTab["filter"] =
|
||||||
|
"/filter endpoints are not available while initializing."
|
||||||
|
restServerNotInstalledTab["lightpush"] =
|
||||||
|
"/lightpush endpoints are not available while initializing."
|
||||||
|
restServerNotInstalledTab["store"] =
|
||||||
|
"/store endpoints are not available while initializing."
|
||||||
|
|
||||||
|
server.start()
|
||||||
|
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
||||||
|
|
||||||
|
ok(some(server))
|
||||||
|
|
||||||
|
proc startRestServerProtocolSupport(app: var App): AppResult[void] =
|
||||||
|
if not app.conf.rest or app.restServer.isNone():
|
||||||
|
## Maybe we don't need rest server at all, so it is ok.
|
||||||
|
return ok()
|
||||||
|
|
||||||
|
var router = app.restServer.get().router
|
||||||
## Admin REST API
|
## Admin REST API
|
||||||
if conf.restAdmin:
|
if app.conf.restAdmin:
|
||||||
installAdminApiHandlers(server.router, app.node)
|
installAdminApiHandlers(router, app.node)
|
||||||
|
else:
|
||||||
|
restServerNotInstalledTab["admin"] =
|
||||||
|
"/admin endpoints are not available. Please check your configuration: --rest-admin=true"
|
||||||
|
|
||||||
## Debug REST API
|
## Debug REST API
|
||||||
installDebugApiHandlers(server.router, app.node)
|
installDebugApiHandlers(router, app.node)
|
||||||
|
|
||||||
## Health REST API
|
|
||||||
installHealthApiHandler(server.router, app.node)
|
|
||||||
|
|
||||||
## Relay REST API
|
## Relay REST API
|
||||||
if conf.relay:
|
if app.conf.relay:
|
||||||
let cache = MessageCache.init(int(conf.restRelayCacheCapacity))
|
let cache = MessageCache.init(int(app.conf.restRelayCacheCapacity))
|
||||||
|
|
||||||
let handler = messageCacheHandler(cache)
|
let handler = messageCacheHandler(cache)
|
||||||
|
|
||||||
for pubsubTopic in conf.pubsubTopics:
|
for pubsubTopic in app.conf.pubsubTopics:
|
||||||
cache.pubsubSubscribe(pubsubTopic)
|
cache.pubsubSubscribe(pubsubTopic)
|
||||||
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
|
app.node.subscribe((kind: PubsubSub, topic: pubsubTopic), some(handler))
|
||||||
|
|
||||||
for contentTopic in conf.contentTopics:
|
for contentTopic in app.conf.contentTopics:
|
||||||
cache.contentSubscribe(contentTopic)
|
cache.contentSubscribe(contentTopic)
|
||||||
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
|
app.node.subscribe((kind: ContentSub, topic: contentTopic), some(handler))
|
||||||
|
|
||||||
installRelayApiHandlers(server.router, app.node, cache)
|
installRelayApiHandlers(router, app.node, cache)
|
||||||
else:
|
else:
|
||||||
notInstalledTab["relay"] =
|
restServerNotInstalledTab["relay"] =
|
||||||
"/relay endpoints are not available. Please check your configuration: --relay"
|
"/relay endpoints are not available. Please check your configuration: --relay"
|
||||||
|
|
||||||
## Filter REST API
|
## Filter REST API
|
||||||
if conf.filternode != "" and app.node.wakuFilterClient != nil:
|
if app.conf.filternode != "" and app.node.wakuFilterClient != nil:
|
||||||
let filterCache = MessageCache.init()
|
let filterCache = MessageCache.init()
|
||||||
|
|
||||||
let filterDiscoHandler =
|
let filterDiscoHandler =
|
||||||
@ -376,10 +405,10 @@ proc startRestServer(
|
|||||||
none(DiscoveryHandler)
|
none(DiscoveryHandler)
|
||||||
|
|
||||||
rest_filter_api.installFilterRestApiHandlers(
|
rest_filter_api.installFilterRestApiHandlers(
|
||||||
server.router, app.node, filterCache, filterDiscoHandler
|
router, app.node, filterCache, filterDiscoHandler
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
notInstalledTab["filter"] =
|
restServerNotInstalledTab["filter"] =
|
||||||
"/filter endpoints are not available. Please check your configuration: --filternode"
|
"/filter endpoints are not available. Please check your configuration: --filternode"
|
||||||
|
|
||||||
## Store REST API
|
## Store REST API
|
||||||
@ -389,10 +418,10 @@ proc startRestServer(
|
|||||||
else:
|
else:
|
||||||
none(DiscoveryHandler)
|
none(DiscoveryHandler)
|
||||||
|
|
||||||
installStoreApiHandlers(server.router, app.node, storeDiscoHandler)
|
installStoreApiHandlers(router, app.node, storeDiscoHandler)
|
||||||
|
|
||||||
## Light push API
|
## Light push API
|
||||||
if conf.lightpushnode != "" and app.node.wakuLightpushClient != nil:
|
if app.conf.lightpushnode != "" and app.node.wakuLightpushClient != nil:
|
||||||
let lightDiscoHandler =
|
let lightDiscoHandler =
|
||||||
if app.wakuDiscv5.isSome():
|
if app.wakuDiscv5.isSome():
|
||||||
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
some(defaultDiscoveryHandler(app.wakuDiscv5.get(), Lightpush))
|
||||||
@ -400,16 +429,14 @@ proc startRestServer(
|
|||||||
none(DiscoveryHandler)
|
none(DiscoveryHandler)
|
||||||
|
|
||||||
rest_lightpush_api.installLightPushRequestHandler(
|
rest_lightpush_api.installLightPushRequestHandler(
|
||||||
server.router, app.node, lightDiscoHandler
|
router, app.node, lightDiscoHandler
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
notInstalledTab["lightpush"] =
|
restServerNotInstalledTab["lightpush"] =
|
||||||
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
|
"/lightpush endpoints are not available. Please check your configuration: --lightpushnode"
|
||||||
|
|
||||||
server.start()
|
info "REST services are installed"
|
||||||
info "Starting REST HTTP server", url = "http://" & $address & ":" & $port & "/"
|
ok()
|
||||||
|
|
||||||
ok(server)
|
|
||||||
|
|
||||||
proc startMetricsServer(
|
proc startMetricsServer(
|
||||||
serverIp: IpAddress, serverPort: Port
|
serverIp: IpAddress, serverPort: Port
|
||||||
@ -434,15 +461,11 @@ proc startMetricsLogging(): AppResult[void] =
|
|||||||
ok()
|
ok()
|
||||||
|
|
||||||
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
proc setupMonitoringAndExternalInterfaces*(app: var App): AppResult[void] =
|
||||||
if app.conf.rest:
|
if app.conf.rest and app.restServer.isSome():
|
||||||
let startRestServerRes = startRestServer(
|
let restProtocolSupportRes = startRestServerProtocolSupport(app)
|
||||||
app, app.conf.restAddress, Port(app.conf.restPort + app.conf.portsShift), app.conf
|
if restProtocolSupportRes.isErr():
|
||||||
)
|
error "Starting REST server protocol support failed. Continuing in current state.",
|
||||||
if startRestServerRes.isErr():
|
error = restProtocolSupportRes.error
|
||||||
error "Starting REST server failed. Continuing in current state.",
|
|
||||||
error = startRestServerRes.error
|
|
||||||
else:
|
|
||||||
app.restServer = some(startRestServerRes.value)
|
|
||||||
|
|
||||||
if app.conf.metricsServer:
|
if app.conf.metricsServer:
|
||||||
let startMetricsServerRes = startMetricsServer(
|
let startMetricsServerRes = startMetricsServer(
|
||||||
|
|||||||
90
waku/node/health_monitor.nim
Normal file
90
waku/node/health_monitor.nim
Normal file
@ -0,0 +1,90 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import std/[options], chronos
|
||||||
|
|
||||||
|
import waku_node, ../waku_rln_relay
|
||||||
|
|
||||||
|
type
|
||||||
|
HealthStatus* = enum
|
||||||
|
INITIALIZING
|
||||||
|
SYNCHRONIZING
|
||||||
|
READY
|
||||||
|
NOT_READY
|
||||||
|
NOT_MOUNTED
|
||||||
|
SHUTTING_DOWN
|
||||||
|
|
||||||
|
ProtocolHealth* = object
|
||||||
|
protocol*: string
|
||||||
|
health*: HealthStatus
|
||||||
|
|
||||||
|
HealthReport* = object
|
||||||
|
nodeHealth*: HealthStatus
|
||||||
|
protocolsHealth*: seq[ProtocolHealth]
|
||||||
|
|
||||||
|
WakuNodeHealthMonitor* = ref object
|
||||||
|
nodeHealth: HealthStatus
|
||||||
|
node: Option[WakuNode]
|
||||||
|
|
||||||
|
proc `$`*(t: HealthStatus): string =
|
||||||
|
result =
|
||||||
|
case t
|
||||||
|
of INITIALIZING: "Initializing"
|
||||||
|
of SYNCHRONIZING: "Synchronizing"
|
||||||
|
of READY: "Ready"
|
||||||
|
of NOT_READY: "Not Ready"
|
||||||
|
of NOT_MOUNTED: "Not Mounted"
|
||||||
|
of SHUTTING_DOWN: "Shutting Down"
|
||||||
|
|
||||||
|
proc init*(
|
||||||
|
t: typedesc[HealthStatus], strRep: string
|
||||||
|
): HealthStatus {.raises: [ValueError].} =
|
||||||
|
case strRep
|
||||||
|
of "Initializing":
|
||||||
|
return HealthStatus.INITIALIZING
|
||||||
|
of "Synchronizing":
|
||||||
|
return HealthStatus.SYNCHRONIZING
|
||||||
|
of "Ready":
|
||||||
|
return HealthStatus.READY
|
||||||
|
of "Not Ready":
|
||||||
|
return HealthStatus.NOT_READY
|
||||||
|
of "Not Mounted":
|
||||||
|
return HealthStatus.NOT_MOUNTED
|
||||||
|
of "Shutting Down":
|
||||||
|
return HealthStatus.SHUTTING_DOWN
|
||||||
|
else:
|
||||||
|
raise newException(ValueError, "Invalid HealthStatus string representation")
|
||||||
|
|
||||||
|
const FutIsReadyTimout = 5.seconds
|
||||||
|
|
||||||
|
proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealthReport] {.async.} =
|
||||||
|
result.nodeHealth = hm.nodeHealth
|
||||||
|
|
||||||
|
if hm.node.isSome() and hm.node.get().wakuRlnRelay != nil:
|
||||||
|
let getRlnRelayHealth = proc(): Future[HealthStatus] {.async.} =
|
||||||
|
let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
|
||||||
|
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
|
||||||
|
return HealthStatus.NOT_READY
|
||||||
|
|
||||||
|
try:
|
||||||
|
if not isReadyStateFut.completed():
|
||||||
|
return HealthStatus.NOT_READY
|
||||||
|
elif isReadyStateFut.read():
|
||||||
|
return HealthStatus.READY
|
||||||
|
|
||||||
|
return HealthStatus.SYNCHRONIZING
|
||||||
|
except:
|
||||||
|
error "exception reading state: " & getCurrentExceptionMsg()
|
||||||
|
return HealthStatus.NOT_READY
|
||||||
|
|
||||||
|
result.protocolsHealth.add(
|
||||||
|
ProtocolHealth(protocol: "Rln Relay", health: await getRlnRelayHealth())
|
||||||
|
)
|
||||||
|
|
||||||
|
proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) =
|
||||||
|
hm.node = some(node)
|
||||||
|
|
||||||
|
proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) =
|
||||||
|
hm.nodeHealth = health
|
||||||
@ -5,11 +5,11 @@ else:
|
|||||||
|
|
||||||
import
|
import
|
||||||
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
chronicles, json_serialization, json_serialization/std/options, presto/[route, client]
|
||||||
import ../serdes, ../responses, ../rest_serdes
|
import ./types, ../serdes, ../responses, ../rest_serdes, ../../waku/node/health_monitor
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku node rest health_api"
|
topics = "waku node rest health_api"
|
||||||
|
|
||||||
proc healthCheck*(): RestResponse[string] {.
|
proc healthCheck*(): RestResponse[HealthReport] {.
|
||||||
rest, endpoint: "/health", meth: HttpMethod.MethodGet
|
rest, endpoint: "/health", meth: HttpMethod.MethodGet
|
||||||
.}
|
.}
|
||||||
|
|||||||
@ -4,33 +4,36 @@ else:
|
|||||||
{.push raises: [].}
|
{.push raises: [].}
|
||||||
|
|
||||||
import chronicles, json_serialization, presto/route
|
import chronicles, json_serialization, presto/route
|
||||||
import ../../../waku_node, ../responses, ../serdes
|
import ../../../waku_node, ../responses, ../serdes, ./types
|
||||||
|
|
||||||
logScope:
|
logScope:
|
||||||
topics = "waku node rest health_api"
|
topics = "waku node rest health_api"
|
||||||
|
|
||||||
const ROUTE_HEALTH* = "/health"
|
const ROUTE_HEALTH* = "/health"
|
||||||
|
|
||||||
const FutIsReadyTimout = 5.seconds
|
const FutHealthReportTimeout = 5.seconds
|
||||||
|
|
||||||
proc installHealthApiHandler*(router: var RestRouter, node: WakuNode) =
|
|
||||||
## /health endpoint provides information about node readiness to caller.
|
|
||||||
## Currently it is restricted to checking RLN (if mounted) proper setup
|
|
||||||
## TODO: Leter to extend it to a broader information about each subsystem state
|
|
||||||
## report. Rest response to change to JSON structure that can hold exact detailed
|
|
||||||
## information.
|
|
||||||
|
|
||||||
|
proc installHealthApiHandler*(
|
||||||
|
router: var RestRouter, nodeHealthMonitor: WakuNodeHealthMonitor
|
||||||
|
) =
|
||||||
router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse:
|
router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse:
|
||||||
let isReadyStateFut = node.isReady()
|
let healthReportFut = nodeHealthMonitor.getNodeHealthReport()
|
||||||
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
|
if not await healthReportFut.withTimeout(FutHealthReportTimeout):
|
||||||
return RestApiResponse.internalServerError("Health check timed out")
|
return RestApiResponse.internalServerError("Health check timed out")
|
||||||
|
|
||||||
var msg = "Node is healthy"
|
var msg = ""
|
||||||
var status = Http200
|
var status = Http200
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if not isReadyStateFut.read():
|
if healthReportFut.completed():
|
||||||
msg = "Node is not ready"
|
let healthReport = healthReportFut.read()
|
||||||
|
return RestApiResponse.jsonResponse(healthReport, Http200).valueOr:
|
||||||
|
debug "An error ocurred while building the json healthReport response",
|
||||||
|
error = error
|
||||||
|
return
|
||||||
|
RestApiResponse.internalServerError("Failed to serialize health report")
|
||||||
|
else:
|
||||||
|
msg = "Health check failed"
|
||||||
status = Http503
|
status = Http503
|
||||||
except:
|
except:
|
||||||
msg = "exception reading state: " & getCurrentExceptionMsg()
|
msg = "exception reading state: " & getCurrentExceptionMsg()
|
||||||
|
|||||||
82
waku/waku_api/rest/health/types.nim
Normal file
82
waku/waku_api/rest/health/types.nim
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
when (NimMajor, NimMinor) < (1, 4):
|
||||||
|
{.push raises: [Defect].}
|
||||||
|
else:
|
||||||
|
{.push raises: [].}
|
||||||
|
|
||||||
|
import
|
||||||
|
chronicles,
|
||||||
|
json_serialization,
|
||||||
|
json_serialization/std/options,
|
||||||
|
std/tables,
|
||||||
|
std/sequtils
|
||||||
|
import ../../../waku_node, ../serdes
|
||||||
|
|
||||||
|
#### Serialization and deserialization
|
||||||
|
|
||||||
|
proc writeValue*(
|
||||||
|
writer: var JsonWriter[RestJson], value: ProtocolHealth
|
||||||
|
) {.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
writer.writeField(value.protocol, $value.health)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc readValue*(
|
||||||
|
reader: var JsonReader[RestJson], value: var ProtocolHealth
|
||||||
|
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||||
|
var health: HealthStatus
|
||||||
|
var fieldCount = 0
|
||||||
|
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
if fieldCount > 0:
|
||||||
|
reader.raiseUnexpectedField("Too many fields", "ProtocolHealth")
|
||||||
|
fieldCount += 1
|
||||||
|
|
||||||
|
let fieldValue = reader.readValue(string)
|
||||||
|
try:
|
||||||
|
health = HealthStatus.init(fieldValue)
|
||||||
|
except ValueError:
|
||||||
|
reader.raiseUnexpectedValue("Invalid `health` value")
|
||||||
|
|
||||||
|
value = ProtocolHealth(protocol: fieldName, health: health)
|
||||||
|
|
||||||
|
proc writeValue*(
|
||||||
|
writer: var JsonWriter[RestJson], value: HealthReport
|
||||||
|
) {.raises: [IOError].} =
|
||||||
|
writer.beginRecord()
|
||||||
|
writer.writeField("nodeHealth", $value.nodeHealth)
|
||||||
|
writer.writeField("protocolsHealth", value.protocolsHealth)
|
||||||
|
writer.endRecord()
|
||||||
|
|
||||||
|
proc readValue*(
|
||||||
|
reader: var JsonReader[RestJson], value: var HealthReport
|
||||||
|
) {.raises: [SerializationError, IOError].} =
|
||||||
|
var
|
||||||
|
nodeHealth: Option[HealthStatus]
|
||||||
|
protocolsHealth: Option[seq[ProtocolHealth]]
|
||||||
|
|
||||||
|
for fieldName in readObjectFields(reader):
|
||||||
|
case fieldName
|
||||||
|
of "nodeHealth":
|
||||||
|
if nodeHealth.isSome():
|
||||||
|
reader.raiseUnexpectedField(
|
||||||
|
"Multiple `nodeHealth` fields found", "HealthReport"
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
nodeHealth = some(HealthStatus.init(reader.readValue(string)))
|
||||||
|
except ValueError:
|
||||||
|
reader.raiseUnexpectedValue("Invalid `health` value")
|
||||||
|
of "protocolsHealth":
|
||||||
|
if protocolsHealth.isSome():
|
||||||
|
reader.raiseUnexpectedField(
|
||||||
|
"Multiple `protocolsHealth` fields found", "HealthReport"
|
||||||
|
)
|
||||||
|
|
||||||
|
protocolsHealth = some(reader.readValue(seq[ProtocolHealth]))
|
||||||
|
else:
|
||||||
|
unrecognizedFieldWarning()
|
||||||
|
|
||||||
|
if nodeHealth.isNone():
|
||||||
|
reader.raiseUnexpectedValue("Field `nodeHealth` is missing")
|
||||||
|
|
||||||
|
value =
|
||||||
|
HealthReport(nodeHealth: nodeHealth.get, protocolsHealth: protocolsHealth.get(@[]))
|
||||||
@ -1,3 +1,7 @@
|
|||||||
import ./node/config, ./node/waku_switch as switch, ./node/waku_node as node
|
import
|
||||||
|
./node/config,
|
||||||
|
./node/waku_switch as switch,
|
||||||
|
./node/waku_node as node,
|
||||||
|
./node/health_monitor as health_monitor
|
||||||
|
|
||||||
export config, switch, node
|
export config, switch, node, health_monitor
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user