logos-messaging-nim/tests/node/test_wakunode_health_monitor.nim
Ivan FB aca652008a
Reshape per-layer API into api/ folders and thin the FFI over them
Each layer now separates its constructible core from its public surface:

  - core module (waku.nim / messaging_client.nim /
    reliable_channel_manager.nim): the type plus new/start/stop and the
    private construction helpers.
  - api/ folder: one module per differentiated set of operations
    (waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
    debug/health) plus an events surface.

The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.

Events move next to the surface that owns them, with each dependency kept
pointing the right way:

  - waku/events/ relocated under waku/api/events/.
  - channel events live in channels/api/events.nim.
  - the four messaging-level message events move to messaging/api/events;
    MessageSeenEvent stays in waku because it is emitted by waku core, so
    moving it would make waku depend on the messaging layer.
  - delivery_events renamed to filter_subscribe_events to match the
    OnFilterSubscribe/Unsubscribe events it actually declares.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-25 13:00:18 +02:00

437 lines
16 KiB
Nim

{.used.}
import
std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results
import brokers/broker_context
import
logos_delivery/waku/[
waku_core,
common/waku_protocol,
node/waku_node,
node/peer_manager,
node/health_monitor/health_status,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/topic_health,
node/health_monitor/node_health_monitor,
node/waku_node/relay,
node/waku_node/store,
node/waku_node/lightpush,
node/waku_node/filter,
api/events/health_events,
api/events/peer_events,
waku_archive,
]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
import logos_delivery/waku/node/subscription_manager
import logos_delivery/messaging/messaging_client
const MockDLow = 4 # Mocked GossipSub DLow value
const TestConnectivityTimeLimit = 3.seconds
proc protoHealthMock(kind: WakuProtocol, health: HealthStatus): ProtocolHealth =
var ph = ProtocolHealth.init(kind)
if health == HealthStatus.READY:
return ph.ready()
else:
return ph.notReady("mock")
suite "Health Monitor - health state calculation":
test "Disconnected, zero peers":
let protocols = @[
protoHealthMock(RelayProtocol, HealthStatus.NOT_READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(FilterClientProtocol, HealthStatus.NOT_READY),
protoHealthMock(LightpushClientProtocol, HealthStatus.NOT_READY),
]
let strength = initTable[WakuProtocol, int]()
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, weak relay":
let weakCount = MockDLow - 1
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Partially connected since relay connectivity is weak (> 0, but < dLow)
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay":
let protocols = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
let state = calculateConnectionState(protocols, strength, some(MockDLow))
# Fully connected since relay connectivity is ideal (>= dLow)
check state == ConnectionStatus.Connected
test "Connected, robust edge":
let protocols = @[
protoHealthMock(RelayProtocol, HealthStatus.NOT_MOUNTED),
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = HealthyThreshold
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Disconnected, edge missing store":
let protocols = @[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = HealthyThreshold
strength[FilterClientProtocol] = HealthyThreshold
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Disconnected
test "PartiallyConnected, edge meets minimum failover requirement":
let weakCount = max(1, HealthyThreshold - 1)
let protocols = @[
protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
protoHealthMock(FilterClientProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[LightpushClientProtocol] = weakCount
strength[FilterClientProtocol] = weakCount
strength[StoreClientProtocol] = weakCount
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.PartiallyConnected
test "Connected, robust relay ignores store server":
let protocols = @[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
test "Connected, robust relay ignores store client":
let protocols = @[
protoHealthMock(RelayProtocol, HealthStatus.READY),
protoHealthMock(StoreProtocol, HealthStatus.READY),
protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
]
var strength = initTable[WakuProtocol, int]()
strength[RelayProtocol] = MockDLow
strength[StoreProtocol] = 0
strength[StoreClientProtocol] = 0
let state = calculateConnectionState(protocols, strength, some(MockDLow))
check state == ConnectionStatus.Connected
suite "Health Monitor - events":
asyncTest "Core (relay) health update":
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
(await nodeA.mountRelay()).expect("Node A failed to mount Relay")
await nodeA.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
await nodeB.mountStore()
await nodeB.start()
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} =
discard
nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node A failed to subscribe"
)
nodeB.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
"Node B failed to subscribe"
)
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
await monitorA.stopHealthMonitor()
await nodeA.stop()
asyncTest "Edge (light client) health update":
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
nodeA.mountLightpushClient()
await nodeA.mountFilterClient()
nodeA.mountStoreClient()
require nodeA.mountAutoSharding(1, 8).isOk
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds = MessagingClient
.new(MessagingClientConf(useP2PReliability: false), nodeA)
.expect("Failed to create MessagingClient")
ds.start().expect("Failed to start MessagingClient")
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
callbackCount = 0
healthChangeSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
callbackCount.inc()
healthChangeSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
require nodeB.mountAutoSharding(1, 8).isOk
nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
"Node B failed to mount metadata"
)
await nodeB.start()
var metadataFut = newFuture[void]("waitForMetadata")
let metadataLis = WakuPeerEvent
.listen(
nodeA.brokerCtx,
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
if not metadataFut.finished and
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
metadataFut.complete()
,
)
.expect("Failed to listen for metadata")
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
await WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
require metadataOk
let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotConnected = false
while Moment.now() < connectTimeLimit:
if lastStatus == ConnectionStatus.PartiallyConnected:
gotConnected = true
break
if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotConnected == true
callbackCount >= 1
lastStatus == ConnectionStatus.PartiallyConnected
healthChangeSignal.clear()
await nodeB.stop()
await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())
let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
var gotDisconnected = false
while Moment.now() < disconnectTimeLimit:
if lastStatus == ConnectionStatus.Disconnected:
gotDisconnected = true
break
if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
healthChangeSignal.clear()
check:
gotDisconnected == true
lastStatus == ConnectionStatus.Disconnected
await monitorA.stopHealthMonitor()
await ds.stop()
await nodeA.stop()
asyncTest "Edge health driven by confirmed filter subscriptions":
var nodeA: WakuNode
lockNewGlobalBrokerContext:
let nodeAKey = generateSecp256k1Key()
nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
await nodeA.mountFilterClient()
nodeA.mountLightpushClient()
nodeA.mountStoreClient()
require nodeA.mountAutoSharding(1, 8).isOk
nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
await nodeA.start()
let ds = MessagingClient
.new(MessagingClientConf(useP2PReliability: false), nodeA)
.expect("Failed to create MessagingClient")
ds.start().expect("Failed to start MessagingClient")
let subMgr = nodeA.subscriptionManager
var nodeB: WakuNode
lockNewGlobalBrokerContext:
let nodeBKey = generateSecp256k1Key()
nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
let driver = newSqliteArchiveDriver()
nodeB.mountArchive(driver).expect("Node B failed to mount archive")
(await nodeB.mountRelay()).expect("Node B failed to mount relay")
(await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
await nodeB.mountFilter()
await nodeB.mountStore()
require nodeB.mountAutoSharding(1, 8).isOk
nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
"Node B failed to mount metadata"
)
await nodeB.start()
let monitorA = NodeHealthMonitor.new(nodeA)
var
lastStatus = ConnectionStatus.Disconnected
healthSignal = newAsyncEvent()
monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
lastStatus = status
healthSignal.fire()
monitorA.startHealthMonitor().expect("Health monitor failed to start")
var metadataFut = newFuture[void]("waitForMetadata")
let metadataLis = WakuPeerEvent
.listen(
nodeA.brokerCtx,
proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
if not metadataFut.finished and
evt.kind == WakuPeerEventKind.EventMetadataUpdated:
metadataFut.complete()
,
)
.expect("Failed to listen for metadata")
await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])
let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
await WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
require metadataOk
var deadline = Moment.now() + TestConnectivityTimeLimit
while Moment.now() < deadline:
if lastStatus == ConnectionStatus.PartiallyConnected:
break
if await healthSignal.wait().withTimeout(deadline - Moment.now()):
healthSignal.clear()
check lastStatus == ConnectionStatus.PartiallyConnected
var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth")
let shardHealthLis = EventShardTopicHealthChange
.listen(
nodeA.brokerCtx,
proc(
evt: EventShardTopicHealthChange
): Future[void] {.async: (raises: []), gcsafe.} =
if not shardHealthFut.finished and (
evt.health == TopicHealth.MINIMALLY_HEALTHY or
evt.health == TopicHealth.SUFFICIENTLY_HEALTHY
):
shardHealthFut.complete(evt)
,
)
.expect("Failed to listen for shard health")
let contentTopic = ContentTopic("/waku/2/default-content/proto")
subMgr.subscribe(contentTopic).expect("Failed to subscribe")
let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit)
await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)
check shardHealthOk == true
check nodeA.subscriptionManager.edgeFilterSubStates.len > 0
healthSignal.clear()
deadline = Moment.now() + TestConnectivityTimeLimit
while Moment.now() < deadline:
if lastStatus == ConnectionStatus.PartiallyConnected:
break
if await healthSignal.wait().withTimeout(deadline - Moment.now()):
healthSignal.clear()
check lastStatus == ConnectionStatus.PartiallyConnected
await ds.stop()
await monitorA.stopHealthMonitor()
await nodeB.stop()
await nodeA.stop()