mirror of
https://github.com/logos-messaging/logos-messaging-nim.git
synced 2026-06-26 11:29:28 +00:00
Each layer now separates its constructible core from its public surface:
- core module (waku.nim / messaging_client.nim /
reliable_channel_manager.nim): the type plus new/start/stop and the
private construction helpers.
- api/ folder: one module per differentiated set of operations
(waku: topics/relay/filter/lightpush/store/peer_manager/discovery/
debug/health) plus an events surface.
The waku api is reshaped to be the complete operation surface the C
bindings need, so the library no longer reaches into node internals:
relayPublish returns the message hash, relaySubscribe takes an optional
handler, filter/lightpush auto-select the service peer, connectedPeersInfo
returns structured data, pingPeer honours the timeout, plus
relayNumPeersInMesh / relayNumConnectedPeers / isOnline. library/ is now a
thin C-ABI shim: each {.ffi.} proc only marshals cstring/JSON/callbacks and
delegates to ctx.myLib[].waku.<op> (or messagingClient.<op>).
app_callbacks re-exports the modules defining its handler types, which the
included FFI files previously relied on by leakage.
Events move next to the surface that owns them, with each dependency kept
pointing the right way:
- waku/events/ relocated under waku/api/events/.
- channel events live in channels/api/events.nim.
- the four messaging-level message events move to messaging/api/events;
MessageSeenEvent stays in waku because it is emitted by waku core, so
moving it would make waku depend on the messaging layer.
- delivery_events renamed to filter_subscribe_events to match the
OnFilterSubscribe/Unsubscribe events it actually declares.
Co-Authored-By: Claude Opus 4.8 <noreply@anthropic.com>
301 lines
10 KiB
Nim
301 lines
10 KiB
Nim
{.used.}
|
|
|
|
import std/[options, sequtils, times]
|
|
import chronos, testutils/unittests, stew/byteutils, libp2p/[switch, peerinfo]
|
|
import brokers/broker_context
|
|
import ../testlib/[common, wakucore, wakunode, testasync]
|
|
|
|
import
|
|
logos_delivery,
|
|
logos_delivery/waku/[waku_node, waku_core, waku_relay/protocol],
|
|
logos_delivery/waku/node/health_monitor/
|
|
[topic_health, health_status, protocol_health, health_report],
|
|
logos_delivery/waku/requests/health_requests,
|
|
logos_delivery/waku/requests/node_requests,
|
|
logos_delivery/waku/api/events/health_events,
|
|
logos_delivery/waku/common/waku_protocol,
|
|
logos_delivery/waku/factory/waku_conf
|
|
import tools/confutils/cli_args
|
|
|
|
const TestTimeout = chronos.seconds(10)
|
|
const DefaultShard = PubsubTopic("/waku/2/rs/3/0")
|
|
const TestContentTopic = ContentTopic("/waku/2/default-content/proto")
|
|
|
|
proc dummyHandler(
|
|
topic: PubsubTopic, msg: WakuMessage
|
|
): Future[void] {.async, gcsafe.} =
|
|
discard
|
|
|
|
proc waitForConnectionStatus(
|
|
brokerCtx: BrokerContext, expected: ConnectionStatus
|
|
) {.async.} =
|
|
var future = newFuture[void]("waitForConnectionStatus")
|
|
|
|
let handler: EventConnectionStatusChangeListenerProc = proc(
|
|
e: EventConnectionStatusChange
|
|
) {.async: (raises: []), gcsafe.} =
|
|
if not future.finished:
|
|
if e.connectionStatus == expected:
|
|
future.complete()
|
|
|
|
let handle = EventConnectionStatusChange.listen(brokerCtx, handler).valueOr:
|
|
raiseAssert error
|
|
|
|
try:
|
|
if not await future.withTimeout(TestTimeout):
|
|
raiseAssert "Timeout waiting for status: " & $expected
|
|
finally:
|
|
await EventConnectionStatusChange.dropListener(brokerCtx, handle)
|
|
|
|
proc waitForShardHealthy(
|
|
brokerCtx: BrokerContext
|
|
): Future[EventShardTopicHealthChange] {.async.} =
|
|
var future = newFuture[EventShardTopicHealthChange]("waitForShardHealthy")
|
|
|
|
let handler: EventShardTopicHealthChangeListenerProc = proc(
|
|
e: EventShardTopicHealthChange
|
|
) {.async: (raises: []), gcsafe.} =
|
|
if not future.finished:
|
|
if e.health == TopicHealth.MINIMALLY_HEALTHY or
|
|
e.health == TopicHealth.SUFFICIENTLY_HEALTHY:
|
|
future.complete(e)
|
|
|
|
let handle = EventShardTopicHealthChange.listen(brokerCtx, handler).valueOr:
|
|
raiseAssert error
|
|
|
|
try:
|
|
if await future.withTimeout(TestTimeout):
|
|
return future.read()
|
|
else:
|
|
raiseAssert "Timeout waiting for shard health event"
|
|
finally:
|
|
await EventShardTopicHealthChange.dropListener(brokerCtx, handle)
|
|
|
|
suite "LM API health checking":
|
|
var
|
|
serviceNode {.threadvar.}: WakuNode
|
|
client {.threadvar.}: LogosDelivery
|
|
servicePeerInfo {.threadvar.}: RemotePeerInfo
|
|
|
|
asyncSetup:
|
|
lockNewGlobalBrokerContext:
|
|
serviceNode =
|
|
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
|
|
(await serviceNode.mountRelay()).isOkOr:
|
|
raiseAssert error
|
|
serviceNode.mountMetadata(3, @[0'u16]).isOkOr:
|
|
raiseAssert error
|
|
await serviceNode.mountLibp2pPing()
|
|
await serviceNode.start()
|
|
|
|
servicePeerInfo = serviceNode.peerInfo.toRemotePeerInfo()
|
|
serviceNode.wakuRelay.subscribe(DefaultShard, dummyHandler)
|
|
|
|
lockNewGlobalBrokerContext:
|
|
var conf = defaultWakuNodeConf().valueOr:
|
|
raiseAssert error
|
|
conf.mode = Core
|
|
conf.listenAddress = parseIpAddress("0.0.0.0")
|
|
conf.tcpPort = Port(0)
|
|
conf.discv5UdpPort = Port(0)
|
|
conf.clusterId = some(3'u16)
|
|
conf.numShardsInNetwork = 1
|
|
conf.rest = false
|
|
|
|
client = (await LogosDelivery.new(conf)).valueOr:
|
|
raiseAssert error
|
|
(await client.start()).isOkOr:
|
|
raiseAssert error
|
|
|
|
asyncTeardown:
|
|
discard await client.stop()
|
|
await serviceNode.stop()
|
|
|
|
asyncTest "RequestShardTopicsHealth, check PubsubTopic health":
|
|
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
|
|
var isHealthy = false
|
|
let start = Moment.now()
|
|
while Moment.now() - start < TestTimeout:
|
|
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[DefaultShard]).valueOr:
|
|
raiseAssert "RequestShardTopicsHealth failed"
|
|
|
|
if req.topicHealth.len > 0:
|
|
let h = req.topicHealth[0].health
|
|
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
|
|
isHealthy = true
|
|
break
|
|
await sleepAsync(chronos.milliseconds(100))
|
|
|
|
check isHealthy == true
|
|
|
|
asyncTest "RequestShardTopicsHealth, check disconnected PubsubTopic":
|
|
const GhostShard = PubsubTopic("/waku/2/rs/1/666")
|
|
client.waku.node.wakuRelay.subscribe(GhostShard, dummyHandler)
|
|
|
|
let req = RequestShardTopicsHealth.request(client.waku.brokerCtx, @[GhostShard]).valueOr:
|
|
raiseAssert "Request failed"
|
|
|
|
check req.topicHealth.len > 0
|
|
check req.topicHealth[0].health == TopicHealth.UNHEALTHY
|
|
|
|
asyncTest "RequestProtocolHealth, check relay status":
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
|
|
var isReady = false
|
|
let start = Moment.now()
|
|
while Moment.now() - start < TestTimeout:
|
|
let relayReq = await RequestProtocolHealth.request(
|
|
client.waku.brokerCtx, WakuProtocol.RelayProtocol
|
|
)
|
|
if relayReq.isOk() and relayReq.get().healthStatus.health == HealthStatus.READY:
|
|
isReady = true
|
|
break
|
|
await sleepAsync(chronos.milliseconds(100))
|
|
|
|
check isReady == true
|
|
|
|
let storeReq = await RequestProtocolHealth.request(
|
|
client.waku.brokerCtx, WakuProtocol.StoreProtocol
|
|
)
|
|
if storeReq.isOk():
|
|
check storeReq.get().healthStatus.health != HealthStatus.READY
|
|
|
|
asyncTest "RequestProtocolHealth, check unmounted protocol":
|
|
let req = await RequestProtocolHealth.request(
|
|
client.waku.brokerCtx, WakuProtocol.StoreProtocol
|
|
)
|
|
check req.isOk()
|
|
|
|
let status = req.get().healthStatus
|
|
check status.health == HealthStatus.NOT_MOUNTED
|
|
check status.desc.isNone()
|
|
|
|
asyncTest "RequestConnectionStatus, check connectivity state":
|
|
let initialReq = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
|
|
raiseAssert "RequestConnectionStatus failed"
|
|
check initialReq.connectionStatus == ConnectionStatus.Disconnected
|
|
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
|
|
var isConnected = false
|
|
let start = Moment.now()
|
|
while Moment.now() - start < TestTimeout:
|
|
let req = RequestConnectionStatus.request(client.waku.brokerCtx).valueOr:
|
|
raiseAssert "RequestConnectionStatus failed"
|
|
|
|
if req.connectionStatus == ConnectionStatus.PartiallyConnected or
|
|
req.connectionStatus == ConnectionStatus.Connected:
|
|
isConnected = true
|
|
break
|
|
await sleepAsync(chronos.milliseconds(100))
|
|
|
|
check isConnected == true
|
|
|
|
asyncTest "EventConnectionStatusChange, detect connect and disconnect":
|
|
let connectFuture = waitForConnectionStatus(
|
|
client.waku.brokerCtx, ConnectionStatus.PartiallyConnected
|
|
)
|
|
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
await connectFuture
|
|
|
|
let disconnectFuture =
|
|
waitForConnectionStatus(client.waku.brokerCtx, ConnectionStatus.Disconnected)
|
|
await client.waku.node.disconnectNode(servicePeerInfo)
|
|
await disconnectFuture
|
|
|
|
asyncTest "EventShardTopicHealthChange, detect health improvement":
|
|
client.waku.node.wakuRelay.subscribe(DefaultShard, dummyHandler)
|
|
|
|
let healthEventFuture = waitForShardHealthy(client.waku.brokerCtx)
|
|
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
|
|
let event = await healthEventFuture
|
|
check event.topic == DefaultShard
|
|
|
|
asyncTest "RequestHealthReport, check aggregate report":
|
|
let req = await RequestHealthReport.request(client.waku.brokerCtx)
|
|
|
|
check req.isOk()
|
|
|
|
let report = req.get().healthReport
|
|
check report.nodeHealth == HealthStatus.READY
|
|
check report.protocolsHealth.len > 0
|
|
check report.protocolsHealth.anyIt(it.protocol == $WakuProtocol.RelayProtocol)
|
|
|
|
asyncTest "RequestContentTopicsHealth, smoke test":
|
|
let fictionalTopic = ContentTopic("/waku/2/this-does-not-exist/proto")
|
|
|
|
let req =
|
|
RequestContentTopicsHealth.request(client.waku.brokerCtx, @[fictionalTopic])
|
|
|
|
check req.isOk()
|
|
|
|
let res = req.get()
|
|
check res.contentTopicHealth.len == 1
|
|
check res.contentTopicHealth[0].topic == fictionalTopic
|
|
check res.contentTopicHealth[0].health == TopicHealth.NOT_SUBSCRIBED
|
|
|
|
asyncTest "RequestContentTopicsHealth, core mode trivial 1-shard autosharding":
|
|
let cTopic = ContentTopic("/waku/2/my-content-topic/proto")
|
|
|
|
let shardReq =
|
|
RequestRelayShard.request(client.waku.brokerCtx, none(PubsubTopic), cTopic)
|
|
|
|
check shardReq.isOk()
|
|
let targetShard = $shardReq.get().relayShard
|
|
|
|
client.waku.node.wakuRelay.subscribe(targetShard, dummyHandler)
|
|
serviceNode.wakuRelay.subscribe(targetShard, dummyHandler)
|
|
|
|
await client.waku.node.connectToNodes(@[servicePeerInfo])
|
|
|
|
var isHealthy = false
|
|
let start = Moment.now()
|
|
while Moment.now() - start < TestTimeout:
|
|
let req = RequestContentTopicsHealth.request(client.waku.brokerCtx, @[cTopic]).valueOr:
|
|
raiseAssert "Request failed"
|
|
|
|
if req.contentTopicHealth.len > 0:
|
|
let h = req.contentTopicHealth[0].health
|
|
if h == TopicHealth.MINIMALLY_HEALTHY or h == TopicHealth.SUFFICIENTLY_HEALTHY:
|
|
isHealthy = true
|
|
break
|
|
|
|
await sleepAsync(chronos.milliseconds(100))
|
|
|
|
check isHealthy == true
|
|
|
|
asyncTest "RequestProtocolHealth, edge mode smoke test":
|
|
var edgeWaku: LogosDelivery
|
|
|
|
lockNewGlobalBrokerContext:
|
|
var edgeConf = defaultWakuNodeConf().valueOr:
|
|
raiseAssert error
|
|
edgeConf.mode = Edge
|
|
edgeConf.listenAddress = parseIpAddress("0.0.0.0")
|
|
edgeConf.tcpPort = Port(0)
|
|
edgeConf.discv5UdpPort = Port(0)
|
|
edgeConf.clusterId = some(3'u16)
|
|
edgeConf.maxMessageSize = "150 KiB"
|
|
edgeConf.rest = false
|
|
|
|
edgeWaku = (await LogosDelivery.new(edgeConf)).valueOr:
|
|
raiseAssert "Failed to create edge node: " & error
|
|
|
|
(await edgeWaku.start()).isOkOr:
|
|
raiseAssert "Failed to start edge waku: " & error
|
|
|
|
let relayReq = await RequestProtocolHealth.request(
|
|
edgeWaku.waku.brokerCtx, WakuProtocol.RelayProtocol
|
|
)
|
|
check relayReq.isOk()
|
|
check relayReq.get().healthStatus.health == HealthStatus.NOT_MOUNTED
|
|
|
|
check not edgeWaku.waku.node.wakuFilterClient.isNil()
|
|
|
|
discard await edgeWaku.stop()
|