Merge branch 'master' into add_shard_metrics

This commit is contained in:
Darshan K 2025-06-17 11:36:37 +05:30 committed by GitHub
commit 4fb35dc56b
32 changed files with 692 additions and 527 deletions

View File

@ -0,0 +1,24 @@
import chronos, results, options
import waku/[waku_node, waku_core]
import publisher_base
type LegacyPublisher* = ref object of PublisherBase
proc new*(T: type LegacyPublisher, wakuNode: WakuNode): T =
if isNil(wakuNode.wakuLegacyLightpushClient):
wakuNode.mountLegacyLightPushClient()
return LegacyPublisher(wakuNode: wakuNode)
method send*(
self: LegacyPublisher,
topic: PubsubTopic,
message: WakuMessage,
servicePeer: RemotePeerInfo,
): Future[Result[void, string]] {.async.} =
# when error it must return original error desc due the text is used for distinction between error types in metrics.
discard (
await self.wakuNode.legacyLightpushPublish(some(topic), message, servicePeer)
).valueOr:
return err(error)
return ok()

View File

@ -14,13 +14,11 @@ import
waku/[
common/enr,
common/logging,
factory/waku,
factory/waku as waku_factory,
factory/external_config,
waku_node,
node/health_monitor,
node/waku_metrics,
node/peer_manager,
waku_api/rest/builder as rest_server_builder,
waku_lightpush/common,
waku_filter_v2,
waku_peer_exchange/protocol,
@ -28,8 +26,8 @@ import
waku_core/multiaddrstr,
],
./tester_config,
./lightpush_publisher,
./filter_subscriber,
./publisher,
./receiver,
./diagnose_connections,
./service_peer_management
@ -49,7 +47,7 @@ when isMainModule:
## 5. Start monitoring tools and external interfaces
## 6. Setup graceful shutdown hooks
const versionString = "version / git commit hash: " & waku.git_version
const versionString = "version / git commit hash: " & waku_factory.git_version
let confRes = LiteProtocolTesterConf.load(version = versionString)
if confRes.isErr():
@ -61,7 +59,7 @@ when isMainModule:
## Logging setup
logging.setupLog(conf.logLevel, conf.logFormat)
info "Running Lite Protocol Tester node", version = waku.git_version
info "Running Lite Protocol Tester node", version = waku_factory.git_version
logConfig(conf)
##Prepare Waku configuration
@ -69,13 +67,13 @@ when isMainModule:
## - override according to tester functionality
##
var wakuConf: WakuNodeConf
var wakuNodeConf: WakuNodeConf
if conf.configFile.isSome():
try:
var configFile {.threadvar.}: InputFile
configFile = conf.configFile.get()
wakuConf = WakuNodeConf.load(
wakuNodeConf = WakuNodeConf.load(
version = versionString,
printUsage = false,
secondarySources = proc(
@ -88,81 +86,54 @@ when isMainModule:
error "Loading Waku configuration failed", error = getCurrentExceptionMsg()
quit(QuitFailure)
wakuConf.logLevel = conf.logLevel
wakuConf.logFormat = conf.logFormat
wakuConf.nat = conf.nat
wakuConf.maxConnections = 500
wakuConf.restAddress = conf.restAddress
wakuConf.restPort = conf.restPort
wakuConf.restAllowOrigin = conf.restAllowOrigin
wakuNodeConf.logLevel = conf.logLevel
wakuNodeConf.logFormat = conf.logFormat
wakuNodeConf.nat = conf.nat
wakuNodeConf.maxConnections = 500
wakuNodeConf.restAddress = conf.restAddress
wakuNodeConf.restPort = conf.restPort
wakuNodeConf.restAllowOrigin = conf.restAllowOrigin
wakuConf.dnsAddrsNameServers = @[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
wakuNodeConf.dnsAddrsNameServers =
@[parseIpAddress("8.8.8.8"), parseIpAddress("1.1.1.1")]
wakuConf.shards = @[conf.shard]
wakuConf.contentTopics = conf.contentTopics
wakuConf.clusterId = conf.clusterId
wakuNodeConf.shards = @[conf.shard]
wakuNodeConf.contentTopics = conf.contentTopics
wakuNodeConf.clusterId = conf.clusterId
## TODO: Depending on the tester needs we might extend here with shards, clusterId, etc...
wakuConf.metricsServer = true
wakuConf.metricsServerAddress = parseIpAddress("0.0.0.0")
wakuConf.metricsServerPort = conf.metricsPort
wakuNodeConf.metricsServer = true
wakuNodeConf.metricsServerAddress = parseIpAddress("0.0.0.0")
wakuNodeConf.metricsServerPort = conf.metricsPort
# If bootstrap option is chosen we expect our clients will not mounted
# so we will mount PeerExchange manually to gather possible service peers,
# if got some we will mount the client protocols afterward.
wakuConf.peerExchange = false
wakuConf.relay = false
wakuConf.filter = false
wakuConf.lightpush = false
wakuConf.store = false
wakuNodeConf.peerExchange = false
wakuNodeConf.relay = false
wakuNodeConf.filter = false
wakuNodeConf.lightpush = false
wakuNodeConf.store = false
wakuConf.rest = false
wakuConf.relayServiceRatio = "40:60"
wakuNodeConf.rest = false
wakuNodeConf.relayServiceRatio = "40:60"
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
let restServer = rest_server_builder.startRestServerEssentials(
nodeHealthMonitor, wakuConf
).valueOr:
error "Starting esential REST server failed.", error = $error
let wakuConf = wakuNodeConf.toWakuConf().valueOr:
error "Issue converting toWakuConf", error = $error
quit(QuitFailure)
var wakuApp = Waku.new(wakuConf).valueOr:
var waku = Waku.new(wakuConf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
wakuApp.restServer = restServer
nodeHealthMonitor.setNode(wakuApp.node)
(waitFor startWaku(addr wakuApp)).isOkOr:
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)
rest_server_builder.startRestServerProtocolSupport(
restServer, wakuApp.node, wakuApp.wakuDiscv5, wakuConf
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
wakuApp.metricsServer = waku_metrics.startMetricsServerAndLogging(wakuConf).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)
nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
debug "Setting up shutdown hooks"
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
proc asyncStopper(wakuApp: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await wakuApp.stop()
proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} =
await waku.stop()
quit(QuitSuccess)
# Handle Ctrl-C SIGINT
@ -171,7 +142,7 @@ when isMainModule:
# workaround for https://github.com/nim-lang/Nim/issues/4057
setupForeignThreadGc()
notice "Shutting down after receiving SIGINT"
asyncSpawn asyncStopper(wakuApp)
asyncSpawn asyncStopper(waku)
setControlCHook(handleCtrlC)
@ -179,7 +150,7 @@ when isMainModule:
when defined(posix):
proc handleSigterm(signal: cint) {.noconv.} =
notice "Shutting down after receiving SIGTERM"
asyncSpawn asyncStopper(wakuApp)
asyncSpawn asyncStopper(waku)
c_signal(ansi_c.SIGTERM, handleSigterm)
@ -192,7 +163,7 @@ when isMainModule:
# Not available in -d:release mode
writeStackTrace()
waitFor wakuApp.stop()
waitFor waku.stop()
quit(QuitFailure)
c_signal(ansi_c.SIGSEGV, handleSigsegv)
@ -211,7 +182,7 @@ when isMainModule:
if conf.serviceNode.len == 0:
if conf.bootstrapNode.len > 0:
info "Bootstrapping with PeerExchange to gather random service node"
let futForServiceNode = pxLookupServiceNode(wakuApp.node, conf)
let futForServiceNode = pxLookupServiceNode(waku.node, conf)
if not (waitFor futForServiceNode.withTimeout(20.minutes)):
error "Service node not found in time via PX"
quit(QuitFailure)
@ -221,7 +192,7 @@ when isMainModule:
quit(QuitFailure)
serviceNodePeerInfo = selectRandomServicePeer(
wakuApp.node.peerManager, none(RemotePeerInfo), codec
waku.node.peerManager, none(RemotePeerInfo), codec
).valueOr:
error "Service node selection failed"
quit(QuitFailure)
@ -236,11 +207,11 @@ when isMainModule:
info "Service node to be used", serviceNode = $serviceNodePeerInfo
logSelfPeers(wakuApp.node.peerManager)
logSelfPeers(waku.node.peerManager)
if conf.testFunc == TesterFunctionality.SENDER:
setupAndPublish(wakuApp.node, conf, serviceNodePeerInfo)
setupAndPublish(waku.node, conf, serviceNodePeerInfo)
else:
setupAndSubscribe(wakuApp.node, conf, serviceNodePeerInfo)
setupAndListen(waku.node, conf, serviceNodePeerInfo)
runForever()

View File

@ -24,8 +24,8 @@ def run_tester_node(predefined_test_env):
return os.system(script_cmd)
if __name__ == "__main__":
if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER"]:
print("Error: First argument must be either 'RECEIVER' or 'SENDER'")
if len(sys.argv) < 2 or sys.argv[1] not in ["RECEIVER", "SENDER", "SENDERV3"]:
print("Error: First argument must be either 'RECEIVER' or 'SENDER' or 'SENDERV3'")
sys.exit(1)
predefined_test_env_file = '/usr/bin/infra.env'

View File

@ -21,14 +21,17 @@ import
./tester_message,
./lpt_metrics,
./diagnose_connections,
./service_peer_management
./service_peer_management,
./publisher_base,
./legacy_publisher,
./v3_publisher
randomize()
type SizeRange* = tuple[min: uint64, max: uint64]
var RANDOM_PALYLOAD {.threadvar.}: seq[byte]
RANDOM_PALYLOAD = urandom(1024 * 1024)
var RANDOM_PAYLOAD {.threadvar.}: seq[byte]
RANDOM_PAYLOAD = urandom(1024 * 1024)
# 1MiB of random payload to be used to extend message
proc prepareMessage(
@ -59,9 +62,8 @@ proc prepareMessage(
if renderSize < len(contentPayload).uint64:
renderSize = len(contentPayload).uint64
let finalPayload = concat(
contentPayload, RANDOM_PALYLOAD[0 .. renderSize - len(contentPayload).uint64]
)
let finalPayload =
concat(contentPayload, RANDOM_PAYLOAD[0 .. renderSize - len(contentPayload).uint64])
let message = WakuMessage(
payload: finalPayload, # content of the message
contentTopic: contentTopic, # content topic to publish to
@ -108,6 +110,7 @@ proc reportSentMessages() =
proc publishMessages(
wakuNode: WakuNode,
publisher: PublisherBase,
servicePeer: RemotePeerInfo,
lightpushPubsubTopic: PubsubTopic,
lightpushContentTopic: ContentTopic,
@ -148,9 +151,7 @@ proc publishMessages(
let publishStartTime = Moment.now()
let wlpRes = await wakuNode.legacyLightpushPublish(
some(lightpushPubsubTopic), message, actualServicePeer
)
let wlpRes = await publisher.send(lightpushPubsubTopic, message, actualServicePeer)
let publishDuration = Moment.now() - publishStartTime
@ -213,10 +214,13 @@ proc publishMessages(
proc setupAndPublish*(
wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo
) =
if isNil(wakuNode.wakuLightpushClient):
# if we have not yet initialized lightpush client, then do it as the only way we can get here is
# by having a service peer discovered.
wakuNode.mountLegacyLightPushClient()
var publisher: PublisherBase
if conf.lightpushVersion == LightpushVersion.LEGACY:
info "Using legacy lightpush protocol for publishing messages"
publisher = LegacyPublisher.new(wakuNode)
else:
info "Using lightpush v3 protocol for publishing messages"
publisher = V3Publisher.new(wakuNode)
# give some time to receiver side to set up
let waitTillStartTesting = conf.startPublishingAfter.seconds
@ -257,6 +261,7 @@ proc setupAndPublish*(
# Start maintaining subscription
asyncSpawn publishMessages(
wakuNode,
publisher,
servicePeer,
conf.getPubsubTopic(),
conf.contentTopics[0],

View File

@ -0,0 +1,14 @@
import chronos, results
import waku/[waku_node, waku_core]
type PublisherBase* = ref object of RootObj
wakuNode*: WakuNode
method send*(
self: PublisherBase,
topic: PubsubTopic,
message: WakuMessage,
servicePeer: RemotePeerInfo,
): Future[Result[void, string]] {.base, async.} =
discard
# when error it must return original error desc due the text is used for distinction between error types in metrics.

View File

@ -116,7 +116,7 @@ proc maintainSubscription(
await sleepAsync(30.seconds) # Subscription maintenance interval
proc setupAndSubscribe*(
proc setupAndListen*(
wakuNode: WakuNode, conf: LiteProtocolTesterConf, servicePeer: RemotePeerInfo
) =
if isNil(wakuNode.wakuFilterClient):

View File

@ -25,7 +25,12 @@ fi
FUNCTION=$2
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
SERVICENAME=lightpush-service
fi
if [ "${FUNCTION}" = "SENDERV3" ]; then
FUNCTION="--test-func=SENDER --lightpush-version=V3"
SERVICENAME=lightpush-service
fi

View File

@ -26,7 +26,15 @@ fi
FUNCTION=$2
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}"
fi
if [ "${FUNCTION}" = "SENDERV3" ]; then
FUNCTION="--test-func=SENDER --lightpush-version=V3"
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}

View File

@ -26,7 +26,15 @@ fi
FUNCTION=$2
if [ "${FUNCTION}" = "SENDER" ]; then
FUNCTION=--test-func=SENDER
FUNCTION="--test-func=SENDER --lightpush-version=LEGACY"
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}
METRICS_PORT=--metrics-port="${PUBLISHER_METRICS_PORT:-8003}"
fi
if [ "${FUNCTION}" = "SENDERV3" ]; then
FUNCTION="--test-func=SENDER --lightpush-version=V3"
SERIVCE_NODE_ADDR=${LIGHTPUSH_SERVICE_PEER:-${LIGHTPUSH_BOOTSTRAP:-}}
NODE_ARG=${LIGHTPUSH_SERVICE_PEER:+--service-node="${LIGHTPUSH_SERVICE_PEER}"}
NODE_ARG=${NODE_ARG:---bootstrap-node="${LIGHTPUSH_BOOTSTRAP}"}

View File

@ -158,9 +158,7 @@ proc tryCallAllPxPeers*(
proc pxLookupServiceNode*(
node: WakuNode, conf: LiteProtocolTesterConf
): Future[Result[bool, void]] {.async.} =
var codec: string = WakuLightPushCodec
if conf.testFunc == TesterFunctionality.RECEIVER:
codec = WakuFilterSubscribeCodec
let codec: string = conf.getCodec()
if node.wakuPeerExchange.isNil():
let peerExchangeNode = translateToRemotePeerInfo(conf.bootstrapNode).valueOr:

View File

@ -33,6 +33,10 @@ type TesterFunctionality* = enum
SENDER # pumps messages to the network
RECEIVER # gather and analyze messages from the network
type LightpushVersion* = enum
LEGACY # legacy lightpush protocol
V3 # lightpush v3 protocol
type LiteProtocolTesterConf* = object
configFile* {.
desc:
@ -80,6 +84,12 @@ type LiteProtocolTesterConf* = object
name: "test-func"
.}: TesterFunctionality
lightpushVersion* {.
desc: "Version of the sender to use. Supported values: legacy, v3.",
defaultValue: LightpushVersion.LEGACY,
name: "lightpush-version"
.}: LightpushVersion
numMessages* {.
desc: "Number of messages to send.", defaultValue: 120, name: "num-messages"
.}: uint32
@ -190,4 +200,14 @@ proc load*(T: type LiteProtocolTesterConf, version = ""): ConfResult[T] =
proc getPubsubTopic*(conf: LiteProtocolTesterConf): PubsubTopic =
return $RelayShard(clusterId: conf.clusterId, shardId: conf.shard)
proc getCodec*(conf: LiteProtocolTesterConf): string =
return
if conf.testFunc == TesterFunctionality.RECEIVER:
WakuFilterSubscribeCodec
else:
if conf.lightpushVersion == LightpushVersion.LEGACY:
WakuLegacyLightPushCodec
else:
WakuLightPushCodec
{.pop.}

View File

@ -0,0 +1,29 @@
import results, options, chronos
import waku/[waku_node, waku_core, waku_lightpush]
import publisher_base
type V3Publisher* = ref object of PublisherBase
proc new*(T: type V3Publisher, wakuNode: WakuNode): T =
if isNil(wakuNode.wakuLightpushClient):
wakuNode.mountLightPushClient()
return V3Publisher(wakuNode: wakuNode)
method send*(
self: V3Publisher,
topic: PubsubTopic,
message: WakuMessage,
servicePeer: RemotePeerInfo,
): Future[Result[void, string]] {.async.} =
# when error it must return original error desc due the text is used for distinction between error types in metrics.
discard (
await self.wakuNode.lightpushPublish(some(topic), message, some(servicePeer))
).valueOr:
if error.code == NO_PEERS_TO_RELAY and
error.desc != some("No peers for topic, skipping publish"):
# TODO: We need better separation of errors happening on the client side or the server side.-
return err("dial_failure")
else:
return err($error.code)
return ok()

View File

@ -16,7 +16,6 @@ import
factory/external_config,
factory/waku,
node/health_monitor,
node/waku_metrics,
waku_api/rest/builder as rest_server_builder,
]
@ -53,69 +52,21 @@ when isMainModule:
let conf = wakuNodeConf.toInspectRlnDbConf()
doInspectRlnDb(conf)
of noCommand:
# NOTE: {.threadvar.} is used to make the global variable GC safe for the closure uses it
# It will always be called from main thread anyway.
# Ref: https://nim-lang.org/docs/manual.html#threads-gc-safety
var nodeHealthMonitor {.threadvar.}: WakuNodeHealthMonitor
nodeHealthMonitor = WakuNodeHealthMonitor()
nodeHealthMonitor.setOverallHealth(HealthStatus.INITIALIZING)
let conf = wakuNodeConf.toWakuConf().valueOr:
error "Waku configuration failed", error = error
quit(QuitFailure)
var restServer: WakuRestServerRef = nil
if conf.restServerConf.isSome():
restServer = rest_server_builder.startRestServerEssentials(
nodeHealthMonitor, conf.restServerConf.get(), conf.portsShift
).valueOr:
error "Starting essential REST server failed.", error = $error
quit(QuitFailure)
var waku = Waku.new(conf).valueOr:
error "Waku initialization failed", error = error
quit(QuitFailure)
waku.restServer = restServer
nodeHealthMonitor.setNode(waku.node)
(waitFor startWaku(addr waku)).isOkOr:
error "Starting waku failed", error = error
quit(QuitFailure)
if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport(
restServer,
waku.node,
waku.wakuDiscv5,
conf.restServerConf.get(),
conf.relay,
conf.lightPush,
conf.clusterId,
conf.shards,
conf.contentTopics,
).isOkOr:
error "Starting protocols support REST server failed.", error = $error
quit(QuitFailure)
if conf.metricsServerConf.isSome():
waku.metricsServer = waku_metrics.startMetricsServerAndLogging(
conf.metricsServerConf.get(), conf.portsShift
).valueOr:
error "Starting monitoring and external interfaces failed", error = error
quit(QuitFailure)
nodeHealthMonitor.setOverallHealth(HealthStatus.READY)
debug "Setting up shutdown hooks"
## Setup shutdown hooks for this process.
## Stop node gracefully on shutdown.
proc asyncStopper(node: Waku) {.async: (raises: [Exception]).} =
nodeHealthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
await node.stop()
proc asyncStopper(waku: Waku) {.async: (raises: [Exception]).} =
await waku.stop()
quit(QuitSuccess)
# Handle Ctrl-C SIGINT

View File

@ -878,8 +878,8 @@ proc waku_is_online(
handleRequest(
ctx,
RequestType.PEER_MANAGER,
PeerManagementRequest.createShared(PeerManagementMsgType.IS_ONLINE),
RequestType.DEBUG,
DebugNodeRequest.createShared(DebugNodeMsgType.RETRIEVE_ONLINE_STATE),
callback,
userData,
)

View File

@ -7,13 +7,17 @@ import
strutils,
libp2p/peerid,
metrics
import ../../../../waku/factory/waku, ../../../../waku/node/waku_node
import
../../../../waku/factory/waku,
../../../../waku/node/waku_node,
../../../../waku/node/health_monitor
type DebugNodeMsgType* = enum
RETRIEVE_LISTENING_ADDRESSES
RETRIEVE_MY_ENR
RETRIEVE_MY_PEER_ID
RETRIEVE_METRICS
RETRIEVE_ONLINE_STATE
type DebugNodeRequest* = object
operation: DebugNodeMsgType
@ -49,6 +53,8 @@ proc process*(
return ok($waku.node.peerId())
of RETRIEVE_METRICS:
return ok(getMetrics())
of RETRIEVE_ONLINE_STATE:
return ok($waku.healthMonitor.onlineMonitor.amIOnline())
error "unsupported operation in DebugNodeRequest"
return err("unsupported operation in DebugNodeRequest")

View File

@ -8,6 +8,7 @@ import
../../../../waku/factory/node_factory,
../../../../waku/factory/networks_config,
../../../../waku/factory/app_callbacks,
../../../../waku/waku_api/rest/builder,
../../../alloc
type NodeLifecycleMsgType* = enum
@ -73,9 +74,11 @@ proc createWaku(
appCallbacks.topicHealthChangeHandler = nil
# TODO: Convert `confJson` directly to `WakuConf`
let wakuConf = conf.toWakuConf().valueOr:
var wakuConf = conf.toWakuConf().valueOr:
return err("Configuration error: " & $error)
wakuConf.restServerConf = none(RestServerConf) ## don't want REST in libwaku
let wakuRes = Waku.new(wakuConf, appCallbacks).valueOr:
error "waku initialization failed", error = error
return err("Failed setting up Waku: " & $error)

View File

@ -16,7 +16,6 @@ type PeerManagementMsgType* {.pure.} = enum
DIAL_PEER
DIAL_PEER_BY_ID
GET_CONNECTED_PEERS
IS_ONLINE
type PeerManagementRequest* = object
operation: PeerManagementMsgType
@ -156,7 +155,5 @@ proc process*(
(inPeerIds, outPeerIds) = waku.node.peerManager.connectedPeers()
connectedPeerids = concat(inPeerIds, outPeerIds)
return ok(connectedPeerids.mapIt($it).join(","))
of IS_ONLINE:
return ok($waku.node.peerManager.isOnline())
return ok("")

View File

@ -39,7 +39,7 @@ suite "Waku v2 REST API - health":
asyncTest "Get node health info - GET /health":
# Given
let node = testWakuNode()
let healthMonitor = WakuNodeHealthMonitor()
let healthMonitor = NodeHealthMonitor()
await node.start()
(await node.mountRelay()).isOkOr:
assert false, "Failed to mount relay"
@ -78,7 +78,7 @@ suite "Waku v2 REST API - health":
node.mountLightPushClient()
await node.mountFilterClient()
healthMonitor.setNode(node)
healthMonitor.setNodeToHealthMonitor(node)
healthMonitor.setOverallHealth(HealthStatus.READY)
# When
response = await client.healthCheck()

View File

@ -209,7 +209,6 @@ proc build*(builder: WakuNodeBuilder): Result[WakuNode, string] =
maxServicePeers = some(builder.maxServicePeers),
colocationLimit = builder.colocationLimit,
shardedPeerManagement = builder.shardAware,
dnsNameServers = netConfig.dnsNameServers,
)
var node: WakuNode

View File

@ -426,7 +426,7 @@ proc startNode*(
## Connect to static nodes and start
## keep-alive, if configured.
# Start Waku v2 node
info "Running nwaku node", version = git_version
try:
await node.start()
except CatchableError:

View File

@ -26,9 +26,11 @@ import
../waku_node,
../node/peer_manager,
../node/health_monitor,
../node/waku_metrics,
../node/delivery_monitor/delivery_monitor,
../waku_api/message_cache,
../waku_api/rest/server,
../waku_api/rest/builder as rest_server_builder,
../waku_archive,
../waku_relay/protocol,
../discovery/waku_dnsdisc,
@ -66,6 +68,8 @@ type Waku* = ref object
node*: WakuNode
healthMonitor*: NodeHealthMonitor
deliveryMonitor: DeliveryMonitor
restServer*: WakuRestServerRef
@ -159,19 +163,33 @@ proc new*(
logging.setupLog(wakuConf.logLevel, wakuConf.logFormat)
?wakuConf.validate()
wakuConf.logConf()
info "Running nwaku node", version = git_version
let healthMonitor = NodeHealthMonitor.new(wakuConf.dnsAddrsNameServers)
let restServer: WakuRestServerRef =
if wakuConf.restServerConf.isSome():
let restServer = startRestServerEssentials(
healthMonitor, wakuConf.restServerConf.get(), wakuConf.portsShift
).valueOr:
error "Starting essential REST server failed", error = $error
return err("Failed to start essential REST server in Waku.new: " & $error)
restServer
else:
nil
var relay = newCircuitRelay(wakuConf.circuitRelayClient)
let nodeRes = setupNode(wakuConf, rng, relay)
if nodeRes.isErr():
error "Failed setting up node", error = nodeRes.error
return err("Failed setting up node: " & nodeRes.error)
let node = setupNode(wakuConf, rng, relay).valueOr:
error "Failed setting up node", error = $error
return err("Failed setting up node: " & $error)
let node = nodeRes.get()
healthMonitor.setNodeToHealthMonitor(node)
healthMonitor.onlineMonitor.setPeerStoreToOnlineMonitor(node.switch.peerStore)
healthMonitor.onlineMonitor.addOnlineStateObserver(
node.peerManager.getOnlineStateObserver()
)
node.setupAppCallbacks(wakuConf, appCallbacks).isOkOr:
error "Failed setting up app callbacks", error = error
@ -197,8 +215,10 @@ proc new*(
rng: rng,
key: wakuConf.nodeKey,
node: node,
healthMonitor: healthMonitor,
deliveryMonitor: deliveryMonitor,
appCallbacks: appCallbacks,
restServer: restServer,
)
waku.setupSwitchServices(wakuConf, relay, rng)
@ -334,15 +354,6 @@ proc startDnsDiscoveryRetryLoop(waku: ptr Waku): Future[void] {.async.} =
error "failed to connect to dynamic bootstrap nodes: " & getCurrentExceptionMsg()
return
# The network connectivity loop checks periodically whether the node is online or not
# and triggers any change that depends on the network connectivity state
proc startNetworkConnectivityLoop(waku: Waku): Future[void] {.async.} =
while true:
await sleepAsync(15.seconds)
# Update online state
await waku.node.peerManager.updateOnlineState()
proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
debug "Retrieve dynamic bootstrap nodes"
let conf = waku[].conf
@ -369,7 +380,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
return err("Error in updateApp: " & $error)
## Discv5
if conf.discv5Conf.isSome:
if conf.discv5Conf.isSome():
waku[].wakuDiscV5 = waku_discv5.setupDiscoveryV5(
waku.node.enr,
waku.node.peerManager,
@ -389,23 +400,41 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async.} =
if not waku[].deliveryMonitor.isNil():
waku[].deliveryMonitor.startDeliveryMonitor()
# Start network connectivity check loop
waku[].networkConnLoopHandle = waku[].startNetworkConnectivityLoop()
## Health Monitor
waku[].healthMonitor.startHealthMonitor()
if conf.restServerConf.isSome():
rest_server_builder.startRestServerProtocolSupport(
waku[].restServer,
waku[].node,
waku[].wakuDiscv5,
conf.restServerConf.get(),
conf.relay,
conf.lightPush,
conf.clusterId,
conf.shards,
conf.contentTopics,
).isOkOr:
return err ("Starting protocols support REST server failed: " & $error)
if conf.metricsServerConf.isSome():
waku[].metricsServer = waku_metrics.startMetricsServerAndLogging(
conf.metricsServerConf.get(), conf.portsShift
).valueOr:
return err("Starting monitoring and external interfaces failed: " & error)
waku[].healthMonitor.setOverallHealth(HealthStatus.READY)
return ok()
# Waku shutdown
proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
if not waku.restServer.isNil():
await waku.restServer.stop()
## Waku shutdown
waku.healthMonitor.setOverallHealth(HealthStatus.SHUTTING_DOWN)
if not waku.metricsServer.isNil():
await waku.metricsServer.stop()
if not waku.networkConnLoopHandle.isNil():
await waku.networkConnLoopHandle.cancelAndWait()
if not waku.wakuDiscv5.isNil():
await waku.wakuDiscv5.stop()
@ -414,3 +443,9 @@ proc stop*(waku: Waku): Future[void] {.async: (raises: [Exception]).} =
if not waku.dnsRetryLoopHandle.isNil():
await waku.dnsRetryLoopHandle.cancelAndWait()
if not waku.healthMonitor.isNil():
await waku.healthMonitor.stopHealthMonitor()
if not waku.restServer.isNil():
await waku.restServer.stop()

View File

@ -1,293 +1,4 @@
{.push raises: [].}
import
health_monitor/[node_health_monitor, protocol_health, online_monitor, health_status]
import std/[options, sets], chronos, libp2p/protocols/rendezvous
import waku_node, ../waku_rln_relay, ../waku_relay, ./peer_manager
type
HealthStatus* = enum
INITIALIZING
SYNCHRONIZING
READY
NOT_READY
NOT_MOUNTED
SHUTTING_DOWN
ProtocolHealth* = object
protocol*: string
health*: HealthStatus
desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY`
HealthReport* = object
nodeHealth*: HealthStatus
protocolsHealth*: seq[ProtocolHealth]
WakuNodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: Option[WakuNode]
proc `$`*(t: HealthStatus): string =
result =
case t
of INITIALIZING: "Initializing"
of SYNCHRONIZING: "Synchronizing"
of READY: "Ready"
of NOT_READY: "Not Ready"
of NOT_MOUNTED: "Not Mounted"
of SHUTTING_DOWN: "Shutting Down"
proc init*(
t: typedesc[HealthStatus], strRep: string
): HealthStatus {.raises: [ValueError].} =
case strRep
of "Initializing":
return HealthStatus.INITIALIZING
of "Synchronizing":
return HealthStatus.SYNCHRONIZING
of "Ready":
return HealthStatus.READY
of "Not Ready":
return HealthStatus.NOT_READY
of "Not Mounted":
return HealthStatus.NOT_MOUNTED
of "Shutting Down":
return HealthStatus.SHUTTING_DOWN
else:
raise newException(ValueError, "Invalid HealthStatus string representation")
proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
let p = ProtocolHealth(
protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
)
return p
proc notReady(p: var ProtocolHealth, desc: string): ProtocolHealth =
p.health = HealthStatus.NOT_READY
p.desc = some(desc)
return p
proc ready(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.READY
p.desc = none[string]()
return p
proc notMounted(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.NOT_MOUNTED
p.desc = none[string]()
return p
proc synchronizing(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.SYNCHRONIZING
p.desc = none[string]()
return p
proc initializing(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.INITIALIZING
p.desc = none[string]()
return p
proc shuttingDown(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.SHUTTING_DOWN
p.desc = none[string]()
return p
const FutIsReadyTimout = 5.seconds
proc getRelayHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Relay")
if hm.node.get().wakuRelay == nil:
return p.notMounted()
let relayPeers = hm.node
.get().wakuRelay
.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
return p.notMounted()
if relayPeers.len() == 0:
return p.notReady("No connected peers")
return p.ready()
proc getRlnRelayHealth(hm: WakuNodeHealthMonitor): Future[ProtocolHealth] {.async.} =
var p = ProtocolHealth.init("Rln Relay")
if hm.node.get().wakuRlnRelay.isNil():
return p.notMounted()
let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
return p.notReady("Ready state check timed out")
try:
if not isReadyStateFut.completed():
return p.notReady("Ready state check timed out")
elif isReadyStateFut.read():
return p.ready()
return p.synchronizing()
except:
error "exception reading state: " & getCurrentExceptionMsg()
return p.notReady("State cannot be determined")
proc getLightpushHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush")
if hm.node.get().wakuLightPush == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLightpushClientHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush Client")
if hm.node.get().wakuLightpushClient == nil:
return p.notMounted()
let selfServiceAvailable =
hm.node.get().wakuLightPush != nil and relayHealth == HealthStatus.READY
let servicePeerAvailable =
hm.node.get().peerManager.selectPeer(WakuLightPushCodec).isSome()
if selfServiceAvailable or servicePeerAvailable:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush")
if hm.node.get().wakuLegacyLightPush == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLegacyLightpushClientHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush Client")
if hm.node.get().wakuLegacyLightpushClient == nil:
return p.notMounted()
if (hm.node.get().wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
hm.node.get().peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Filter")
if hm.node.get().wakuFilter == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Relay is not ready, filter will not be able to sort out messages")
proc getFilterClientHealth(
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Filter Client")
if hm.node.get().wakuFilterClient == nil:
return p.notMounted()
if hm.node.get().peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store")
if hm.node.get().wakuStore == nil:
return p.notMounted()
return p.ready()
proc getStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store Client")
if hm.node.get().wakuStoreClient == nil:
return p.notMounted()
if hm.node.get().peerManager.selectPeer(WakuStoreCodec).isSome() or
hm.node.get().wakuStore != nil:
return p.ready()
return p.notReady(
"No Store service peer available yet, neither Store service set up for the node"
)
proc getLegacyStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store")
if hm.node.get().wakuLegacyStore == nil:
return p.notMounted()
return p.ready()
proc getLegacyStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store Client")
if hm.node.get().wakuLegacyStoreClient == nil:
return p.notMounted()
if hm.node.get().peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
hm.node.get().wakuLegacyStore != nil:
return p.ready()
return p.notReady(
"No Legacy Store service peers are available yet, neither Store service set up for the node"
)
proc getPeerExchangeHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Peer Exchange")
if hm.node.get().wakuPeerExchange == nil:
return p.notMounted()
return p.ready()
proc getRendezvousHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Rendezvous")
if hm.node.get().wakuRendezvous == nil:
return p.notMounted()
if hm.node.get().peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
return p.notReady("No Rendezvous peers are available yet")
return p.ready()
proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealthReport] {.async.} =
var report: HealthReport
report.nodeHealth = hm.nodeHealth
if hm.node.isSome():
let relayHealth = hm.getRelayHealth()
report.protocolsHealth.add(relayHealth)
report.protocolsHealth.add(await hm.getRlnRelayHealth())
report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreHealth())
report.protocolsHealth.add(hm.getLegacyStoreHealth())
report.protocolsHealth.add(hm.getPeerExchangeHealth())
report.protocolsHealth.add(hm.getRendezvousHealth())
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreClientHealth())
report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
return report
proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) =
hm.node = some(node)
proc setOverallHealth*(hm: WakuNodeHealthMonitor, health: HealthStatus) =
hm.nodeHealth = health
export node_health_monitor, protocol_health, online_monitor, health_status

View File

@ -0,0 +1,16 @@
import results, std/strutils
type HealthStatus* {.pure.} = enum
INITIALIZING
SYNCHRONIZING
READY
NOT_READY
NOT_MOUNTED
SHUTTING_DOWN
proc init*(t: typedesc[HealthStatus], strRep: string): Result[HealthStatus, string] =
try:
let status = parseEnum[HealthStatus](strRep)
return ok(status)
except ValueError:
return err("Invalid HealthStatus string representation: " & strRep)

View File

@ -0,0 +1,270 @@
{.push raises: [].}
import std/[options, sets, strformat], chronos, chronicles, libp2p/protocols/rendezvous
import
../waku_node,
../../waku_rln_relay,
../../waku_relay,
../peer_manager,
./online_monitor,
./health_status,
./protocol_health
## This module is aimed to check the state of the "self" Waku Node
type
HealthReport* = object
nodeHealth*: HealthStatus
protocolsHealth*: seq[ProtocolHealth]
NodeHealthMonitor* = ref object
nodeHealth: HealthStatus
node: WakuNode
onlineMonitor*: OnlineMonitor
template checkWakuNodeNotNil(node: WakuNode, p: ProtocolHealth): untyped =
if node.isNil():
warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
return p.notMounted()
proc getRelayHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Relay")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuRelay == nil:
return p.notMounted()
let relayPeers = hm.node.wakuRelay.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
return p.notMounted()
if relayPeers.len() == 0:
return p.notReady("No connected peers")
return p.ready()
proc getRlnRelayHealth(hm: NodeHealthMonitor): Future[ProtocolHealth] {.async.} =
var p = ProtocolHealth.init("Rln Relay")
if hm.node.isNil():
warn "WakuNode is not set, cannot check health", protocol_health_instance = $p
return p.notMounted()
if hm.node.wakuRlnRelay.isNil():
return p.notMounted()
const FutIsReadyTimout = 5.seconds
let isReadyStateFut = hm.node.wakuRlnRelay.isReady()
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
return p.notReady("Ready state check timed out")
try:
if not isReadyStateFut.completed():
return p.notReady("Ready state check timed out")
elif isReadyStateFut.read():
return p.ready()
return p.synchronizing()
except:
error "exception reading state: " & getCurrentExceptionMsg()
return p.notReady("State cannot be determined")
proc getLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLightPush == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Lightpush Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLightpushClient == nil:
return p.notMounted()
let selfServiceAvailable =
hm.node.wakuLightPush != nil and relayHealth == HealthStatus.READY
let servicePeerAvailable = hm.node.peerManager.selectPeer(WakuLightPushCodec).isSome()
if selfServiceAvailable or servicePeerAvailable:
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getLegacyLightpushHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyLightPush == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Node has no relay peers to fullfill push requests")
proc getLegacyLightpushClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Lightpush Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyLightpushClient == nil:
return p.notMounted()
if (hm.node.wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
hm.node.peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
return p.ready()
return p.notReady("No Lightpush service peer available yet")
proc getFilterHealth(hm: NodeHealthMonitor, relayHealth: HealthStatus): ProtocolHealth =
var p = ProtocolHealth.init("Filter")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuFilter == nil:
return p.notMounted()
if relayHealth == HealthStatus.READY:
return p.ready()
return p.notReady("Relay is not ready, filter will not be able to sort out messages")
proc getFilterClientHealth(
hm: NodeHealthMonitor, relayHealth: HealthStatus
): ProtocolHealth =
var p = ProtocolHealth.init("Filter Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuFilterClient == nil:
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
return p.ready()
return p.notReady("No Filter service peer available yet")
proc getStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuStore == nil:
return p.notMounted()
return p.ready()
proc getStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Store Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuStoreClient == nil:
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuStoreCodec).isSome() or hm.node.wakuStore != nil:
return p.ready()
return p.notReady(
"No Store service peer available yet, neither Store service set up for the node"
)
proc getLegacyStoreHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyStore == nil:
return p.notMounted()
return p.ready()
proc getLegacyStoreClientHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Legacy Store Client")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuLegacyStoreClient == nil:
return p.notMounted()
if hm.node.peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
hm.node.wakuLegacyStore != nil:
return p.ready()
return p.notReady(
"No Legacy Store service peers are available yet, neither Store service set up for the node"
)
proc getPeerExchangeHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Peer Exchange")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuPeerExchange == nil:
return p.notMounted()
return p.ready()
proc getRendezvousHealth(hm: NodeHealthMonitor): ProtocolHealth =
var p = ProtocolHealth.init("Rendezvous")
checkWakuNodeNotNil(hm.node, p)
if hm.node.wakuRendezvous == nil:
return p.notMounted()
if hm.node.peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
return p.notReady("No Rendezvous peers are available yet")
return p.ready()
proc getNodeHealthReport*(hm: NodeHealthMonitor): Future[HealthReport] {.async.} =
var report: HealthReport
report.nodeHealth = hm.nodeHealth
if not hm.node.isNil():
let relayHealth = hm.getRelayHealth()
report.protocolsHealth.add(relayHealth)
report.protocolsHealth.add(await hm.getRlnRelayHealth())
report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreHealth())
report.protocolsHealth.add(hm.getLegacyStoreHealth())
report.protocolsHealth.add(hm.getPeerExchangeHealth())
report.protocolsHealth.add(hm.getRendezvousHealth())
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
report.protocolsHealth.add(hm.getStoreClientHealth())
report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
return report
proc setNodeToHealthMonitor*(hm: NodeHealthMonitor, node: WakuNode) =
hm.node = node
proc setOverallHealth*(hm: NodeHealthMonitor, health: HealthStatus) =
hm.nodeHealth = health
proc startHealthMonitor*(hm: NodeHealthMonitor) =
hm.onlineMonitor.startOnlineMonitor()
proc stopHealthMonitor*(hm: NodeHealthMonitor) {.async.} =
await hm.onlineMonitor.stopOnlineMonitor()
proc new*(
T: type NodeHealthMonitor,
dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
): T =
T(
nodeHealth: INITIALIZING,
node: nil,
onlineMonitor: OnlineMonitor.init(dnsNameServers),
)

View File

@ -0,0 +1,77 @@
import std/sequtils
import chronos, chronicles, libp2p/nameresolving/dnsresolver, libp2p/peerstore
import ../peer_manager/waku_peer_store, waku/waku_core/peers
type
OnOnlineStateChange* = proc(online: bool) {.gcsafe, raises: [].}
OnlineMonitor* = ref object
onOnlineStateChange: OnOnlineStateChange
dnsNameServers*: seq[IpAddress]
onlineStateObservers: seq[OnOnlineStateChange]
networkConnLoopHandle: Future[void] # node: WakuNode
peerStore: PeerStore
online: bool
proc checkInternetConnectivity(
nameServerIps: seq[IpAddress], timeout = 2.seconds
): Future[bool] {.async.} =
const DNSCheckDomain = "one.one.one.one"
let nameServers = nameServerIps.mapIt(initTAddress(it, Port(53)))
let dnsResolver = DnsResolver.new(nameServers)
# Resolve domain IP
let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
if resolved.len > 0:
return true
else:
return false
proc updateOnlineState(self: OnlineMonitor) {.async.} =
if self.onlineStateObservers.len == 0:
trace "No online state observers registered, cannot notify about online state change"
return
let numConnectedPeers =
if self.peerStore.isNil():
0
else:
self.peerStore.peers().countIt(it.connectedness == Connected)
self.online =
if numConnectedPeers > 0:
true
else:
await checkInternetConnectivity(self.dnsNameServers)
for onlineStateObserver in self.onlineStateObservers:
onlineStateObserver(self.online)
proc networkConnectivityLoop(self: OnlineMonitor): Future[void] {.async.} =
## Checks periodically whether the node is online or not
## and triggers any change that depends on the network connectivity state
while true:
await self.updateOnlineState()
await sleepAsync(15.seconds)
proc startOnlineMonitor*(self: OnlineMonitor) =
self.networkConnLoopHandle = self.networkConnectivityLoop()
proc stopOnlineMonitor*(self: OnlineMonitor) {.async.} =
if not self.networkConnLoopHandle.isNil():
await self.networkConnLoopHandle.cancelAndWait()
proc setPeerStoreToOnlineMonitor*(self: OnlineMonitor, peerStore: PeerStore) =
self.peerStore = peerStore
proc addOnlineStateObserver*(self: OnlineMonitor, observer: OnOnlineStateChange) =
## Adds an observer that will be called when the online state changes
if observer notin self.onlineStateObservers:
self.onlineStateObservers.add(observer)
proc amIOnline*(self: OnlineMonitor): bool =
return self.online
proc init*(T: type OnlineMonitor, dnsNameServers: seq[IpAddress]): OnlineMonitor =
T(dnsNameServers: dnsNameServers, onlineStateObservers: @[])

View File

@ -0,0 +1,46 @@
import std/[options, strformat]
import ./health_status
type ProtocolHealth* = object
protocol*: string
health*: HealthStatus
desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY`
proc notReady*(p: var ProtocolHealth, desc: string): ProtocolHealth =
p.health = HealthStatus.NOT_READY
p.desc = some(desc)
return p
proc ready*(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.READY
p.desc = none[string]()
return p
proc notMounted*(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.NOT_MOUNTED
p.desc = none[string]()
return p
proc synchronizing*(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.SYNCHRONIZING
p.desc = none[string]()
return p
proc initializing*(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.INITIALIZING
p.desc = none[string]()
return p
proc shuttingDown*(p: var ProtocolHealth): ProtocolHealth =
p.health = HealthStatus.SHUTTING_DOWN
p.desc = none[string]()
return p
proc `$`*(p: ProtocolHealth): string =
return fmt"protocol: {p.protocol}, health: {p.health}, description: {p.desc}"
proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
let p = ProtocolHealth(
protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
)
return p

View File

@ -8,7 +8,6 @@ import
libp2p/multistream,
libp2p/muxers/muxer,
libp2p/nameresolving/nameresolver,
libp2p/nameresolving/dnsresolver,
libp2p/peerstore
import
@ -21,6 +20,7 @@ import
../../waku_enr/sharding,
../../waku_enr/capabilities,
../../waku_metadata,
../health_monitor/online_monitor,
./peer_store/peer_storage,
./waku_peer_store
@ -76,8 +76,6 @@ const
# Max peers that we allow from the same IP
DefaultColocationLimit* = 5
DNSCheckDomain = "one.one.one.one"
type ConnectionChangeHandler* = proc(
peerId: PeerId, peerEvent: PeerEventKind
): Future[void] {.gcsafe, raises: [Defect].}
@ -100,16 +98,12 @@ type PeerManager* = ref object of RootObj
started: bool
shardedPeerManagement: bool # temp feature flag
onConnectionChange*: ConnectionChangeHandler
dnsNameServers*: seq[IpAddress]
online: bool
online: bool ## state managed by online_monitor module
#~~~~~~~~~~~~~~~~~~~#
# Helper Functions #
#~~~~~~~~~~~~~~~~~~~#
template isOnline*(self: PeerManager): bool =
self.online
proc calculateBackoff(
initialBackoffInSec: int, backoffFactor: int, failedAttempts: int
): timer.Duration =
@ -545,35 +539,9 @@ proc getStreamByPeerIdAndProtocol*(
return ok(streamRes.get())
proc checkInternetConnectivity(
nameServerIps: seq[IpAddress], timeout = 2.seconds
): Future[bool] {.async.} =
var nameServers: seq[TransportAddress]
for ip in nameServerIps:
nameServers.add(initTAddress(ip, Port(53))) # Assume all servers use port 53
let dnsResolver = DnsResolver.new(nameServers)
# Resolve domain IP
let resolved = await dnsResolver.resolveIp(DNSCheckDomain, 0.Port, Domain.AF_UNSPEC)
if resolved.len > 0:
return true
else:
return false
proc updateOnlineState*(pm: PeerManager) {.async.} =
let numConnectedPeers =
pm.switch.peerStore.peers().countIt(it.connectedness == Connected)
if numConnectedPeers > 0:
pm.online = true
else:
pm.online = await checkInternetConnectivity(pm.dnsNameServers)
proc connectToRelayPeers*(pm: PeerManager) {.async.} =
# only attempt if current node is online
if not pm.isOnline():
if not pm.online:
error "connectToRelayPeers: won't attempt new connections - node is offline"
return
@ -741,6 +709,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
debug "Pruning connection due to ip colocation", peerId = peerId, ip = ip
asyncSpawn(pm.switch.disconnect(peerId))
peerStore.delete(peerId)
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Joined)
@ -755,6 +724,7 @@ proc onPeerEvent(pm: PeerManager, peerId: PeerId, event: PeerEvent) {.async.} =
if pm.ipTable[ip].len == 0:
pm.ipTable.del(ip)
break
if not pm.onConnectionChange.isNil():
# we don't want to await for the callback to finish
asyncSpawn pm.onConnectionChange(peerId, Left)
@ -821,6 +791,11 @@ proc logAndMetrics(pm: PeerManager) {.async.} =
connectedPeers.len.float64, labelValues = [$shard]
)
proc getOnlineStateObserver*(pm: PeerManager): OnOnlineStateChange =
return proc(online: bool) {.gcsafe, raises: [].} =
pm.online = online
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
# Pruning and Maintenance (Stale Peers Management) #
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
@ -829,7 +804,7 @@ proc manageRelayPeers*(pm: PeerManager) {.async.} =
if pm.wakuMetadata.shards.len == 0:
return
if not pm.isOnline():
if not pm.online:
error "manageRelayPeers: won't attempt new connections - node is offline"
return
@ -1060,7 +1035,6 @@ proc new*(
maxFailedAttempts = MaxFailedAttempts,
colocationLimit = DefaultColocationLimit,
shardedPeerManagement = false,
dnsNameServers = @[parseIpAddress("1.1.1.1"), parseIpAddress("1.0.0.1")],
): PeerManager {.gcsafe.} =
let capacity = switch.peerStore.capacity
let maxConnections = switch.connManager.inSema.size
@ -1111,7 +1085,6 @@ proc new*(
maxFailedAttempts: maxFailedAttempts,
colocationLimit: colocationLimit,
shardedPeerManagement: shardedPeerManagement,
dnsNameServers: dnsNameServers,
online: true,
)

View File

@ -40,7 +40,7 @@ type RestServerConf* = object
relayCacheCapacity*: uint32
proc startRestServerEssentials*(
nodeHealthMonitor: WakuNodeHealthMonitor, conf: RestServerConf, portsShift: uint16
nodeHealthMonitor: NodeHealthMonitor, conf: RestServerConf, portsShift: uint16
): Result[WakuRestServerRef, string] =
let requestErrorHandler: RestRequestErrorHandler = proc(
error: RestRequestError, request: HttpRequestRef

View File

@ -11,7 +11,7 @@ const ROUTE_HEALTH* = "/health"
const FutHealthReportTimeout = 5.seconds
proc installHealthApiHandler*(
router: var RestRouter, nodeHealthMonitor: WakuNodeHealthMonitor
router: var RestRouter, nodeHealthMonitor: NodeHealthMonitor
) =
router.api(MethodGet, ROUTE_HEALTH) do() -> RestApiResponse:
let healthReportFut = nodeHealthMonitor.getNodeHealthReport()

View File

@ -1,5 +1,6 @@
{.push raises: [].}
import results
import chronicles, json_serialization, json_serialization/std/options
import ../../../waku_node, ../serdes
@ -31,13 +32,10 @@ proc readValue*(
)
let fieldValue = reader.readValue(string)
try:
health = some(HealthStatus.init(fieldValue))
protocol = some(fieldName)
except ValueError:
reader.raiseUnexpectedValue(
"Invalid `health` value: " & getCurrentExceptionMsg()
)
let h = HealthStatus.init(fieldValue).valueOr:
reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
health = some(h)
protocol = some(fieldName)
value = ProtocolHealth(protocol: protocol.get(), health: health.get(), desc: desc)
@ -63,10 +61,11 @@ proc readValue*(
reader.raiseUnexpectedField(
"Multiple `nodeHealth` fields found", "HealthReport"
)
try:
nodeHealth = some(HealthStatus.init(reader.readValue(string)))
except ValueError:
reader.raiseUnexpectedValue("Invalid `health` value")
let health = HealthStatus.init(reader.readValue(string)).valueOr:
reader.raiseUnexpectedValue("Invalid `health` value: " & $error)
nodeHealth = some(health)
of "protocolsHealth":
if protocolsHealth.isSome():
reader.raiseUnexpectedField(

View File

@ -44,7 +44,7 @@ type
proc `$`*(err: ArchiveError): string =
case err.kind
of ArchiveErrorKind.DRIVER_ERROR:
"DIRVER_ERROR: " & err.cause
"DRIVER_ERROR: " & err.cause
of ArchiveErrorKind.INVALID_QUERY:
"INVALID_QUERY: " & err.cause
of ArchiveErrorKind.UNKNOWN:

View File

@ -78,7 +78,7 @@ type
proc `$`*(err: ArchiveError): string =
case err.kind
of ArchiveErrorKind.DRIVER_ERROR:
"DIRVER_ERROR: " & err.cause
"DRIVER_ERROR: " & err.cause
of ArchiveErrorKind.INVALID_QUERY:
"INVALID_QUERY: " & err.cause
of ArchiveErrorKind.UNKNOWN: