logos-delivery/tests/node/test_wakunode_health_monitor.nim
Ivan FB 89d1f87bfe
Rename kernel_api dir to waku_node and tidy node module layout
Why these changes hang together:

- Rename `waku/node/kernel_api/` to `waku/node/waku_node/`: the folder
  holds the node's protocol APIs, so it should carry the node's name
  rather than the legacy "kernel_api" label.

- Collapse the old `kernel_api.nim` aggregator into the top-level
  `waku/waku_node.nim` barrel, and drop `net_config`/`health_monitor`
  from it. Those aren't the node's concern; consumers that used them
  now import them directly (clearer, explicit deps).

- Move the `WakuNode` type from `node_types.nim` into `waku_node.nim`.
  `node_types.nim` only existed to dodge a `WakuNode`/`SubscriptionManager`
  import cycle that Nim actually handles fine, so the type now lives in
  one obvious home and the indirection module is deleted.

- Extract `ShardSubscription` and `EdgeFilterSubState` into their own
  small modules: they are standalone value types with no back-reference
  to the node. `SubscriptionManager` stays with `WakuNode` on purpose
  (it is the node's subscription subsystem; the relationship is real).

Verified: `wakunode2` and `libwaku` build; representative node tests pass.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-03 13:59:07 +02:00

435 lines
16 KiB
Nim

{.used.}
import
std/[json, options, sequtils, strutils, tables], testutils/unittests, chronos, results
import brokers/broker_context
import
waku/[
waku_core,
common/waku_protocol,
node/waku_node,
node/peer_manager,
node/health_monitor/health_status,
node/health_monitor/connection_status,
node/health_monitor/protocol_health,
node/health_monitor/topic_health,
node/health_monitor/node_health_monitor,
messaging_client,
node/waku_node/relay,
node/waku_node/store,
node/waku_node/lightpush,
node/waku_node/filter,
events/health_events,
events/peer_events,
waku_archive,
]
import ../testlib/[wakunode, wakucore], ../waku_archive/archive_utils
import waku/node/subscription_manager
const MockDLow = 4 # Mocked GossipSub DLow value
# Upper bound on how long the event tests wait for each expected change
# (metadata exchange, connection status, shard health) before giving up.
const TestConnectivityTimeLimit = 3.seconds
proc protoHealthMock(kind: WakuProtocol, health: HealthStatus): ProtocolHealth =
  ## Builds a `ProtocolHealth` for `kind` via `ProtocolHealth.init` and returns
  ## it marked ready when `health` is `HealthStatus.READY`; every other status
  ## is reported through `notReady("mock")`.
  ##
  ## NOTE(review): `NOT_MOUNTED` is one of those "other" statuses (the
  ## "Connected, robust edge" test passes it), so this mock does not
  ## distinguish it from `NOT_READY` - confirm that is intended.
  var mocked = ProtocolHealth.init(kind)
  if health != HealthStatus.READY:
    return mocked.notReady("mock")
  return mocked.ready()
suite "Health Monitor - health state calculation":
  # Every test hands calculateConnectionState a list of mocked protocol
  # health entries, a per-protocol strength table and the mocked dLow, then
  # checks the ConnectionStatus it returns.

  test "Disconnected, zero peers":
    # Nothing is ready and the strength table is left empty.
    let emptyStrength = initTable[WakuProtocol, int]()
    let mocked = [
      RelayProtocol, StoreClientProtocol, FilterClientProtocol,
      LightpushClientProtocol,
    ].mapIt(protoHealthMock(it, HealthStatus.NOT_READY))

    let status = calculateConnectionState(mocked, emptyStrength, some(MockDLow))

    check status == ConnectionStatus.Disconnected

  test "PartiallyConnected, weak relay":
    var strengthByProto = initTable[WakuProtocol, int]()
    strengthByProto[RelayProtocol] = MockDLow - 1
    let mocked = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    # Relay strength is above zero but below dLow, so only partially connected
    check status == ConnectionStatus.PartiallyConnected

  test "Connected, robust relay":
    var strengthByProto = initTable[WakuProtocol, int]()
    strengthByProto[RelayProtocol] = MockDLow
    let mocked = @[protoHealthMock(RelayProtocol, HealthStatus.READY)]

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    # Relay strength reaches dLow, so the node counts as fully connected
    check status == ConnectionStatus.Connected

  test "Connected, robust edge":
    let edgeClients =
      [LightpushClientProtocol, FilterClientProtocol, StoreClientProtocol]

    # Relay is requested as NOT_MOUNTED; all three edge clients are ready.
    let mocked =
      @[protoHealthMock(RelayProtocol, HealthStatus.NOT_MOUNTED)] &
      edgeClients.mapIt(protoHealthMock(it, HealthStatus.READY))

    var strengthByProto = initTable[WakuProtocol, int]()
    for proto in edgeClients:
      strengthByProto[proto] = HealthyThreshold

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    check status == ConnectionStatus.Connected

  test "Disconnected, edge missing store":
    # Lightpush and filter clients are healthy; the store client is not ready
    # and has zero strength.
    var strengthByProto = initTable[WakuProtocol, int]()
    strengthByProto[LightpushClientProtocol] = HealthyThreshold
    strengthByProto[FilterClientProtocol] = HealthyThreshold
    strengthByProto[StoreClientProtocol] = 0

    let mocked = @[
      protoHealthMock(LightpushClientProtocol, HealthStatus.READY),
      protoHealthMock(FilterClientProtocol, HealthStatus.READY),
      protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
    ]

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    check status == ConnectionStatus.Disconnected

  test "PartiallyConnected, edge meets minimum failover requirement":
    let edgeClients =
      [LightpushClientProtocol, FilterClientProtocol, StoreClientProtocol]
    # One below the healthy threshold, but never less than one.
    let weakStrength = max(1, HealthyThreshold - 1)

    let mocked = edgeClients.mapIt(protoHealthMock(it, HealthStatus.READY))

    var strengthByProto = initTable[WakuProtocol, int]()
    for proto in edgeClients:
      strengthByProto[proto] = weakStrength

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    check status == ConnectionStatus.PartiallyConnected

  test "Connected, robust relay ignores store server":
    # The store server entry has zero strength next to a relay at dLow.
    var strengthByProto = initTable[WakuProtocol, int]()
    strengthByProto[RelayProtocol] = MockDLow
    strengthByProto[StoreProtocol] = 0

    let mocked =
      [RelayProtocol, StoreProtocol].mapIt(protoHealthMock(it, HealthStatus.READY))

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    check status == ConnectionStatus.Connected

  test "Connected, robust relay ignores store client":
    # A not-ready store client with zero strength sits next to a relay at dLow.
    var strengthByProto = initTable[WakuProtocol, int]()
    strengthByProto[RelayProtocol] = MockDLow
    strengthByProto[StoreProtocol] = 0
    strengthByProto[StoreClientProtocol] = 0

    let mocked = @[
      protoHealthMock(RelayProtocol, HealthStatus.READY),
      protoHealthMock(StoreProtocol, HealthStatus.READY),
      protoHealthMock(StoreClientProtocol, HealthStatus.NOT_READY),
    ]

    let status =
      calculateConnectionState(mocked, strengthByProto, some(MockDLow))

    check status == ConnectionStatus.Connected
suite "Health Monitor - events":
  # End-to-end tests: test nodes are created on 127.0.0.1 and the
  # NodeHealthMonitor status callback is observed while a peer connects and
  # goes away again.

  asyncTest "Core (relay) health update":
    # nodeA is the relay node whose connection status is being monitored.
    var nodeA: WakuNode

    # NOTE(review): presumably each node is built under its own broker
    # context so that the two nodes do not share one - confirm against
    # lockNewGlobalBrokerContext.
    lockNewGlobalBrokerContext:
      let nodeAKey = generateSecp256k1Key()
      nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
      (await nodeA.mountRelay()).expect("Node A failed to mount Relay")
      await nodeA.start()

    let monitorA = NodeHealthMonitor.new(nodeA)

    # State written by the status callback and read by the wait loops below.
    var
      lastStatus = ConnectionStatus.Disconnected
      callbackCount = 0
      healthChangeSignal = newAsyncEvent()

    # Record the reported status, count the call and wake up any waiter.
    monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
      lastStatus = status
      callbackCount.inc()
      healthChangeSignal.fire()

    monitorA.startHealthMonitor().expect("Health monitor failed to start")

    # nodeB is the single remote peer: archive, relay and store are mounted.
    var nodeB: WakuNode
    lockNewGlobalBrokerContext:
      let nodeBKey = generateSecp256k1Key()
      nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
      let driver = newSqliteArchiveDriver()
      nodeB.mountArchive(driver).expect("Node B failed to mount archive")
      (await nodeB.mountRelay()).expect("Node B failed to mount relay")
      await nodeB.mountStore()
      await nodeB.start()

    await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])

    # Handler that ignores every message; it only exists so that both nodes
    # can subscribe to the default pubsub topic.
    proc dummyHandler(topic: PubsubTopic, msg: WakuMessage): Future[void] {.async.} =
      discard

    nodeA.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
      "Node A failed to subscribe"
    )
    nodeB.subscribe((kind: PubsubSub, topic: DefaultPubsubTopic), dummyHandler).expect(
      "Node B failed to subscribe"
    )

    # Wait until the callback has reported PartiallyConnected or the deadline
    # passes; each wait on the signal is bounded by the time remaining.
    let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
    var gotConnected = false

    while Moment.now() < connectTimeLimit:
      if lastStatus == ConnectionStatus.PartiallyConnected:
        gotConnected = true
        break

      # Clear the event only when it actually fired, so the next wait blocks.
      if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
        healthChangeSignal.clear()

    check:
      gotConnected == true
      callbackCount >= 1
      lastStatus == ConnectionStatus.PartiallyConnected

    # Reset the signal before triggering the disconnect phase.
    healthChangeSignal.clear()

    # Take the only peer away and expect the status to fall back.
    await nodeB.stop()
    await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())

    # Same bounded wait as above, this time for Disconnected.
    let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
    var gotDisconnected = false

    while Moment.now() < disconnectTimeLimit:
      if lastStatus == ConnectionStatus.Disconnected:
        gotDisconnected = true
        break

      if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
        healthChangeSignal.clear()

    check:
      gotDisconnected == true

    # nodeB was already stopped above; stop the monitor and nodeA.
    await monitorA.stopHealthMonitor()
    await nodeA.stop()

  asyncTest "Edge (light client) health update":
    # nodeA is an edge node: only the lightpush, filter and store clients are
    # mounted, plus auto-sharding and metadata.
    var nodeA: WakuNode
    lockNewGlobalBrokerContext:
      let nodeAKey = generateSecp256k1Key()
      nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
      nodeA.mountLightpushClient()
      await nodeA.mountFilterClient()
      nodeA.mountStoreClient()
      require nodeA.mountAutoSharding(1, 8).isOk
      nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
      await nodeA.start()

    # A MessagingClient is created and started on top of nodeA; it is stopped
    # again at the end of the test.
    let ds =
      MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
    ds.start().expect("Failed to start MessagingClient")

    let monitorA = NodeHealthMonitor.new(nodeA)

    # State written by the status callback and read by the wait loops below.
    var
      lastStatus = ConnectionStatus.Disconnected
      callbackCount = 0
      healthChangeSignal = newAsyncEvent()

    # Record the reported status, count the call and wake up any waiter.
    monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
      lastStatus = status
      callbackCount.inc()
      healthChangeSignal.fire()

    monitorA.startHealthMonitor().expect("Health monitor failed to start")

    # nodeB is the service node: archive, relay, lightpush, filter and store
    # are mounted, with metadata covering shards 0 to 7.
    var nodeB: WakuNode
    lockNewGlobalBrokerContext:
      let nodeBKey = generateSecp256k1Key()
      nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
      let driver = newSqliteArchiveDriver()
      nodeB.mountArchive(driver).expect("Node B failed to mount archive")
      (await nodeB.mountRelay()).expect("Node B failed to mount relay")
      (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
      await nodeB.mountFilter()
      await nodeB.mountStore()
      require nodeB.mountAutoSharding(1, 8).isOk
      nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
        "Node B failed to mount metadata"
      )
      await nodeB.start()

    # Listen on nodeA's broker context and complete metadataFut on the first
    # EventMetadataUpdated peer event.
    var metadataFut = newFuture[void]("waitForMetadata")
    let metadataLis = WakuPeerEvent
      .listen(
        nodeA.brokerCtx,
        proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
          if not metadataFut.finished and
              evt.kind == WakuPeerEventKind.EventMetadataUpdated:
            metadataFut.complete()
        ,
      )
      .expect("Failed to listen for metadata")

    await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])

    # Abort the test if no metadata update was seen within the time limit;
    # the listener is dropped either way.
    let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
    await WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
    require metadataOk

    # Wait until the callback has reported PartiallyConnected or the deadline
    # passes; each wait on the signal is bounded by the time remaining.
    let connectTimeLimit = Moment.now() + TestConnectivityTimeLimit
    var gotConnected = false

    while Moment.now() < connectTimeLimit:
      if lastStatus == ConnectionStatus.PartiallyConnected:
        gotConnected = true
        break

      # Clear the event only when it actually fired, so the next wait blocks.
      if await healthChangeSignal.wait().withTimeout(connectTimeLimit - Moment.now()):
        healthChangeSignal.clear()

    check:
      gotConnected == true
      callbackCount >= 1
      lastStatus == ConnectionStatus.PartiallyConnected

    # Reset the signal before triggering the disconnect phase.
    healthChangeSignal.clear()

    # Take the only service peer away and expect the status to fall back.
    await nodeB.stop()
    await nodeA.disconnectNode(nodeB.switch.peerInfo.toRemotePeerInfo())

    # Same bounded wait as above, this time for Disconnected.
    let disconnectTimeLimit = Moment.now() + TestConnectivityTimeLimit
    var gotDisconnected = false

    while Moment.now() < disconnectTimeLimit:
      if lastStatus == ConnectionStatus.Disconnected:
        gotDisconnected = true
        break

      if await healthChangeSignal.wait().withTimeout(disconnectTimeLimit - Moment.now()):
        healthChangeSignal.clear()

    check:
      gotDisconnected == true
      lastStatus == ConnectionStatus.Disconnected

    # nodeB was already stopped above; stop the monitor, the client and nodeA.
    await monitorA.stopHealthMonitor()
    await ds.stop()
    await nodeA.stop()

  asyncTest "Edge health driven by confirmed filter subscriptions":
    # nodeA is an edge node: only the filter, lightpush and store clients are
    # mounted, plus auto-sharding and metadata.
    var nodeA: WakuNode
    lockNewGlobalBrokerContext:
      let nodeAKey = generateSecp256k1Key()
      nodeA = newTestWakuNode(nodeAKey, parseIpAddress("127.0.0.1"), Port(0))
      await nodeA.mountFilterClient()
      nodeA.mountLightpushClient()
      nodeA.mountStoreClient()
      require nodeA.mountAutoSharding(1, 8).isOk
      nodeA.mountMetadata(1, @[0'u16]).expect("Node A failed to mount metadata")
      await nodeA.start()

    let ds =
      MessagingClient.new(false, nodeA).expect("Failed to create MessagingClient")
    ds.start().expect("Failed to start MessagingClient")

    # Used further down to subscribe to a content topic.
    let subMgr = nodeA.subscriptionManager

    # nodeB is the service node: archive, relay, lightpush, filter and store
    # are mounted, with metadata covering shards 0 to 7.
    var nodeB: WakuNode
    lockNewGlobalBrokerContext:
      let nodeBKey = generateSecp256k1Key()
      nodeB = newTestWakuNode(nodeBKey, parseIpAddress("127.0.0.1"), Port(0))
      let driver = newSqliteArchiveDriver()
      nodeB.mountArchive(driver).expect("Node B failed to mount archive")
      (await nodeB.mountRelay()).expect("Node B failed to mount relay")
      (await nodeB.mountLightpush()).expect("Node B failed to mount lightpush")
      await nodeB.mountFilter()
      await nodeB.mountStore()
      require nodeB.mountAutoSharding(1, 8).isOk
      nodeB.mountMetadata(1, toSeq(0'u16 ..< 8'u16)).expect(
        "Node B failed to mount metadata"
      )
      await nodeB.start()

    let monitorA = NodeHealthMonitor.new(nodeA)

    # State written by the status callback and read by the wait loops below.
    var
      lastStatus = ConnectionStatus.Disconnected
      healthSignal = newAsyncEvent()

    # Record the reported status and wake up any waiter.
    monitorA.onConnectionStatusChange = proc(status: ConnectionStatus) {.async.} =
      lastStatus = status
      healthSignal.fire()

    monitorA.startHealthMonitor().expect("Health monitor failed to start")

    # Listen on nodeA's broker context and complete metadataFut on the first
    # EventMetadataUpdated peer event.
    var metadataFut = newFuture[void]("waitForMetadata")
    let metadataLis = WakuPeerEvent
      .listen(
        nodeA.brokerCtx,
        proc(evt: WakuPeerEvent): Future[void] {.async: (raises: []), gcsafe.} =
          if not metadataFut.finished and
              evt.kind == WakuPeerEventKind.EventMetadataUpdated:
            metadataFut.complete()
        ,
      )
      .expect("Failed to listen for metadata")

    await nodeA.connectToNodes(@[nodeB.switch.peerInfo.toRemotePeerInfo()])

    # Abort the test if no metadata update was seen within the time limit;
    # the listener is dropped either way.
    let metadataOk = await metadataFut.withTimeout(TestConnectivityTimeLimit)
    await WakuPeerEvent.dropListener(nodeA.brokerCtx, metadataLis)
    require metadataOk

    # Bounded wait for PartiallyConnected; the status itself is asserted
    # after the loop.
    var deadline = Moment.now() + TestConnectivityTimeLimit
    while Moment.now() < deadline:
      if lastStatus == ConnectionStatus.PartiallyConnected:
        break

      # Clear the event only when it actually fired, so the next wait blocks.
      if await healthSignal.wait().withTimeout(deadline - Moment.now()):
        healthSignal.clear()

    check lastStatus == ConnectionStatus.PartiallyConnected

    # Complete shardHealthFut with the first shard topic health event that
    # reports MINIMALLY_HEALTHY or SUFFICIENTLY_HEALTHY.
    var shardHealthFut = newFuture[EventShardTopicHealthChange]("waitForShardHealth")

    let shardHealthLis = EventShardTopicHealthChange
      .listen(
        nodeA.brokerCtx,
        proc(
            evt: EventShardTopicHealthChange
        ): Future[void] {.async: (raises: []), gcsafe.} =
          if not shardHealthFut.finished and (
            evt.health == TopicHealth.MINIMALLY_HEALTHY or
            evt.health == TopicHealth.SUFFICIENTLY_HEALTHY
          ):
            shardHealthFut.complete(evt)
        ,
      )
      .expect("Failed to listen for shard health")

    # Subscribing to a content topic is what should trigger the shard health
    # event awaited below.
    let contentTopic = ContentTopic("/waku/2/default-content/proto")
    subMgr.subscribe(contentTopic).expect("Failed to subscribe")

    let shardHealthOk = await shardHealthFut.withTimeout(TestConnectivityTimeLimit)
    await EventShardTopicHealthChange.dropListener(nodeA.brokerCtx, shardHealthLis)

    check shardHealthOk == true
    # At least one edge filter subscription state must now be tracked.
    check nodeA.subscriptionManager.edgeFilterSubStates.len > 0

    # Re-check the connection status after the subscription. If lastStatus is
    # still PartiallyConnected the loop exits on its first iteration.
    healthSignal.clear()

    deadline = Moment.now() + TestConnectivityTimeLimit
    while Moment.now() < deadline:
      if lastStatus == ConnectionStatus.PartiallyConnected:
        break

      if await healthSignal.wait().withTimeout(deadline - Moment.now()):
        healthSignal.clear()

    check lastStatus == ConnectionStatus.PartiallyConnected

    # Teardown: client first, then the monitor, then both nodes.
    await ds.stop()
    await monitorA.stopHealthMonitor()
    await nodeB.stop()
    await nodeA.stop()