diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 42b3937fc..5e14f88dd 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -1,45 +1,53 @@ {.used.} -import std/strutils +import std/[strutils, net, options] import chronos, testutils/unittests, stew/byteutils -import ../testlib/[common, testasync] +import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] +import ../testlib/[common, wakucore, wakunode, testasync] + import waku, waku/[waku_node, waku_core, common/broker/broker_context, events/message_events] import waku/api/api_conf, waku/factory/waku_conf +const TestTimeout = chronos.seconds(10) +const DefaultShard = PubsubTopic("/waku/2/rs/1/0") + type ReceiveEventListenerManager = ref object brokerCtx: BrokerContext receivedListener: MessageReceivedEventListener - receivedFuture: Future[void] + receivedEvent: AsyncEvent receivedMessages: seq[WakuMessage] + targetCount: int proc newReceiveEventListenerManager( - brokerCtx: BrokerContext + brokerCtx: BrokerContext, expectedCount: int = 1 ): ReceiveEventListenerManager = - let manager = ReceiveEventListenerManager(brokerCtx: brokerCtx, receivedMessages: @[]) - manager.receivedFuture = newFuture[void]("receivedEvent") + let manager = ReceiveEventListenerManager( + brokerCtx: brokerCtx, receivedMessages: @[], targetCount: expectedCount + ) + manager.receivedEvent = newAsyncEvent() - manager.receivedListener = MessageReceivedEvent.listen( - brokerCtx, - proc(event: MessageReceivedEvent) {.async: (raises: []).} = - manager.receivedMessages.add(event.message) - echo "RECEIVED EVENT TRIGGERED: contentTopic=", event.message.contentTopic + manager.receivedListener = MessageReceivedEvent + .listen( + brokerCtx, + proc(event: MessageReceivedEvent) {.async: (raises: []).} = + manager.receivedMessages.add(event.message) - if not manager.receivedFuture.finished(): - manager.receivedFuture.complete() - , - ).valueOr: - raiseAssert error + if manager.receivedMessages.len >= manager.targetCount: + manager.receivedEvent.fire() + , + ) + .expect("Failed to listen to MessageReceivedEvent") return manager proc teardown(manager: ReceiveEventListenerManager) = MessageReceivedEvent.dropListener(manager.brokerCtx, manager.receivedListener) -proc waitForEvent( +proc waitForEvents( manager: ReceiveEventListenerManager, timeout: Duration ): Future[bool] {.async.} = - return await manager.receivedFuture.withTimeout(timeout) + return await manager.receivedEvent.wait().withTimeout(timeout) proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) @@ -54,56 +62,99 @@ proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = p2pReliability = true, ) -suite "Waku API - Subscription Service": - asyncTest "Subscription API, two relays with subscribe and receive message": - var node1, node2: Waku +proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} = + var node: Waku + lockNewGlobalBrokerContext: + node = (await createNode(conf)).expect("Failed to create subscriber node") + (await startWaku(addr node)).expect("Failed to start subscriber node") + return node +proc publishWhenMeshReady( + publisher: WakuNode, + pubsubTopic: PubsubTopic, + contentTopic: ContentTopic, + payload: seq[byte], + maxRetries: int = 50, + retryDelay: Duration = 200.milliseconds, +): Future[Result[int, string]] {.async.} = + for _ in 0 ..< maxRetries: + let msg = WakuMessage( + payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() + ) + + let publishRes = await publisher.publish(some(pubsubTopic), msg) + if publishRes.isOk() and publishRes.value > 0: + return publishRes + + await sleepAsync(retryDelay) + + return err("Timed out waiting for mesh") + +suite "Messaging API, SubscriptionService": + var + publisherNode {.threadvar.}: WakuNode + publisherPeerInfo {.threadvar.}: RemotePeerInfo + publisherPeerId {.threadvar.}: PeerId + + subscriberNode {.threadvar.}: Waku + + asyncSetup: lockNewGlobalBrokerContext: - node1 = (await createNode(createApiNodeConf())).valueOr: - raiseAssert error - (await startWaku(addr node1)).isOkOr: - raiseAssert "Failed to start node1" + publisherNode = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - lockNewGlobalBrokerContext: - node2 = (await createNode(createApiNodeConf())).valueOr: - raiseAssert error - (await startWaku(addr node2)).isOkOr: - raiseAssert "Failed to start node2" + publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata") + (await publisherNode.mountRelay()).expect("Failed to mount relay") + await publisherNode.mountLibp2pPing() + await publisherNode.start() - let node2PeerInfo = node2.node.peerInfo.toRemotePeerInfo() - await node1.node.connectToNodes(@[node2PeerInfo]) + publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo() + publisherPeerId = publisherNode.peerInfo.peerId - await sleepAsync(2.seconds) + proc dummyHandler( + topic: PubsubTopic, msg: WakuMessage + ): Future[void] {.async, gcsafe.} = + discard + publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect( + "Failed to subscribe publisherNode" + ) + + asyncTeardown: + if not subscriberNode.isNil(): + (await subscriberNode.stop()).expect("Failed to stop subscriber node") + subscriberNode = nil + + if not publisherNode.isNil(): + await publisherNode.stop() + publisherNode = nil + + asyncTest "Subscription API, relay node auto subscribe and receive message": + subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) + await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) let testTopic = ContentTopic("/waku/2/test-content/proto") - (await node1.subscribe(testTopic)).isOkOr: - raiseAssert "Node1 failed to subscribe: " & error + (await subscriberNode.subscribe(testTopic)).expect( + "subscriberNode failed to subscribe" + ) - (await node2.subscribe(testTopic)).isOkOr: - raiseAssert "Node2 failed to subscribe: " & error - - await sleepAsync(2.seconds) - - let eventManager = newReceiveEventListenerManager(node2.brokerCtx) + let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) defer: eventManager.teardown() - let envelope = MessageEnvelope.init(testTopic, "hello world payload") - let sendResult = await node1.send(envelope) - check sendResult.isOk() + const testMessageStr = "Hello, world!" + let msgPayload = testMessageStr.toBytes() - const eventTimeout = 5.seconds - let receivedInTime = await eventManager.waitForEvent(eventTimeout) + discard ( + await publishWhenMeshReady(publisherNode, DefaultShard, testTopic, msgPayload) + ).expect("Timed out waiting for mesh to stabilize") - check receivedInTime == true - check eventManager.receivedMessages.len == 1 + let receivedInTime = await eventManager.waitForEvents(TestTimeout) + + # Hard abort if these conditions aren't met to prevent an IndexDefect below + require receivedInTime + require eventManager.receivedMessages.len == 1 let receivedMsg = eventManager.receivedMessages[0] check receivedMsg.contentTopic == testTopic - check string.fromBytes(receivedMsg.payload) == "hello world payload" - - (await node1.stop()).isOkOr: - raiseAssert error - (await node2.stop()).isOkOr: - raiseAssert error + check string.fromBytes(receivedMsg.payload) == testMessageStr diff --git a/waku/factory/waku.nim b/waku/factory/waku.nim index dd253129c..8299439c2 100644 --- a/waku/factory/waku.nim +++ b/waku/factory/waku.nim @@ -35,6 +35,7 @@ import node/health_monitor, node/waku_metrics, node/delivery_service/delivery_service, + node/delivery_service/subscription_service, rest_api/message_cache, rest_api/endpoint/server, rest_api/endpoint/builder as rest_server_builder, @@ -416,6 +417,37 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: if not waku[].deliveryService.isNil(): waku[].deliveryService.startDeliveryService() + ## Subscription Service + if not waku.node.wakuRelay.isNil() and not waku.deliveryService.isNil(): + let subService = waku.deliveryService.subscriptionService + + if waku.node.wakuAutoSharding.isSome(): + # Subscribe relay to all shards in autosharding. + let autoSharding = waku.node.wakuAutoSharding.get() + let clusterId = autoSharding.clusterId + let numShards = autoSharding.shardCountGenZero + + if numShards > 0: + var clusterPubsubTopics = newSeqOfCap[PubsubTopic](numShards) + + for i in 0 ..< numShards: + let shardObj = RelayShard(clusterId: clusterId, shardId: uint16(i)) + clusterPubsubTopics.add(PubsubTopic($shardObj)) + + subService.subscribeShard(clusterPubsubTopics).isOkOr: + return err("Failed to auto-subscribe Relay to cluster shards: " & error) + else: + # Fallback to configured shards when no autosharding. + if waku.conf.subscribeShards.len > 0: + let manualShards = waku.conf.subscribeShards.mapIt( + PubsubTopic( + $(RelayShard(clusterId: waku.conf.clusterId, shardId: uint16(it))) + ) + ) + + subService.subscribeShard(manualShards).isOkOr: + return err("Failed to subscribe Relay to manual shards: " & error) + ## Health Monitor waku[].healthMonitor.startHealthMonitor().isOkOr: return err("failed to start health monitor: " & $error) @@ -450,7 +482,7 @@ proc startWaku*(waku: ptr Waku): Future[Result[void, string]] {.async: (raises: ).isOkOr: error "Failed to set RequestProtocolHealth provider", error = error - ## Setup RequestHealthReport provider (The lost child) + ## Setup RequestHealthReport provider RequestHealthReport.setProvider( globalBrokerContext(),