diff --git a/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts b/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts new file mode 100644 index 0000000000..17adf742ba --- /dev/null +++ b/packages/tests/tests/filter/subscribe-static-sharding.node.spec.ts @@ -0,0 +1,119 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { LightNode } from "@waku/interfaces"; +import { Protocols, utf8ToBytes } from "@waku/sdk"; +import { createRoutingInfo, formatPubsubTopic } from "@waku/utils"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + runMultipleNodes, + ServiceNode, + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy +} from "../../src/index.js"; + +import { TestClusterId, TestContentTopic } from "./utils.js"; + +const runTests = (strictCheckNodes: boolean): void => { + describe(`Waku Filter: Subscribe: Multiple Service Nodes on Static Shard: Strict Check mode: ${strictCheckNodes}`, function () { + this.timeout(100000); + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + const staticNetworkConfig = { clusterId: 9 }; + const routingInfoShard1 = createRoutingInfo(staticNetworkConfig, { + shardId: 1 + }); + const encoderShard1 = createEncoder({ + contentTopic: TestContentTopic, + routingInfo: routingInfoShard1 + }); + const decoderShard1 = createDecoder(TestContentTopic, routingInfoShard1); + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + routingInfoShard1, + undefined, + strictCheckNodes + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different static shards", async function () { + await waku.filter.subscribe( + decoderShard1, + serviceNodes.messageCollector.callback + ); + + // Set up and start a new nwaku node on different shard + const nwaku2 = new ServiceNode(makeLogFileName(this) + "3"); + + try { + const routingInfoShard2 = createRoutingInfo(staticNetworkConfig, { + shardId: 2 + }); + const contentTopic2 = "/test/4/waku-filter/default"; + const decoderShard2 = createDecoder(contentTopic2, routingInfoShard2); + const encoderShard2 = createEncoder({ + contentTopic: contentTopic2, + routingInfo: routingInfoShard2 + }); + + const shardId = 2; + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + clusterId: TestClusterId, + shard: [shardId] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + + await nwaku2.ensureSubscriptions([ + formatPubsubTopic(TestClusterId, shardId) + ]); + + const messageCollector2 = new MessageCollector(); + + await waku.filter.subscribe(decoderShard2, messageCollector2.callback); + + // Making sure that messages are send and received for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await serviceNodes.messageCollector.waitForMessages(1)) || + !(await messageCollector2.waitForMessages(1)) + ) { + await waku.lightPush.send(encoderShard1, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(encoderShard2, { + payload: utf8ToBytes("M2") + }); + } + + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: encoderShard1.contentTopic, + expectedPubsubTopic: routingInfoShard1.pubsubTopic, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: encoderShard2.contentTopic, + expectedPubsubTopic: routingInfoShard2.pubsubTopic, + expectedMessageText: "M2" + }); + } catch (e) { + await tearDownNodes([nwaku2], []); + } + }); + }); +}; + +[true, false].map((strictCheckNodes) => runTests(strictCheckNodes)); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index fd2bcb5290..197387ee0b 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -7,8 +7,8 @@ import { getPublicKey, symmetric } from "@waku/message-encryption"; -import { Protocols, utf8ToBytes } from "@waku/sdk"; -import { createRoutingInfo, formatPubsubTopic } from "@waku/utils"; +import { utf8ToBytes } from "@waku/sdk"; +import { createRoutingInfo } from "@waku/utils"; import { expect } from "chai"; import { @@ -16,12 +16,8 @@ import { beforeEachCustom, delay, generateTestData, - makeLogFileName, - MessageCollector, runMultipleNodes, - ServiceNode, ServiceNodesFleet, - tearDownNodes, teardownNodesWithRedundancy, TEST_STRING, waitForConnections @@ -300,7 +296,7 @@ const runTests = (strictCheckNodes: boolean): void => { }); }); - // skiped as it fails in CI but not locally https://github.com/waku-org/js-waku/issues/2438 + // skipped as it fails in CI but not locally https://github.com/waku-org/js-waku/issues/2438 it.skip("Subscribe to 30 topics in separate streams (30 streams for Filter is limit) at once and receives messages", async function () { this.timeout(100_000); const topicCount = 30; @@ -596,104 +592,3 @@ const runTests = (strictCheckNodes: boolean): void => { }; [true, false].map((strictCheckNodes) => runTests(strictCheckNodes)); - -const runTestsStatic = (strictCheckNodes: boolean): void => { - describe(`Waku Filter: Subscribe: Multiple Service Nodes on Static Shard: Strict Check mode: ${strictCheckNodes}`, function () { - this.timeout(100000); - let waku: LightNode; - let serviceNodes: ServiceNodesFleet; - const staticNetworkConfig = { clusterId: 9 }; - const routingInfoShard1 = createRoutingInfo(staticNetworkConfig, { - shardId: 1 - }); - const encoderShard1 = createEncoder({ - contentTopic: TestContentTopic, - routingInfo: routingInfoShard1 - }); - const decoderShard1 = createDecoder(TestContentTopic, routingInfoShard1); - - beforeEachCustom(this, async () => { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - routingInfoShard1, - undefined, - strictCheckNodes - ); - }); - - afterEachCustom(this, async () => { - await teardownNodesWithRedundancy(serviceNodes, waku); - }); - - it("Subscribe and receive messages from 2 nwaku nodes each with different static shards", async function () { - await waku.filter.subscribe( - decoderShard1, - serviceNodes.messageCollector.callback - ); - - // Set up and start a new nwaku node on different shard - const nwaku2 = new ServiceNode(makeLogFileName(this) + "3"); - - try { - const routingInfoShard2 = createRoutingInfo(staticNetworkConfig, { - shardId: 2 - }); - const contentTopic2 = "/test/4/waku-filter/default"; - const decoderShard2 = createDecoder(contentTopic2, routingInfoShard2); - const encoderShard2 = createEncoder({ - contentTopic: contentTopic2, - routingInfo: routingInfoShard2 - }); - - const shardId = 2; - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - clusterId: TestClusterId, - shard: [shardId] - }); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await nwaku2.ensureSubscriptions([ - formatPubsubTopic(TestClusterId, shardId) - ]); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe(decoderShard2, messageCollector2.callback); - - // Making sure that messages are send and received for both subscriptions - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await serviceNodes.messageCollector.waitForMessages(1)) || - !(await messageCollector2.waitForMessages(1)) - ) { - await waku.lightPush.send(encoderShard1, { - payload: utf8ToBytes("M1") - }); - await waku.lightPush.send(encoderShard2, { - payload: utf8ToBytes("M2") - }); - } - - serviceNodes.messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: encoderShard1.contentTopic, - expectedPubsubTopic: routingInfoShard1.pubsubTopic, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: encoderShard2.contentTopic, - expectedPubsubTopic: routingInfoShard2.pubsubTopic, - expectedMessageText: "M2" - }); - } catch (e) { - await tearDownNodes([nwaku2], []); - } - }); - }); -}; - -[true, false].map((strictCheckNodes) => runTestsStatic(strictCheckNodes));