mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-01-03 06:23:10 +00:00
feat: Extend node /health REST endpoint with all protocol's state (#3419)
* Extend ndoe /health REST endpoint with all protocol's state * Added check for Rendezvous peers availability * Fine tune filter, added client protocols to health report * Fix /health endpoint test * Add explanatory description for state NOT_READY * Fix formattings * Apply suggestions from code review Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com> * Apply code style changes and extended test * Fix formatting --------- Co-authored-by: Ivan FB <128452529+Ivansete-status@users.noreply.github.com>
This commit is contained in:
parent
94cd2f88b4
commit
a39bcff6dc
@ -74,6 +74,10 @@ suite "Waku v2 REST API - health":
|
||||
treePath: genTempPath("rln_tree", "wakunode"),
|
||||
)
|
||||
)
|
||||
|
||||
node.mountLightPushClient()
|
||||
await node.mountFilterClient()
|
||||
|
||||
healthMonitor.setNode(node)
|
||||
healthMonitor.setOverallHealth(HealthStatus.READY)
|
||||
# When
|
||||
@ -84,9 +88,40 @@ suite "Waku v2 REST API - health":
|
||||
response.status == 200
|
||||
$response.contentType == $MIMETYPE_JSON
|
||||
response.data.nodeHealth == HealthStatus.READY
|
||||
response.data.protocolsHealth.len() == 1
|
||||
response.data.protocolsHealth[0].protocol == "Rln Relay"
|
||||
response.data.protocolsHealth[0].health == HealthStatus.READY
|
||||
response.data.protocolsHealth.len() == 14
|
||||
response.data.protocolsHealth[0].protocol == "Relay"
|
||||
response.data.protocolsHealth[0].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[0].desc == some("No connected peers")
|
||||
response.data.protocolsHealth[1].protocol == "Rln Relay"
|
||||
response.data.protocolsHealth[1].health == HealthStatus.READY
|
||||
response.data.protocolsHealth[2].protocol == "Lightpush"
|
||||
response.data.protocolsHealth[2].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[3].protocol == "Legacy Lightpush"
|
||||
response.data.protocolsHealth[3].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[4].protocol == "Filter"
|
||||
response.data.protocolsHealth[4].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[5].protocol == "Store"
|
||||
response.data.protocolsHealth[5].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[6].protocol == "Legacy Store"
|
||||
response.data.protocolsHealth[6].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[7].protocol == "Peer Exchange"
|
||||
response.data.protocolsHealth[7].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[8].protocol == "Rendezvous"
|
||||
response.data.protocolsHealth[8].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[9].protocol == "Lightpush Client"
|
||||
response.data.protocolsHealth[9].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[9].desc ==
|
||||
some("No Lightpush service peer available yet")
|
||||
response.data.protocolsHealth[10].protocol == "Legacy Lightpush Client"
|
||||
response.data.protocolsHealth[10].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[11].protocol == "Store Client"
|
||||
response.data.protocolsHealth[11].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[12].protocol == "Legacy Store Client"
|
||||
response.data.protocolsHealth[12].health == HealthStatus.NOT_MOUNTED
|
||||
response.data.protocolsHealth[13].protocol == "Filter Client"
|
||||
response.data.protocolsHealth[13].health == HealthStatus.NOT_READY
|
||||
response.data.protocolsHealth[13].desc ==
|
||||
some("No Filter service peer available yet")
|
||||
|
||||
await restServer.stop()
|
||||
await restServer.closeWait()
|
||||
|
||||
@ -1,8 +1,8 @@
|
||||
{.push raises: [].}
|
||||
|
||||
import std/[options], chronos
|
||||
import std/[options, sets], chronos, libp2p/protocols/rendezvous
|
||||
|
||||
import waku_node, ../waku_rln_relay
|
||||
import waku_node, ../waku_rln_relay, ../waku_relay, ./peer_manager
|
||||
|
||||
type
|
||||
HealthStatus* = enum
|
||||
@ -16,6 +16,7 @@ type
|
||||
ProtocolHealth* = object
|
||||
protocol*: string
|
||||
health*: HealthStatus
|
||||
desc*: Option[string] ## describes why a certain protocol is considered `NOT_READY`
|
||||
|
||||
HealthReport* = object
|
||||
nodeHealth*: HealthStatus
|
||||
@ -54,31 +55,236 @@ proc init*(
|
||||
else:
|
||||
raise newException(ValueError, "Invalid HealthStatus string representation")
|
||||
|
||||
proc init*(p: typedesc[ProtocolHealth], protocol: string): ProtocolHealth =
|
||||
let p = ProtocolHealth(
|
||||
protocol: protocol, health: HealthStatus.NOT_MOUNTED, desc: none[string]()
|
||||
)
|
||||
return p
|
||||
|
||||
proc notReady(p: var ProtocolHealth, desc: string): ProtocolHealth =
|
||||
p.health = HealthStatus.NOT_READY
|
||||
p.desc = some(desc)
|
||||
return p
|
||||
|
||||
proc ready(p: var ProtocolHealth): ProtocolHealth =
|
||||
p.health = HealthStatus.READY
|
||||
p.desc = none[string]()
|
||||
return p
|
||||
|
||||
proc notMounted(p: var ProtocolHealth): ProtocolHealth =
|
||||
p.health = HealthStatus.NOT_MOUNTED
|
||||
p.desc = none[string]()
|
||||
return p
|
||||
|
||||
proc synchronizing(p: var ProtocolHealth): ProtocolHealth =
|
||||
p.health = HealthStatus.SYNCHRONIZING
|
||||
p.desc = none[string]()
|
||||
return p
|
||||
|
||||
proc initializing(p: var ProtocolHealth): ProtocolHealth =
|
||||
p.health = HealthStatus.INITIALIZING
|
||||
p.desc = none[string]()
|
||||
return p
|
||||
|
||||
proc shuttingDown(p: var ProtocolHealth): ProtocolHealth =
|
||||
p.health = HealthStatus.SHUTTING_DOWN
|
||||
p.desc = none[string]()
|
||||
return p
|
||||
|
||||
const FutIsReadyTimout = 5.seconds
|
||||
|
||||
proc getRelayHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Relay")
|
||||
if hm.node.get().wakuRelay == nil:
|
||||
return p.notMounted()
|
||||
|
||||
let relayPeers = hm.node
|
||||
.get().wakuRelay
|
||||
.getConnectedPubSubPeers(pubsubTopic = "").valueOr:
|
||||
return p.notMounted()
|
||||
|
||||
if relayPeers.len() == 0:
|
||||
return p.notReady("No connected peers")
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getRlnRelayHealth(hm: WakuNodeHealthMonitor): Future[ProtocolHealth] {.async.} =
|
||||
var p = ProtocolHealth.init("Rln Relay")
|
||||
if hm.node.get().wakuRlnRelay.isNil():
|
||||
return p.notMounted()
|
||||
|
||||
let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
|
||||
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
|
||||
return p.notReady("Ready state check timed out")
|
||||
|
||||
try:
|
||||
if not isReadyStateFut.completed():
|
||||
return p.notReady("Ready state check timed out")
|
||||
elif isReadyStateFut.read():
|
||||
return p.ready()
|
||||
|
||||
return p.synchronizing()
|
||||
except:
|
||||
error "exception reading state: " & getCurrentExceptionMsg()
|
||||
return p.notReady("State cannot be determined")
|
||||
|
||||
proc getLightpushHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Lightpush")
|
||||
if hm.node.get().wakuLightPush == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if relayHealth == HealthStatus.READY:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("Node has no relay peers to fullfill push requests")
|
||||
|
||||
proc getLightpushClientHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Lightpush Client")
|
||||
if hm.node.get().wakuLightpushClient == nil:
|
||||
return p.notMounted()
|
||||
|
||||
let selfServiceAvailable =
|
||||
hm.node.get().wakuLightPush != nil and relayHealth == HealthStatus.READY
|
||||
let servicePeerAvailable =
|
||||
hm.node.get().peerManager.selectPeer(WakuLightPushCodec).isSome()
|
||||
|
||||
if selfServiceAvailable or servicePeerAvailable:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("No Lightpush service peer available yet")
|
||||
|
||||
proc getLegacyLightpushHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Legacy Lightpush")
|
||||
if hm.node.get().wakuLegacyLightPush == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if relayHealth == HealthStatus.READY:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("Node has no relay peers to fullfill push requests")
|
||||
|
||||
proc getLegacyLightpushClientHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Legacy Lightpush Client")
|
||||
if hm.node.get().wakuLegacyLightpushClient == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if (hm.node.get().wakuLegacyLightPush != nil and relayHealth == HealthStatus.READY) or
|
||||
hm.node.get().peerManager.selectPeer(WakuLegacyLightPushCodec).isSome():
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("No Lightpush service peer available yet")
|
||||
|
||||
proc getFilterHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Filter")
|
||||
if hm.node.get().wakuFilter == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if relayHealth == HealthStatus.READY:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("Relay is not ready, filter will not be able to sort out messages")
|
||||
|
||||
proc getFilterClientHealth(
|
||||
hm: WakuNodeHealthMonitor, relayHealth: HealthStatus
|
||||
): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Filter Client")
|
||||
if hm.node.get().wakuFilterClient == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if hm.node.get().peerManager.selectPeer(WakuFilterSubscribeCodec).isSome():
|
||||
return p.ready()
|
||||
|
||||
return p.notReady("No Filter service peer available yet")
|
||||
|
||||
proc getStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Store")
|
||||
if hm.node.get().wakuStore == nil:
|
||||
return p.notMounted()
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Store Client")
|
||||
if hm.node.get().wakuStoreClient == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if hm.node.get().peerManager.selectPeer(WakuStoreCodec).isSome() or
|
||||
hm.node.get().wakuStore != nil:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady(
|
||||
"No Store service peer available yet, neither Store service set up for the node"
|
||||
)
|
||||
|
||||
proc getLegacyStoreHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Legacy Store")
|
||||
if hm.node.get().wakuLegacyStore == nil:
|
||||
return p.notMounted()
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getLegacyStoreClientHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Legacy Store Client")
|
||||
if hm.node.get().wakuLegacyStoreClient == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if hm.node.get().peerManager.selectPeer(WakuLegacyStoreCodec).isSome() or
|
||||
hm.node.get().wakuLegacyStore != nil:
|
||||
return p.ready()
|
||||
|
||||
return p.notReady(
|
||||
"No Legacy Store service peers are available yet, neither Store service set up for the node"
|
||||
)
|
||||
|
||||
proc getPeerExchangeHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Peer Exchange")
|
||||
if hm.node.get().wakuPeerExchange == nil:
|
||||
return p.notMounted()
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getRendezvousHealth(hm: WakuNodeHealthMonitor): ProtocolHealth =
|
||||
var p = ProtocolHealth.init("Rendezvous")
|
||||
if hm.node.get().wakuRendezvous == nil:
|
||||
return p.notMounted()
|
||||
|
||||
if hm.node.get().peerManager.switch.peerStore.peers(RendezVousCodec).len() == 0:
|
||||
return p.notReady("No Rendezvous peers are available yet")
|
||||
|
||||
return p.ready()
|
||||
|
||||
proc getNodeHealthReport*(hm: WakuNodeHealthMonitor): Future[HealthReport] {.async.} =
|
||||
result.nodeHealth = hm.nodeHealth
|
||||
var report: HealthReport
|
||||
report.nodeHealth = hm.nodeHealth
|
||||
|
||||
if hm.node.isSome() and hm.node.get().wakuRlnRelay != nil:
|
||||
let getRlnRelayHealth = proc(): Future[HealthStatus] {.async.} =
|
||||
let isReadyStateFut = hm.node.get().wakuRlnRelay.isReady()
|
||||
if not await isReadyStateFut.withTimeout(FutIsReadyTimout):
|
||||
return HealthStatus.NOT_READY
|
||||
if hm.node.isSome():
|
||||
let relayHealth = hm.getRelayHealth()
|
||||
report.protocolsHealth.add(relayHealth)
|
||||
report.protocolsHealth.add(await hm.getRlnRelayHealth())
|
||||
report.protocolsHealth.add(hm.getLightpushHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getLegacyLightpushHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getFilterHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getStoreHealth())
|
||||
report.protocolsHealth.add(hm.getLegacyStoreHealth())
|
||||
report.protocolsHealth.add(hm.getPeerExchangeHealth())
|
||||
report.protocolsHealth.add(hm.getRendezvousHealth())
|
||||
|
||||
try:
|
||||
if not isReadyStateFut.completed():
|
||||
return HealthStatus.NOT_READY
|
||||
elif isReadyStateFut.read():
|
||||
return HealthStatus.READY
|
||||
|
||||
return HealthStatus.SYNCHRONIZING
|
||||
except:
|
||||
error "exception reading state: " & getCurrentExceptionMsg()
|
||||
return HealthStatus.NOT_READY
|
||||
|
||||
result.protocolsHealth.add(
|
||||
ProtocolHealth(protocol: "Rln Relay", health: await getRlnRelayHealth())
|
||||
)
|
||||
report.protocolsHealth.add(hm.getLightpushClientHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getLegacyLightpushClientHealth(relayHealth.health))
|
||||
report.protocolsHealth.add(hm.getStoreClientHealth())
|
||||
report.protocolsHealth.add(hm.getLegacyStoreClientHealth())
|
||||
report.protocolsHealth.add(hm.getFilterClientHealth(relayHealth.health))
|
||||
return report
|
||||
|
||||
proc setNode*(hm: WakuNodeHealthMonitor, node: WakuNode) =
|
||||
hm.node = some(node)
|
||||
|
||||
@ -10,26 +10,36 @@ proc writeValue*(
|
||||
) {.raises: [IOError].} =
|
||||
writer.beginRecord()
|
||||
writer.writeField(value.protocol, $value.health)
|
||||
writer.writeField("desc", value.desc)
|
||||
writer.endRecord()
|
||||
|
||||
proc readValue*(
|
||||
reader: var JsonReader[RestJson], value: var ProtocolHealth
|
||||
) {.gcsafe, raises: [SerializationError, IOError].} =
|
||||
var health: HealthStatus
|
||||
var fieldCount = 0
|
||||
|
||||
var protocol = none[string]()
|
||||
var health = none[HealthStatus]()
|
||||
var desc = none[string]()
|
||||
for fieldName in readObjectFields(reader):
|
||||
if fieldCount > 0:
|
||||
reader.raiseUnexpectedField("Too many fields", "ProtocolHealth")
|
||||
fieldCount += 1
|
||||
if fieldName == "desc":
|
||||
if desc.isSome():
|
||||
reader.raiseUnexpectedField("Multiple `desc` fields found", "ProtocolHealth")
|
||||
desc = some(reader.readValue(string))
|
||||
else:
|
||||
if protocol.isSome():
|
||||
reader.raiseUnexpectedField(
|
||||
"Multiple `protocol` fields and value found", "ProtocolHealth"
|
||||
)
|
||||
|
||||
let fieldValue = reader.readValue(string)
|
||||
try:
|
||||
health = HealthStatus.init(fieldValue)
|
||||
except ValueError:
|
||||
reader.raiseUnexpectedValue("Invalid `health` value")
|
||||
let fieldValue = reader.readValue(string)
|
||||
try:
|
||||
health = some(HealthStatus.init(fieldValue))
|
||||
protocol = some(fieldName)
|
||||
except ValueError:
|
||||
reader.raiseUnexpectedValue(
|
||||
"Invalid `health` value: " & getCurrentExceptionMsg()
|
||||
)
|
||||
|
||||
value = ProtocolHealth(protocol: fieldName, health: health)
|
||||
value = ProtocolHealth(protocol: protocol.get(), health: health.get(), desc: desc)
|
||||
|
||||
proc writeValue*(
|
||||
writer: var JsonWriter[RestJson], value: HealthReport
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user