From 92e23234dd0a56235431050dc70d213adfb7f6a4 Mon Sep 17 00:00:00 2001 From: NagyZoltanPeter <113987313+NagyZoltanPeter@users.noreply.github.com> Date: Sat, 24 Jan 2026 06:47:53 +0100 Subject: [PATCH] More test added --- tests/api/test_api_send.nim | 352 ++++++++++++++++++++++++++++-------- waku/api/api.nim | 2 +- waku/api/request_id.nim | 13 -- waku/api/types.nim | 7 +- 4 files changed, 279 insertions(+), 95 deletions(-) delete mode 100644 waku/api/request_id.nim diff --git a/tests/api/test_api_send.nim b/tests/api/test_api_send.nim index 56c7c9e83..11a80be73 100644 --- a/tests/api/test_api_send.nim +++ b/tests/api/test_api_send.nim @@ -8,6 +8,125 @@ import waku, waku/[waku_node, waku_core, waku_relay/protocol, common/broker/broker_context] import waku/api/api_conf, waku/factory/waku_conf +type SendEventOutcome {.pure.} = enum + Sent + Propagated + Error + +type SendEventListenerManager = ref object + brokerCtx: BrokerContext + sentListener: MessageSentEventListener + errorListener: MessageErrorEventListener + propagatedListener: MessagePropagatedEventListener + sentFuture: Future[void] + errorFuture: Future[void] + propagatedFuture: Future[void] + sentCount: int + errorCount: int + propagatedCount: int + sentRequestIds: seq[RequestId] + errorRequestIds: seq[RequestId] + propagatedRequestIds: seq[RequestId] + +proc newSendEventListenerManager(brokerCtx: BrokerContext): SendEventListenerManager = + let manager = SendEventListenerManager(brokerCtx: brokerCtx) + manager.sentFuture = newFuture[void]("sentEvent") + manager.errorFuture = newFuture[void]("errorEvent") + manager.propagatedFuture = newFuture[void]("propagatedEvent") + + manager.sentListener = MessageSentEvent.listen( + brokerCtx, + proc(event: MessageSentEvent) {.async: (raises: []).} = + inc manager.sentCount + manager.sentRequestIds.add(event.requestId) + echo "SENT EVENT TRIGGERED (#", + manager.sentCount, "): requestId=", event.requestId + if not manager.sentFuture.finished(): + manager.sentFuture.complete() + , + ).valueOr: + raiseAssert error + + manager.errorListener = MessageErrorEvent.listen( + brokerCtx, + proc(event: MessageErrorEvent) {.async: (raises: []).} = + inc manager.errorCount + manager.errorRequestIds.add(event.requestId) + echo "ERROR EVENT TRIGGERED (#", manager.errorCount, "): ", event.error + if not manager.errorFuture.finished(): + manager.errorFuture.fail( + newException(CatchableError, "Error event triggered: " & event.error) + ) + , + ).valueOr: + raiseAssert error + + manager.propagatedListener = MessagePropagatedEvent.listen( + brokerCtx, + proc(event: MessagePropagatedEvent) {.async: (raises: []).} = + inc manager.propagatedCount + manager.propagatedRequestIds.add(event.requestId) + echo "PROPAGATED EVENT TRIGGERED (#", + manager.propagatedCount, "): requestId=", event.requestId + if not manager.propagatedFuture.finished(): + manager.propagatedFuture.complete() + , + ).valueOr: + raiseAssert error + + return manager + +proc teardown(manager: SendEventListenerManager) = + MessageSentEvent.dropListener(manager.brokerCtx, manager.sentListener) + MessageErrorEvent.dropListener(manager.brokerCtx, manager.errorListener) + MessagePropagatedEvent.dropListener(manager.brokerCtx, manager.propagatedListener) + +proc waitForEvents( + manager: SendEventListenerManager, timeout: Duration +): Future[bool] {.async.} = + return await allFutures( + manager.sentFuture, manager.propagatedFuture, manager.errorFuture + ) + .withTimeout(timeout) + +proc outcomes(manager: SendEventListenerManager): set[SendEventOutcome] = + if manager.sentFuture.completed(): + result.incl(SendEventOutcome.Sent) + if manager.propagatedFuture.completed(): + result.incl(SendEventOutcome.Propagated) + if manager.errorFuture.failed(): + result.incl(SendEventOutcome.Error) + +proc validate(manager: SendEventListenerManager, expected: set[SendEventOutcome]) = + echo "EVENT COUNTS: sent=", + manager.sentCount, ", propagated=", manager.propagatedCount, ", error=", + manager.errorCount + check manager.outcomes() == expected + +proc validate( + manager: SendEventListenerManager, + expected: set[SendEventOutcome], + expectedRequestId: RequestId, +) = + manager.validate(expected) + for requestId in manager.sentRequestIds: + check requestId == expectedRequestId + for requestId in manager.propagatedRequestIds: + check requestId == expectedRequestId + for requestId in manager.errorRequestIds: + check requestId == expectedRequestId + +proc createApiNodeConf(): NodeConfig = + result = NodeConfig.init( + mode = WakuMode.Core, + protocolsConfig = ProtocolsConfig.init( + entryNodes = @[], + clusterId = 1, + autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + ), + p2pReliability = true, + ) + suite "Waku API - Send": var relayNode1 {.threadvar.}: WakuNode @@ -113,29 +232,9 @@ suite "Waku API - Send": ) asyncTest "Check API availability (unhealthy node)": - # Create a node config that doesn't start or has no peers - let nodeConfig = NodeConfig.init( - mode = WakuMode.Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), - ), - ) - - let wakuConfRes = toWakuConf(nodeConfig) - - ## Then - require wakuConfRes.isOk() - let wakuConf = wakuConfRes.get() - require wakuConf.validate().isOk() - check: - wakuConf.clusterId == 1 - wakuConf.shardingConf.numShardsInCluster == 1 - var node: Waku lockNewGlobalBrokerContext: - node = (await createNode(nodeConfig)).valueOr: + node = (await createNode(createApiNodeConf())).valueOr: raiseAssert error (await startWaku(addr node)).isOkOr: raiseAssert "Failed to start Waku node: " & error @@ -153,21 +252,10 @@ suite "Waku API - Send": (await node.stop()).isOkOr: raiseAssert "Failed to stop node: " & error - asyncTest "Check API availability (healthy node)": - # Create a node config that doesn't start or has no peers - let nodeConfig = NodeConfig.init( - mode = WakuMode.Core, - protocolsConfig = ProtocolsConfig.init( - entryNodes = @[], - clusterId = 1, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), - ), - p2pReliability = true, - ) - + asyncTest "Send fully validated": var node: Waku lockNewGlobalBrokerContext: - node = (await createNode(nodeConfig)).valueOr: + node = (await createNode(createApiNodeConf())).valueOr: raiseAssert error (await startWaku(addr node)).isOkOr: raiseAssert "Failed to start Waku node: " & error @@ -176,61 +264,167 @@ suite "Waku API - Send": @[relayNode1PeerInfo, lightpushNodePeerInfo, storeNodePeerInfo] ) - let sentEventFuture = newFuture[void]("sentEvent") - let sentListener = MessageSentEvent.listen( - node.brokerCtx, - proc(event: MessageSentEvent) {.async: (raises: []).} = - echo "SENT EVENT TRIGGERED: requestId=", event.requestId - if not sentEventFuture.finished(): - sentEventFuture.complete() - , - ).valueOr: - raiseAssert error - - let errorEventFuture = newFuture[void]("errorEvent") - let errorListener = MessageErrorEvent.listen( - node.brokerCtx, - proc(event: MessageErrorEvent) {.async: (raises: []).} = - echo "ERROR EVENT TRIGGERED: ", event.error - if not errorEventFuture.finished(): - errorEventFuture.fail( - newException(CatchableError, "Error event triggered: " & event.error) - ) - , - ).valueOr: - raiseAssert error - - let propagatedEventFuture = newFuture[void]("propagatedEvent") - let propagatedListener = MessagePropagatedEvent.listen( - node.brokerCtx, - proc(event: MessagePropagatedEvent) {.async: (raises: []).} = - echo "PROPAGATED EVENT TRIGGERED: requestId=", event.requestId - if not propagatedEventFuture.finished(): - propagatedEventFuture.complete() - , - ).valueOr: - raiseAssert error + let eventManager = newSendEventListenerManager(node.brokerCtx) defer: - MessageSentEvent.dropListener(node.brokerCtx, sentListener) - MessageErrorEvent.dropListener(node.brokerCtx, errorListener) - MessagePropagatedEvent.dropListener(node.brokerCtx, propagatedListener) + eventManager.teardown() let envelope = MessageEnvelope.init( ContentTopic("/waku/2/default-content/proto"), "test payload" ) - let sendResult = await node.send(envelope) - - check sendResult.isOk() # Depending on implementation, it might say "not healthy" + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error # Wait for events with timeout const eventTimeout = 10.seconds - discard await allFutures(sentEventFuture, propagatedEventFuture, errorEventFuture) - .withTimeout(eventTimeout) + discard await eventManager.waitForEvents(eventTimeout) - check sentEventFuture.completed() - check propagatedEventFuture.completed() - check not errorEventFuture.failed() + eventManager.validate( + {SendEventOutcome.Sent, SendEventOutcome.Propagated}, requestId + ) (await node.stop()).isOkOr: raiseAssert "Failed to stop node: " & error + + asyncTest "Send only propagates": + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + await node.node.connectToNodes(@[relayNode1PeerInfo]) + + let eventManager = newSendEventListenerManager(node.brokerCtx) + defer: + eventManager.teardown() + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error + + # Wait for events with timeout + const eventTimeout = 10.seconds + discard await eventManager.waitForEvents(eventTimeout) + + eventManager.validate({SendEventOutcome.Propagated}, requestId) + + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error + + asyncTest "Send only propagates fallback to lightpush": + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + await node.node.connectToNodes(@[lightpushNodePeerInfo]) + + let eventManager = newSendEventListenerManager(node.brokerCtx) + defer: + eventManager.teardown() + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error + + # Wait for events with timeout + const eventTimeout = 10.seconds + discard await eventManager.waitForEvents(eventTimeout) + + eventManager.validate({SendEventOutcome.Propagated}, requestId) + + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error + + asyncTest "Send fully validates fallback to lightpush": + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + await node.node.connectToNodes(@[lightpushNodePeerInfo, storeNodePeerInfo]) + + let eventManager = newSendEventListenerManager(node.brokerCtx) + defer: + eventManager.teardown() + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error + + # Wait for events with timeout + const eventTimeout = 10.seconds + discard await eventManager.waitForEvents(eventTimeout) + + eventManager.validate( + {SendEventOutcome.Propagated, SendEventOutcome.Sent}, requestId + ) + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error + + asyncTest "Send fails with event": + var fakeLightpushNode: WakuNode + lockNewGlobalBrokerContext: + fakeLightpushNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + fakeLightpushNode.mountMetadata(1, @[0'u16]).isOkOr: + raiseAssert "Failed to mount metadata: " & error + (await fakeLightpushNode.mountRelay()).isOkOr: + raiseAssert "Failed to mount relay" + (await fakeLightpushNode.mountLightPush()).isOkOr: + raiseAssert "Failed to mount lightpush" + await fakeLightpushNode.mountLibp2pPing() + await fakeLightpushNode.start() + let fakeLightpushNodePeerInfo = fakeLightpushNode.peerInfo.toRemotePeerInfo() + proc dummyHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + discard + + fakeLightpushNode.subscribe( + (kind: PubsubSub, topic: PubsubTopic("/waku/2/rs/1/0")), dummyHandler + ).isOkOr: + raiseAssert "Failed to subscribe fakeLightpushNode: " & error + + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(createApiNodeConf())).valueOr: + raiseAssert error + (await startWaku(addr node)).isOkOr: + raiseAssert "Failed to start Waku node: " & error + + await node.node.connectToNodes(@[fakeLightpushNodePeerInfo]) + + let eventManager = newSendEventListenerManager(node.brokerCtx) + defer: + eventManager.teardown() + + let envelope = MessageEnvelope.init( + ContentTopic("/waku/2/default-content/proto"), "test payload" + ) + + let requestId = (await node.send(envelope)).valueOr: + raiseAssert error + + # Wait for events with timeout + const eventTimeout = 10.seconds + discard await eventManager.waitForEvents(eventTimeout) + + eventManager.validate({SendEventOutcome.Error}, requestId) + (await node.stop()).isOkOr: + raiseAssert "Failed to stop node: " & error diff --git a/waku/api/api.nim b/waku/api/api.nim index c741011ed..d9ec208b8 100644 --- a/waku/api/api.nim +++ b/waku/api/api.nim @@ -55,7 +55,7 @@ proc send*( ): Future[Result[RequestId, string]] {.async.} = ?checkApiAvailability(w) - let requestId = newRequestId(w.rng) + let requestId = RequestId.new(w.rng) let deliveryTask = DeliveryTask.create(requestId, envelope, w.brokerCtx).valueOr: return err("API send: Failed to create delivery task: " & error) diff --git a/waku/api/request_id.nim b/waku/api/request_id.nim deleted file mode 100644 index fab4ccfbf..000000000 --- a/waku/api/request_id.nim +++ /dev/null @@ -1,13 +0,0 @@ -{.push raises: [].} - -import bearssl/rand - -import waku/utils/requests as request_utils - -import ./types - -proc newRequestId*(rng: ref HmacDrbgContext): RequestId = - ## Generate a new RequestId using the provided RNG. - RequestId(request_utils.generateRequestId(rng)) - -{.pop.} diff --git a/waku/api/types.nim b/waku/api/types.nim index e78cd87e3..390c18230 100644 --- a/waku/api/types.nim +++ b/waku/api/types.nim @@ -22,13 +22,16 @@ type MinimallyHealthy Unhealthy -proc newRequestId*(rng: ref HmacDrbgContext): RequestId = +proc new*(T: typedesc[RequestId], rng: ref HmacDrbgContext): T = ## Generate a new RequestId using the provided RNG. RequestId(request_utils.generateRequestId(rng)) proc `$`*(r: RequestId): string {.inline.} = string(r) +proc `==`*(a, b: RequestId): bool {.inline.} = + string(a) == string(b) + proc init*( T: type MessageEnvelope, contentTopic: ContentTopic, @@ -48,7 +51,7 @@ proc toWakuMessage*(envelope: MessageEnvelope): WakuMessage = contentTopic: envelope.contentTopic, payload: envelope.payload, ephemeral: envelope.ephemeral, - timestamp: getNanosecondTime(getTime().toUnixFloat()), + timestamp: getNowInNanosecondTime(), ) ## TODO: First find out if proof is needed at all