More test added

This commit is contained in:
NagyZoltanPeter 2026-01-24 06:47:53 +01:00
parent 4b163ea4f7
commit 5af834b639
No known key found for this signature in database
GPG Key ID: 3E1F97CF4A7B6F42
4 changed files with 279 additions and 95 deletions

View File

@ -8,6 +8,125 @@ import
waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context]
import waku/api/api_conf, waku/factory/waku_conf
type SendEventOutcome {.pure.} = enum
Sent
Propagated
Error
type SendEventListenerManager = ref object
brokerCtx: BrokerContext
sentListener: MessageSentEventListener
errorListener: MessageErrorEventListener
propagatedListener: MessagePropagatedEventListener
sentFuture: Future[void]
errorFuture: Future[void]
propagatedFuture: Future[void]
sentCount: int
errorCount: int
propagatedCount: int
sentRequestIds: seq[RequestId]
errorRequestIds: seq[RequestId]
propagatedRequestIds: seq[RequestId]
proc newSendEventListenerManager(brokerCtx: BrokerContext): SendEventListenerManager =
let manager = SendEventListenerManager(brokerCtx: brokerCtx)
manager.sentFuture = newFuture[void]("sentEvent")
manager.errorFuture = newFuture[void]("errorEvent")
manager.propagatedFuture = newFuture[void]("propagatedEvent")
manager.sentListener = MessageSentEvent.listen(
brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
inc manager.sentCount
manager.sentRequestIds.add(event.requestId)
echo "SENT EVENT TRIGGERED (#",
manager.sentCount, "): requestId=", event.requestId
if not manager.sentFuture.finished():
manager.sentFuture.complete()
,
).valueOr:
raiseAssert error
manager.errorListener = MessageErrorEvent.listen(
brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
inc manager.errorCount
manager.errorRequestIds.add(event.requestId)
echo "ERROR EVENT TRIGGERED (#", manager.errorCount, "): ", event.error
if not manager.errorFuture.finished():
manager.errorFuture.fail(
newException(CatchableError, "Error event triggered: " & event.error)
)
,
).valueOr:
raiseAssert error
manager.propagatedListener = MessagePropagatedEvent.listen(
brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
inc manager.propagatedCount
manager.propagatedRequestIds.add(event.requestId)
echo "PROPAGATED EVENT TRIGGERED (#",
manager.propagatedCount, "): requestId=", event.requestId
if not manager.propagatedFuture.finished():
manager.propagatedFuture.complete()
,
).valueOr:
raiseAssert error
return manager
proc teardown(manager: SendEventListenerManager) =
MessageSentEvent.dropListener(manager.brokerCtx, manager.sentListener)
MessageErrorEvent.dropListener(manager.brokerCtx, manager.errorListener)
MessagePropagatedEvent.dropListener(manager.brokerCtx, manager.propagatedListener)
proc waitForEvents(
manager: SendEventListenerManager, timeout: Duration
): Future[bool] {.async.} =
return await allFutures(
manager.sentFuture, manager.propagatedFuture, manager.errorFuture
)
.withTimeout(timeout)
proc outcomes(manager: SendEventListenerManager): set[SendEventOutcome] =
if manager.sentFuture.completed():
result.incl(SendEventOutcome.Sent)
if manager.propagatedFuture.completed():
result.incl(SendEventOutcome.Propagated)
if manager.errorFuture.failed():
result.incl(SendEventOutcome.Error)
proc validate(manager: SendEventListenerManager, expected: set[SendEventOutcome]) =
echo "EVENT COUNTS: sent=",
manager.sentCount, ", propagated=", manager.propagatedCount, ", error=",
manager.errorCount
check manager.outcomes() == expected
proc validate(
manager: SendEventListenerManager,
expected: set[SendEventOutcome],
expectedRequestId: RequestId,
) =
manager.validate(expected)
for requestId in manager.sentRequestIds:
check requestId == expectedRequestId
for requestId in manager.propagatedRequestIds:
check requestId == expectedRequestId
for requestId in manager.errorRequestIds:
check requestId == expectedRequestId
proc createApiNodeConf(): NodeConfig =
result = NodeConfig.init(
mode = WakuMode.Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
p2pReliability = true,
)
suite "Waku API - Send":
var
relayNode1 {.threadvar.}: WakuNode
@ -113,29 +232,9 @@ suite "Waku API - Send":
)
asyncTest "Check API availability (unhealthy node)":
# Create a node config that doesn't start or has no peers
let nodeConfig = NodeConfig.init(
mode = WakuMode.Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
)
let wakuConfRes = toWakuConf(nodeConfig)
## Then
require wakuConfRes.isOk()
let wakuConf = wakuConfRes.get()
require wakuConf.validate().isOk()
check:
wakuConf.clusterId == 1
wakuConf.shardingConf.numShardsInCluster == 1
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(nodeConfig)).valueOr:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
@ -153,21 +252,10 @@ suite "Waku API - Send":
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Check API availability (healthy node)":
# Create a node config that doesn't start or has no peers
let nodeConfig = NodeConfig.init(
mode = WakuMode.Core,
protocolsConfig = ProtocolsConfig.init(
entryNodes = @[],
clusterId = 1,
autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1),
),
p2pReliability = true,
)
asyncTest "Send fully validated":
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(nodeConfig)).valueOr:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
@ -176,61 +264,167 @@ suite "Waku API - Send":
@[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo]
)
let sentEventFuture = newFuture[void]("sentEvent")
let sentListener = MessageSentEvent.listen(
node.brokerCtx,
proc(event: MessageSentEvent) {.async: (raises: []).} =
echo "SENT EVENT TRIGGERED: requestId=", event.requestId
if not sentEventFuture.finished():
sentEventFuture.complete()
,
).valueOr:
raiseAssert error
let errorEventFuture = newFuture[void]("errorEvent")
let errorListener = MessageErrorEvent.listen(
node.brokerCtx,
proc(event: MessageErrorEvent) {.async: (raises: []).} =
echo "ERROR EVENT TRIGGERED: ", event.error
if not errorEventFuture.finished():
errorEventFuture.fail(
newException(CatchableError, "Error event triggered: " & event.error)
)
,
).valueOr:
raiseAssert error
let propagatedEventFuture = newFuture[void]("propagatedEvent")
let propagatedListener = MessagePropagatedEvent.listen(
node.brokerCtx,
proc(event: MessagePropagatedEvent) {.async: (raises: []).} =
echo "PROPAGATED EVENT TRIGGERED: requestId=", event.requestId
if not propagatedEventFuture.finished():
propagatedEventFuture.complete()
,
).valueOr:
raiseAssert error
let eventManager = newSendEventListenerManager(node.brokerCtx)
defer:
MessageSentEvent.dropListener(node.brokerCtx, sentListener)
MessageErrorEvent.dropListener(node.brokerCtx, errorListener)
MessagePropagatedEvent.dropListener(node.brokerCtx, propagatedListener)
eventManager.teardown()
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let sendResult = await node.send(envelope)
check sendResult.isOk() # Depending on implementation, it might say "not healthy"
let requestId = (await node.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await allFutures(sentEventFuture, propagatedEventFuture, errorEventFuture)
.withTimeout(eventTimeout)
discard await eventManager.waitForEvents(eventTimeout)
check sentEventFuture.completed()
check propagatedEventFuture.completed()
check not errorEventFuture.failed()
eventManager.validate(
{SendEventOutcome.Sent, SendEventOutcome.Propagated}, requestId
)
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Send only propagates":
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[relayNode1PeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
defer:
eventManager.teardown()
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await eventManager.waitForEvents(eventTimeout)
eventManager.validate({SendEventOutcome.Propagated}, requestId)
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Send only propagates fallback to lightpush":
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
defer:
eventManager.teardown()
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await eventManager.waitForEvents(eventTimeout)
eventManager.validate({SendEventOutcome.Propagated}, requestId)
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Send fully validates fallback to lightpush":
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
defer:
eventManager.teardown()
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await eventManager.waitForEvents(eventTimeout)
eventManager.validate(
{SendEventOutcome.Propagated, SendEventOutcome.Sent}, requestId
)
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error
asyncTest "Send fails with event":
var fakeLightpushNode: WakuNode
lockNewGlobalBrokerContext:
fakeLightpushNode =
newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0))
fakeLightpushNode.mountMetadata(1, @[0'u16]).isOkOr:
raiseAssert "Failed to mount metadata: " & error
(await fakeLightpushNode.mountRelay()).isOkOr:
raiseAssert "Failed to mount relay"
(await fakeLightpushNode.mountLightPush()).isOkOr:
raiseAssert "Failed to mount lightpush"
await fakeLightpushNode.mountLibp2pPing()
await fakeLightpushNode.start()
let fakeLightpushNodePeerInfo = fakeLightpushNode.peerInfo.toRemotePeerInfo()
proc dummyHandler(
topic: PubsubTopic, msg: WakuMessage
): Future[void] {.async, gcsafe.} =
discard
fakeLightpushNode.subscribe(
(kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/1/0")), dummyHandler
).isOkOr:
raiseAssert "Failed to subscribe fakeLightpushNode: " & error
var node: Waku
lockNewGlobalBrokerContext:
node = (await createNode(createApiNodeConf())).valueOr:
raiseAssert error
(await startWaku(addr node)).isOkOr:
raiseAssert "Failed to start Waku node: " & error
await node.node.connectToNodes(@[fakeLightpushNodePeerInfo])
let eventManager = newSendEventListenerManager(node.brokerCtx)
defer:
eventManager.teardown()
let envelope = MessageEnvelope.init(
ContentTopic("/waku/2/default-content/proto"), "test payload"
)
let requestId = (await node.send(envelope)).valueOr:
raiseAssert error
# Wait for events with timeout
const eventTimeout = 10.seconds
discard await eventManager.waitForEvents(eventTimeout)
eventManager.validate({SendEventOutcome.Error}, requestId)
(await node.stop()).isOkOr:
raiseAssert "Failed to stop node: " & error

View File

@ -55,7 +55,7 @@ proc send*(
): Future[Result[RequestId, string]] {.async.} =
?checkApiAvailability(w)
let requestId = newRequestId(w.rng)
let requestId = RequestId.new(w.rng)
let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr:
return err("API send: Failed to create delivery task: " & error)

View File

@ -1,13 +0,0 @@
{.push raises: [].}
import bearssl/rand
import waku/utils/requests as request_utils
import ./types
proc newRequestId*(rng: ref HmacDrbgContext): RequestId =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
{.pop.}

View File

@ -22,13 +22,16 @@ type
MinimallyHealthy
Unhealthy
proc newRequestId*(rng: ref HmacDrbgContext): RequestId =
proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T =
## Generate a new RequestId using the provided RNG.
RequestId(request_utils.generateRequestId(rng))
proc `$`*(r: RequestId): string {.inline.} =
string(r)
proc `==`*(a, b: RequestId): bool {.inline.} =
string(a) == string(b)
proc init*(
T: type MessageEnvelope,
contentTopic: ContentTopic,
@ -48,7 +51,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage =
contentTopic: envelope.contentTopic,
payload: envelope.payload,
ephemeral: envelope.ephemeral,
timestamp: getNanosecondTime(getTime().toUnixFloat()),
timestamp: getNowInNanosecondTime(),
)
## TODO: First find out if proof is needed at all