diff --git a/tests/api/test_api_subscription.nim b/tests/api/test_api_subscription.nim index 770dcd12d..5b8ccd921 100644 --- a/tests/api/test_api_subscription.nim +++ b/tests/api/test_api_subscription.nim @@ -1,6 +1,6 @@ {.used.} -import std/[strutils, net, options] +import std/[strutils, net, options, sets] import chronos, testutils/unittests, stew/byteutils import libp2p/[peerid, peerinfo, multiaddress, crypto/crypto] import ../testlib/[common, wakucore, wakunode, testasync] @@ -20,7 +20,6 @@ import waku/api/api_conf, waku/factory/waku_conf const TestTimeout = chronos.seconds(10) const NegativeTestTimeout = chronos.seconds(2) -const DefaultShard = PubsubTopic("/waku/2/rs/1/0") type ReceiveEventListenerManager = ref object brokerCtx: BrokerContext @@ -59,14 +58,21 @@ proc waitForEvents( ): Future[bool] {.async.} = return await manager.receivedEvent.wait().withTimeout(timeout) -proc createApiNodeConf(mode: WakuMode = WakuMode.Core): NodeConfig = +type TestNetwork = ref object + publisher: WakuNode + subscriber: Waku + publisherPeerInfo: RemotePeerInfo + +proc createApiNodeConf( + mode: WakuMode = WakuMode.Core, numShards: uint16 = 1 +): NodeConfig = let netConf = NetworkingConfig(listenIpv4: "0.0.0.0", p2pTcpPort: 0, discv5UdpPort: 0) result = NodeConfig.init( mode = mode, protocolsConfig = ProtocolsConfig.init( entryNodes = @[], clusterId = 1, - autoShardingConfig = AutoShardingConfig(numShardsInCluster: 1), + autoShardingConfig = AutoShardingConfig(numShardsInCluster: numShards), ), networkingConfig = netConf, p2pReliability = true, @@ -79,241 +85,315 @@ proc setupSubscriberNode(conf: NodeConfig): Future[Waku] {.async.} = (await startWaku(addr node)).expect("Failed to start subscriber node") return node -proc waitForMesh*(node: WakuNode, shard: PubsubTopic) {.async.} = +proc setupNetwork( + numShards: uint16 = 1, mode: WakuMode = WakuMode.Core +): Future[TestNetwork] {.async.} = + var net = TestNetwork() + + lockNewGlobalBrokerContext: + net.publisher = + newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) + net.publisher.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata") + (await net.publisher.mountRelay()).expect("Failed to mount relay") + await net.publisher.mountLibp2pPing() + await net.publisher.start() + + net.publisherPeerInfo = net.publisher.peerInfo.toRemotePeerInfo() + + proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = + discard + + # Subscribe the publisher to all shards to guarantee a GossipSub mesh with the subscriber. + # Currently, Core/Relay nodes auto-subscribe to all network shards on boot, but if + # that changes, this will be needed to cause the publisher to have shard interest + # for any shards the subscriber may want to use, which is required for waitForMesh to work. + for i in 0 ..< numShards.int: + let shard = PubsubTopic("/waku/2/rs/1/" & $i) + net.publisher.subscribe((kind: PubsubSub, topic: shard), dummyHandler).expect( + "Failed to sub publisher" + ) + + net.subscriber = await setupSubscriberNode(createApiNodeConf(mode, numShards)) + + await net.subscriber.node.connectToNodes(@[net.publisherPeerInfo]) + + return net + +proc teardown(net: TestNetwork) {.async.} = + if not isNil(net.subscriber): + (await net.subscriber.stop()).expect("Failed to stop subscriber node") + net.subscriber = nil + + if not isNil(net.publisher): + await net.publisher.stop() + net.publisher = nil + +proc getRelayShard(node: WakuNode, contentTopic: ContentTopic): PubsubTopic = + let autoSharding = node.wakuAutoSharding.get() + let shardObj = autoSharding.getShard(contentTopic).expect("Failed to get shard") + return PubsubTopic($shardObj) + +proc waitForMesh(node: WakuNode, shard: PubsubTopic) {.async.} = for _ in 0 ..< 50: if node.wakuRelay.getNumPeersInMesh(shard).valueOr(0) > 0: return await sleepAsync(100.milliseconds) - raise newException(ValueError, "GossipSub Mesh failed to stabilize") + raise newException(ValueError, "GossipSub Mesh failed to stabilize on " & shard) -proc publishWhenMeshReady( - publisher: WakuNode, - pubsubTopic: PubsubTopic, - contentTopic: ContentTopic, - payload: seq[byte], +proc publishToMesh( + net: TestNetwork, contentTopic: ContentTopic, payload: seq[byte] ): Future[Result[int, string]] {.async.} = - await waitForMesh(publisher, pubsubTopic) + let shard = net.subscriber.node.getRelayShard(contentTopic) + + await waitForMesh(net.publisher, shard) let msg = WakuMessage( payload: payload, contentTopic: contentTopic, version: 0, timestamp: now() ) - return await publisher.publish(some(pubsubTopic), msg) + return await net.publisher.publish(some(shard), msg) suite "Messaging API, SubscriptionService": - var - publisherNode {.threadvar.}: WakuNode - publisherPeerInfo {.threadvar.}: RemotePeerInfo - publisherPeerId {.threadvar.}: PeerId - - subscriberNode {.threadvar.}: Waku - - asyncSetup: - lockNewGlobalBrokerContext: - publisherNode = - newTestWakuNode(generateSecp256k1Key(), parseIpAddress("0.0.0.0"), Port(0)) - - publisherNode.mountMetadata(1, @[0'u16]).expect("Failed to mount metadata") - (await publisherNode.mountRelay()).expect("Failed to mount relay") - await publisherNode.mountLibp2pPing() - await publisherNode.start() - - publisherPeerInfo = publisherNode.peerInfo.toRemotePeerInfo() - publisherPeerId = publisherNode.peerInfo.peerId - - proc dummyHandler(topic: PubsubTopic, msg: WakuMessage) {.async, gcsafe.} = - discard - - publisherNode.subscribe((kind: PubsubSub, topic: DefaultShard), dummyHandler).expect( - "Failed to subscribe publisherNode" - ) - - asyncTeardown: - if not subscriberNode.isNil(): - (await subscriberNode.stop()).expect("Failed to stop subscriber node") - subscriberNode = nil - - if not publisherNode.isNil(): - await publisherNode.stop() - publisherNode = nil - asyncTest "Subscription API, relay node auto subscribe and receive message": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) - let testTopic = ContentTopic("/waku/2/test-content/proto") + let net = await setupNetwork(1) + defer: + await net.teardown() - (await subscriberNode.subscribe(testTopic)).expect( + let testTopic = ContentTopic("/waku/2/test-content/proto") + (await net.subscriber.subscribe(testTopic)).expect( "subscriberNode failed to subscribe" ) - let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: eventManager.teardown() - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, testTopic, "Hello, world!".toBytes() - ) - ).expect("Publish failed") + discard (await net.publishToMesh(testTopic, "Hello, world!".toBytes())).expect( + "Publish failed" + ) require await eventManager.waitForEvents(TestTimeout) require eventManager.receivedMessages.len == 1 check eventManager.receivedMessages[0].contentTopic == testTopic asyncTest "Subscription API, relay node ignores unsubscribed content topics on same shard": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let net = await setupNetwork(1) + defer: + await net.teardown() let subbedTopic = ContentTopic("/waku/2/subbed-topic/proto") let ignoredTopic = ContentTopic("/waku/2/ignored-topic/proto") - (await subscriberNode.subscribe(subbedTopic)).expect("failed to subscribe") + (await net.subscriber.subscribe(subbedTopic)).expect("failed to subscribe") - let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: eventManager.teardown() - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, ignoredTopic, "Ghost Msg".toBytes() - ) - ).expect("Publish failed") + discard (await net.publishToMesh(ignoredTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) check not await eventManager.waitForEvents(NegativeTestTimeout) check eventManager.receivedMessages.len == 0 asyncTest "Subscription API, relay node unsubscribe stops message receipt": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let net = await setupNetwork(1) + defer: + await net.teardown() + let testTopic = ContentTopic("/waku/2/unsub-test/proto") - (await subscriberNode.subscribe(testTopic)).expect("failed to subscribe") - subscriberNode.unsubscribe(testTopic).expect("failed to unsubscribe") + (await net.subscriber.subscribe(testTopic)).expect("failed to subscribe") + net.subscriber.unsubscribe(testTopic).expect("failed to unsubscribe") - let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: eventManager.teardown() - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, testTopic, "Should be dropped".toBytes() - ) - ).expect("Publish failed") + discard (await net.publishToMesh(testTopic, "Should be dropped".toBytes())).expect( + "Publish failed" + ) check not await eventManager.waitForEvents(NegativeTestTimeout) check eventManager.receivedMessages.len == 0 asyncTest "Subscription API, overlapping topics on same shard maintain correct isolation": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let net = await setupNetwork(1) + defer: + await net.teardown() let topicA = ContentTopic("/waku/2/topic-a/proto") let topicB = ContentTopic("/waku/2/topic-b/proto") - (await subscriberNode.subscribe(topicA)).expect("failed to sub A") - (await subscriberNode.subscribe(topicB)).expect("failed to sub B") + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") - let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: eventManager.teardown() - await waitForMesh(publisherNode, DefaultShard) + net.subscriber.unsubscribe(topicA).expect("failed to unsub A") - subscriberNode.unsubscribe(topicA).expect("failed to unsub A") - - discard ( - await publisherNode.publish( - some(DefaultShard), - WakuMessage( - payload: "Dropped Message".toBytes(), - contentTopic: topicA, - version: 0, - timestamp: now(), - ), - ) - ).expect("Publish A failed") - - discard ( - await publisherNode.publish( - some(DefaultShard), - WakuMessage( - payload: "Kept Msg".toBytes(), - contentTopic: topicB, - version: 0, - timestamp: now(), - ), - ) - ).expect("Publish B failed") + discard (await net.publishToMesh(topicA, "Dropped Message".toBytes())).expect( + "Publish A failed" + ) + discard + (await net.publishToMesh(topicB, "Kept Msg".toBytes())).expect("Publish B failed") require await eventManager.waitForEvents(TestTimeout) require eventManager.receivedMessages.len == 1 check eventManager.receivedMessages[0].contentTopic == topicB asyncTest "Subscription API, redundant subs tolerated and subs are removed": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let net = await setupNetwork(1) + defer: + await net.teardown() + let glitchTopic = ContentTopic("/waku/2/glitch/proto") - (await subscriberNode.subscribe(glitchTopic)).expect("failed to sub") - (await subscriberNode.subscribe(glitchTopic)).expect("failed to double sub") - subscriberNode.unsubscribe(glitchTopic).expect("failed to unsub") + (await net.subscriber.subscribe(glitchTopic)).expect("failed to sub") + (await net.subscriber.subscribe(glitchTopic)).expect("failed to double sub") + net.subscriber.unsubscribe(glitchTopic).expect("failed to unsub") - let eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) defer: eventManager.teardown() - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, glitchTopic, "Ghost Msg".toBytes() - ) - ).expect("Publish failed") + discard (await net.publishToMesh(glitchTopic, "Ghost Msg".toBytes())).expect( + "Publish failed" + ) check not await eventManager.waitForEvents(NegativeTestTimeout) check eventManager.receivedMessages.len == 0 asyncTest "Subscription API, resubscribe to an unsubscribed topic": - subscriberNode = await setupSubscriberNode(createApiNodeConf(WakuMode.Core)) - await subscriberNode.node.connectToNodes(@[publisherPeerInfo]) + let net = await setupNetwork(1) + defer: + await net.teardown() + let testTopic = ContentTopic("/waku/2/resub-test/proto") # Subscribe - (await subscriberNode.subscribe(testTopic)).expect("Initial sub failed") + (await net.subscriber.subscribe(testTopic)).expect("Initial sub failed") - var eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, testTopic, "Msg 1".toBytes() - ) - ).expect("Pub 1 failed") + var eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) + discard + (await net.publishToMesh(testTopic, "Msg 1".toBytes())).expect("Pub 1 failed") require await eventManager.waitForEvents(TestTimeout) eventManager.teardown() # Unsubscribe and verify teardown - subscriberNode.unsubscribe(testTopic).expect("Unsub failed") - eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + net.subscriber.unsubscribe(testTopic).expect("Unsub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) - discard ( - await publisherNode.publish( - some(DefaultShard), - WakuMessage( - payload: "Ghost".toBytes(), - contentTopic: testTopic, - version: 0, - timestamp: now(), - ), - ) - ).expect("Ghost pub failed") + discard + (await net.publishToMesh(testTopic, "Ghost".toBytes())).expect("Ghost pub failed") check not await eventManager.waitForEvents(NegativeTestTimeout) eventManager.teardown() # Resubscribe - (await subscriberNode.subscribe(testTopic)).expect("Resub failed") - eventManager = newReceiveEventListenerManager(subscriberNode.brokerCtx, 1) + (await net.subscriber.subscribe(testTopic)).expect("Resub failed") + eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 1) - discard ( - await publishWhenMeshReady( - publisherNode, DefaultShard, testTopic, "Msg 2".toBytes() - ) - ).expect("Pub 2 failed") + discard + (await net.publishToMesh(testTopic, "Msg 2".toBytes())).expect("Pub 2 failed") require await eventManager.waitForEvents(TestTimeout) check eventManager.receivedMessages[0].payload == "Msg 2".toBytes() + + asyncTest "Subscription API, two content topics in different shards": + let net = await setupNetwork(8) + defer: + await net.teardown() + + var topicA = ContentTopic("/appA/2/shard-test-a/proto") + var topicB = ContentTopic("/appB/2/shard-test-b/proto") + + # generate two content topics that land in two different shards + var i = 0 + while net.subscriber.node.getRelayShard(topicA) == + net.subscriber.node.getRelayShard(topicB): + topicB = ContentTopic("/appB" & $i & "/2/shard-test-b/proto") + inc i + + (await net.subscriber.subscribe(topicA)).expect("failed to sub A") + (await net.subscriber.subscribe(topicB)).expect("failed to sub B") + + let eventManager = newReceiveEventListenerManager(net.subscriber.brokerCtx, 2) + defer: + eventManager.teardown() + + discard (await net.publishToMesh(topicA, "Msg on Shard A".toBytes())).expect( + "Publish A failed" + ) + discard (await net.publishToMesh(topicB, "Msg on Shard B".toBytes())).expect( + "Publish B failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + require eventManager.receivedMessages.len == 2 + + asyncTest "Subscription API, many content topics in many shards": + let net = await setupNetwork(8) + defer: + await net.teardown() + + var allTopics: seq[ContentTopic] + for i in 0 ..< 100: + allTopics.add(ContentTopic("/stress-app-" & $i & "/2/state-test/proto")) + + var activeSubs: seq[ContentTopic] + + proc verifyNetworkState(expected: seq[ContentTopic]) {.async.} = + let eventManager = + newReceiveEventListenerManager(net.subscriber.brokerCtx, expected.len) + + for topic in allTopics: + discard (await net.publishToMesh(topic, "Stress Payload".toBytes())).expect( + "publish failed" + ) + + require await eventManager.waitForEvents(TestTimeout) + + # here we just give a chance for any messages that we don't expect to arrive + await sleepAsync(1.seconds) + eventManager.teardown() + + # weak check (but catches most bugs) + require eventManager.receivedMessages.len == expected.len + + # strict expected receipt test + var receivedTopics = initHashSet[ContentTopic]() + for msg in eventManager.receivedMessages: + receivedTopics.incl(msg.contentTopic) + var expectedTopics = initHashSet[ContentTopic]() + for t in expected: + expectedTopics.incl(t) + + check receivedTopics == expectedTopics + + # subscribe to all content topics we generated + for t in allTopics: + (await net.subscriber.subscribe(t)).expect("sub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs) + + # unsubscribe from some content topics + for i in 0 ..< 50: + let t = allTopics[i] + net.subscriber.unsubscribe(t).expect("unsub failed") + + let idx = activeSubs.find(t) + if idx >= 0: + activeSubs.del(idx) + + await verifyNetworkState(activeSubs) + + # re-subscribe to some content topics + for i in 0 ..< 25: + let t = allTopics[i] + (await net.subscriber.subscribe(t)).expect("resub failed") + activeSubs.add(t) + + await verifyNetworkState(activeSubs)