diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 02e6c7668a..7e5fdd71fe 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -16,24 +16,20 @@ import { } from "@waku/utils"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { Context } from "mocha"; import { afterEachCustom, beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, + runMultipleNodes, + ServiceNodesFleet, tearDownNodes } from "../../../src/index.js"; -import { messageText, runNodes } from "../utils.js"; +import { messageText } from "../utils.js"; describe("Waku Light Push : Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; + let serviceNodes: ServiceNodesFleet; const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; @@ -55,13 +51,19 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -72,11 +74,11 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -92,50 +94,48 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: customPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], - clusterId: singleShardInfo2.clusterId - }); - await nwaku2.ensureSubscriptions([ + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + { + clusterId: singleShardInfo2.clusterId, + shards: [singleShardInfo2.shard!] + }, + undefined, + true, + 1 + ); + + await serviceNodes2.nodes[0].ensureSubscriptions([ singleShardInfoToPubsubTopic(singleShardInfo2) ]); - await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); - const messageCollector2 = new MessageCollector(nwaku2); - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); @@ -143,33 +143,33 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: customPubsubTopic1 }); - - await messageCollector2.waitForMessages(1, { + await serviceNodes2.messageCollector.waitForMessages(1, { pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: customPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; + let serviceNodes: ServiceNodesFleet; const clusterId = 4; const customContentTopic1 = "/waku/2/content/test.js"; @@ -198,13 +198,19 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let node1PeerId: PeerId; beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -212,15 +218,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.failures).to.be.empty; expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -236,47 +241,45 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: autoshardingPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [autoshardingPubsubTopic2], - clusterId: shardInfo.clusterId - }); - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + // Create a second fleet for the second pubsub topic + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [customContentTopic2] }, + undefined, + true, + 1 // Only need one node for second fleet + ); - const messageCollector2 = new MessageCollector(nwaku2); + await serviceNodes2.nodes[0].ensureSubscriptionsAutosharding([ + customContentTopic2 + ]); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") @@ -285,34 +288,33 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes("M2") }); - await messageCollector.waitForMessagesAutosharding(1, { + await serviceNodes.messageCollector.waitForMessagesAutosharding(1, { contentTopic: customContentTopic1 }); - await messageCollector2.waitForMessagesAutosharding(1, { + await serviceNodes2.messageCollector.waitForMessagesAutosharding(1, { contentTopic: customContentTopic2 }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); }); describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; - let waku2: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - let ctx: Context; + let serviceNodes: ServiceNodesFleet; const clusterId = 3; const customContentTopic1 = "/waku/2/content/utf8"; @@ -355,14 +357,19 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () let node1PeerId: PeerId; beforeEachCustom(this, async () => { - ctx = this.ctx; - [nwaku, waku] = await runNodes(ctx, testShardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + testShardInfo, + undefined, + true, + 2, + true + ); + node1PeerId = await serviceNodes.nodes[0].getPeerId(); }); afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], [waku, waku2]); + await tearDownNodes(serviceNodes.nodes, waku); }); it("Push message on custom pubsubTopic", async function () { @@ -373,11 +380,11 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, expectedContentTopic: customContentTopic1 }); @@ -393,68 +400,71 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - const messageCollector2 = new MessageCollector(nwaku); - expect( - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }) ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); }); - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - [nwaku2] = await runNodes(ctx, shardInfo2); + it("Light push messages to 2 service nodes each with different pubsubtopics", async function () { + const [serviceNodes2, waku2] = await runMultipleNodes( + this.ctx, + shardInfo2, + undefined, + true, + 1 + ); - await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); + await serviceNodes2.nodes[0].ensureSubscriptions([ + autoshardingPubsubTopic2 + ]); + await waku.dial(await serviceNodes2.nodes[0].getMultiaddrWithId()); await waku.waitForPeers([Protocols.LightPush]); - const messageCollector2 = new MessageCollector(nwaku2); - - const { failures: f1 } = await waku.lightPush.send(customEncoder1, { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - const { failures: f2 } = await waku.lightPush.send(customEncoder2, { + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - expect(f1).to.be.empty; - expect(f2).to.be.empty; - - await messageCollector.waitForMessages(1, { + await serviceNodes.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic1 }); - await messageCollector2.waitForMessages(1, { + await serviceNodes2.messageCollector.waitForMessages(1, { pubsubTopic: autoshardingPubsubTopic2 }); - messageCollector.verifyReceivedMessage(0, { + serviceNodes.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, expectedPubsubTopic: autoshardingPubsubTopic1 }); - messageCollector2.verifyReceivedMessage(0, { + serviceNodes2.messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, expectedPubsubTopic: autoshardingPubsubTopic2 }); + + // Clean up second fleet + await tearDownNodes(serviceNodes2.nodes, waku2); }); });