logos-messaging-nim/tests/api/test_api_health.nim
NagyZoltanPeter a7f893555d
Integrate api-shape phase2 (#3989) + api interfaces (#3975) (#3999)
* Reshape per-layer API into api/ folders and thin the FFI over them

Each layer now separates its constructible core from its public surface:

  - core module (waku.nim / messaging_client.nim /
    reliable_channel_manager.nim): the type plus new/start/stop and the
    private construction helpers.
  - api/ folder: one module per differentiated set of operations
    (waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
    debug/health) plus an events surface.

The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.

Events move next to the surface that owns them, with each dependency kept
pointing the right way:

  - waku/events/ relocated under waku/api/events/.
  - channel events live in channels/api/events.nim.
  - the four messaging-level message events move to messaging/api/events;
    MessageSeenEvent stays in waku because it is emitted by waku core, so
    moving it would make waku depend on the messaging layer.
  - delivery_events renamed to filter_subscribe_events to match the
    OnFilterSubscribe/Unsubscribe events it actually declares.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Add reliable-channel FFI ops + events (nim-ffi v0.1.3)

Expose the reliable-channel layer through the v0.1.3 FFI:
- channel_create / channel_send / channel_close call the
  ReliableChannelManager api (createReliableChannel / send / closeChannel),
  marshalling channel id + base64 payload + ephemeral by hand
- channel message received / sent / errored are surfaced by listening to the
  channel-layer broker events in start_node and forwarding them through
  callEventCallback (received payload base64-encoded), dropped in stop_node

Stays on nim-ffi v0.1.3 (no typed/CBOR rewrite).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Expose reliable-channel ops in the stable C header (#3851)

The library already ships as a single .so with a tiered header surface
(liblogosdelivery.h = stable Messaging/Reliable-Channels, liblogosdelivery_kernel.h
= advanced Kernel). Per that tiering, the reliable-channel ops belong on the
stable surface, so declare channel_create / channel_send / channel_close in
liblogosdelivery.h and document the channel lifecycle events delivered through
the event callback.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Graft PR#3975 interface layer onto decomposed foundation (events deduped)

Add IKernel/IMessagingClient/IReliableChannelManager/ILogosDelivery interface
classes under logos_delivery/api/. The EventBroker types PR#3975 hoisted into
these files already exist in PR#3989's decomposed */api/events/ modules, so the
interface files re-export those modules instead of redefining the types
(avoids 8 duplicate EventBroker definitions). api/types.nim kept at the
foundation version (ChannelId stays in channels/types.nim, which the decomposed
modules import).

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Wire impl classes to interfaces (inherit; relocate SendHandler)

- Waku : IKernel, MessagingClient : IMessagingClient,
  ReliableChannelManager : IReliableChannelManager.
- The operation procs already live in PR#3989's decomposed */api/ modules and
  stay as plain procs (nothing dispatches through the interface types, so no
  method-ization is needed).
- SendHandler now lives in reliable_channel_manager_api.nim (its PR#3975 home);
  removed the duplicate from reliable_channel.nim, which re-exports the
  interface module so channels/api/{channel_lifecycle,send} still see it.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Wire LogosDelivery to ILogosDelivery orchestrator interface

LogosDelivery : ILogosDelivery; start/stop/isOnline become method overrides.
Peripheral PR#3975 edits (lightpush/store clients, self_req_handlers,
statistics) are import-reorg artifacts of deleting waku/utils/requests.nim,
which the decomposed structure keeps -- so they are intentionally not ported.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Dedup EventConnectionStatusChange (re-export from health_events)

9th duplicate EventBroker type: defined in both logos_delivery_api.nim and the
decomposed waku/api/events/health_events.nim. The interface file now re-exports
it. liblogosdelivery builds clean.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* Move events back into interface-class source files (restore #3975 placement)

Reverses the earlier dedup-by-re-export: event TYPE definitions now live in the
interface classes, and the emptied decomposed event files are removed.

- MessageSeenEvent            -> logos_delivery/api/kernel_api.nim
- Message{Sent,Error,Propagated,Received}Event -> api/messaging_client_api.nim
- ChannelMessage{Received,Sent,Error}Event     -> api/reliable_channel_manager_api.nim
- EventConnectionStatusChange -> api/logos_delivery_api.nim

Deleted (became empty after the move):
- logos_delivery/waku/api/events/message_events.nim
- logos_delivery/messaging/api/events.nim
- logos_delivery/channels/api/events.nim
health_events.nim keeps its two remaining events (content/shard topic health).

Rewiring: each layer re-exports its interface module (waku->kernel_api,
messaging_client->messaging_client_api, reliable_channel->reliable_channel_manager_api,
which also re-exports messaging_client_api). Deep emitters/listeners
(subscription_manager, waku_node, waku_node/relay, node_health_monitor,
recv_service, send_service) import the owning interface module directly.
kernel_api stays below node level (types/topics/message/store-common) so the
node->kernel_api imports are acyclic. liblogosdelivery builds.

Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>

* nph formatting

---------

Co-authored-by: Ivan FB <ivansete@status.im>
Co-authored-by: Claude Opus 4.8 <noreply@anthropic.com>
2026-06-30 11:51:22 +02:00

301 lines
10 KiB
Nim

{.used.}
import std/[options, sequtils, times]
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
import brokers/broker_context
import ../testlib/[common, wakucore, wakunode, testasync]
import
logos_delivery,
logos_delivery/waku/[waku_node, waku_core, waku_relay/protocol],
logos_delivery/waku/node/health_monitor/
[topic_health, health_status, protocol_health, health_report],
logos_delivery/waku/requests/health_requests,
logos_delivery/waku/requests/node_requests,
logos_delivery/waku/api/events/health_events,
logos_delivery/waku/common/waku_protocol,
logos_delivery/waku/factory/waku_conf
import tools/confutils/cli_args
const TestTimeout = chronos.seconds(10)
const DefaultShard = PubsubTopic("/waku/2/rs/3/0")
const TestContentTopic = ContentTopic("/waku/2/default-content/proto")
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
discard
proc waitForConnectionStatus(
brokerCtx: BrokerContext, expected: ConnectionStatus
) {.async.} =
var future = newFuture[void]("waitForConnectionStatus")
let handler: EventConnectionStatusChangeListenerProc = proc(
e: EventConnectionStatusChange
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.connectionStatus == expected:
future.complete()
let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
if not await future.withTimeout(TestTimeout):
raiseAssert "Timeout waiting for status: " & $expected
finally:
await EventConnectionStatusChange.dropListener(brokerCtx, handle)
proc waitForShardHealthy(
brokerCtx: BrokerContext
): Future[EventShardTopicHealthChange] {.async.} =
var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")
let handler: EventShardTopicHealthChangeListenerProc = proc(
e: EventShardTopicHealthChange
) {.async: (raises: []), gcsafe.} =
if not future.finished:
if e.health == TopicHealth.MINIMALLY_HEALTHY or
e.health == TopicHealth.SUFFICIENTLY_HEALTHY:
future.complete(e)
let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr:
raiseAssert error
try:
if await future.withTimeout(TestTimeout):
return future.read()
else:
raiseAssert "Timeout waiting for shard health event"
finally:
await EventShardTopicHealthChange.dropListener(brokerCtx, handle)
suite "LM API health checking":
var
serviceNode {.threadvar.}: WakuNode
client {.threadvar.}: LogosDelivery
servicePeerInfo {.threadvar.}: RemotePeerInfo
asyncSetup:
lockNewGlobalBrokerContext:
serviceNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
(await serviceNode.mountRelay()).isOkOr:
raiseAssert error
serviceNode.mountMetadata(3, @[0'u16]).isOkOr:
raiseAssert error
await serviceNode.mountLibp2pPing()
await serviceNode.start()
servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo()
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
lockNewGlobalBrokerContext:
var conf = defaultWakuNodeConf().valueOr:
raiseAssert error
conf.mode = Core
conf.listenAddress = parseIpAddress("0.0.0.0")
conf.tcpPort = Port(0)
conf.discv5UdpPort = Port(0)
conf.clusterId = some(3'u16)
conf.numShardsInNetwork = 1
conf.rest = false
client = (await LogosDelivery.new(conf)).valueOr:
raiseAssert error
(await client.start()).isOkOr:
raiseAssert error
asyncTeardown:
discard await client.stop()
await serviceNode.stop()
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[DefaultShard]).valueOr:
raiseAssert "RequestShardTopicsHealth failed"
if req.topicHealth.len > 0:
let h = req.topicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
const GhostShard = PubsubTopic("/waku/2/rs/1/666")
client.waku.node.wakuRelay.subscribe(GhostShard, dummyHandler)
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[GhostShard]).valueOr:
raiseAssert "Request failed"
check req.topicHealth.len > 0
check req.topicHealth[0].health == TopicHealth.UNHEALTHY
asyncTest "RequestProtocolHealth, check relay status":
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isReady = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let relayReq = await RequestProtocolHealth.request(
client.waku.brokerCtx, WakuProtocol.RelayProtocol
)
if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
isReady = true
break
await sleepAsync(chronos.milliseconds(100))
check isReady == true
let storeReq = await RequestProtocolHealth.request(
client.waku.brokerCtx, WakuProtocol.StoreProtocol
)
if storeReq.isOk():
check storeReq.get().healthStatus.health != HealthStatus.READY
asyncTest "RequestProtocolHealth, check unmounted protocol":
let req = await RequestProtocolHealth.request(
client.waku.brokerCtx, WakuProtocol.StoreProtocol
)
check req.isOk()
let status = req.get().healthStatus
check status.health == HealthStatus.NOT_MOUNTED
check status.desc.isNone()
asyncTest "RequestConnectionStatus, check connectivity state":
let initialReq = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
check initialReq.connectionStatus == ConnectionStatus.Disconnected
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isConnected = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
raiseAssert "RequestConnectionStatus failed"
if req.connectionStatus == ConnectionStatus.PartiallyConnected or
req.connectionStatus == ConnectionStatus.Connected:
isConnected = true
break
await sleepAsync(chronos.milliseconds(100))
check isConnected == true
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
let connectFuture = waitForConnectionStatus(
client.waku.brokerCtx, ConnectionStatus.PartiallyConnected
)
await client.waku.node.connectToNodes(@[servicePeerInfo])
await connectFuture
let disconnectFuture =
waitForConnectionStatus(client.waku.brokerCtx, ConnectionStatus.Disconnected)
await client.waku.node.disconnectNode(servicePeerInfo)
await disconnectFuture
asyncTest "EventShardTopicHealthChange, detect health improvement":
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
let healthEventFuture = waitForShardHealthy(client.waku.brokerCtx)
await client.waku.node.connectToNodes(@[servicePeerInfo])
let event = await healthEventFuture
check event.topic == DefaultShard
asyncTest "RequestHealthReport, check aggregate report":
let req = await RequestHealthReport.request(client.waku.brokerCtx)
check req.isOk()
let report = req.get().healthReport
check report.nodeHealth == HealthStatus.READY
check report.protocolsHealth.len > 0
check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol)
asyncTest "RequestContentTopicsHealth, smoke test":
let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")
let req =
RequestContentTopicsHealth.request(client.waku.brokerCtx, @[fictionalTopic])
check req.isOk()
let res = req.get()
check res.contentTopicHealth.len == 1
check res.contentTopicHealth[0].topic == fictionalTopic
check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED
asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding":
let cTopic = ContentTopic("/waku/2/my-content-topic/proto")
let shardReq =
RequestRelayShard.request(client.waku.brokerCtx, none(PubsubTopic), cTopic)
check shardReq.isOk()
let targetShard = $shardReq.get().relayShard
client.waku.node.wakuRelay.subscribe(targetShard, dummyHandler)
serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)
await client.waku.node.connectToNodes(@[servicePeerInfo])
var isHealthy = false
let start = Moment.now()
while Moment.now() - start < TestTimeout:
let req = RequestContentTopicsHealth.request(client.waku.brokerCtx, @[cTopic]).valueOr:
raiseAssert "Request failed"
if req.contentTopicHealth.len > 0:
let h = req.contentTopicHealth[0].health
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
isHealthy = true
break
await sleepAsync(chronos.milliseconds(100))
check isHealthy == true
asyncTest "RequestProtocolHealth, edge mode smoke test":
var edgeWaku: LogosDelivery
lockNewGlobalBrokerContext:
var edgeConf = defaultWakuNodeConf().valueOr:
raiseAssert error
edgeConf.mode = Edge
edgeConf.listenAddress = parseIpAddress("0.0.0.0")
edgeConf.tcpPort = Port(0)
edgeConf.discv5UdpPort = Port(0)
edgeConf.clusterId = some(3'u16)
edgeConf.maxMessageSize = "150 KiB"
edgeConf.rest = false
edgeWaku = (await LogosDelivery.new(edgeConf)).valueOr:
raiseAssert "Failed to create edge node: " & error
(await edgeWaku.start()).isOkOr:
raiseAssert "Failed to start edge waku: " & error
let relayReq = await RequestProtocolHealth.request(
edgeWaku.waku.brokerCtx, WakuProtocol.RelayProtocol
)
check relayReq.isOk()
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
check not edgeWaku.waku.node.wakuFilterClient.isNil()
discard await edgeWaku.stop()