From 3793e6f5c0072c6ba963eb070af6fd8b80dd14d3 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Fri, 11 Apr 2025 01:34:11 +0200 Subject: [PATCH] chore: update interop test suit for latest nwaku (0.35.1) (#2345) * update Filter test suit, make service nodes connected to each other, remove single node Filter test suit, use 0.35 nwaku image * update light push tests * improve auto shard tests * update static sharding test * skip blocked tests * fix test * remove usage of pusubtopic with nwaku * remove comment --- .github/workflows/ci.yml | 4 +- packages/tests/src/lib/dockerode.ts | 24 +- packages/tests/src/lib/index.ts | 46 +- packages/tests/src/lib/message_collector.ts | 4 +- packages/tests/src/lib/runNodes.ts | 2 +- packages/tests/src/lib/service_node.ts | 37 +- packages/tests/src/run-tests.js | 2 +- packages/tests/src/sync-rln-tree.js | 2 +- packages/tests/src/types.ts | 1 - packages/tests/src/utils/nodes.ts | 1 - packages/tests/tests/enr.node.spec.ts | 10 +- packages/tests/tests/ephemeral.node.spec.ts | 13 +- packages/tests/tests/filter/push.node.spec.ts | 23 + .../single_node/multiple_pubsub.node.spec.ts | 456 ----------------- .../filter/single_node/ping.node.spec.ts | 129 ----- .../filter/single_node/push.node.spec.ts | 255 ---------- .../filter/single_node/subscribe.node.spec.ts | 470 ------------------ .../single_node/unsubscribe.node.spec.ts | 209 -------- .../tests/tests/filter/single_node/utils.ts | 22 - .../tests/tests/filter/subscribe.node.spec.ts | 103 +++- .../light-push/multiple_pubsub.node.spec.ts | 141 ++++++ .../light-push/single_node/index.node.spec.ts | 274 ---------- .../single_node/multiple_pubsub.node.spec.ts | 460 ----------------- packages/tests/tests/light-push/utils.ts | 16 +- packages/tests/tests/metadata.spec.ts | 13 +- .../tests/tests/peer-exchange/index.spec.ts | 9 +- .../tests/tests/peer-exchange/query.spec.ts | 17 +- .../tests/sharding/auto_sharding.spec.ts | 456 ++++++++--------- .../tests/sharding/peer_management.spec.ts | 26 +- .../tests/sharding/static_sharding.spec.ts | 339 ++++++------- .../tests/tests/store/multiple_pubsub.spec.ts | 14 +- packages/tests/tests/utils.spec.ts | 2 +- .../tests/wait_for_remote_peer.node.spec.ts | 3 +- 33 files changed, 752 insertions(+), 2831 deletions(-) delete mode 100644 packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts delete mode 100644 packages/tests/tests/filter/single_node/ping.node.spec.ts delete mode 100644 packages/tests/tests/filter/single_node/push.node.spec.ts delete mode 100644 packages/tests/tests/filter/single_node/subscribe.node.spec.ts delete mode 100644 packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts delete mode 100644 packages/tests/tests/filter/single_node/utils.ts create mode 100644 packages/tests/tests/light-push/multiple_pubsub.node.spec.ts delete mode 100644 packages/tests/tests/light-push/single_node/index.node.spec.ts delete mode 100644 packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6e5260806f..8a689cae6b 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -122,14 +122,14 @@ jobs: uses: ./.github/workflows/test-node.yml secrets: inherit with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.35.1' }} test_type: node allure_reports: true node_optional: uses: ./.github/workflows/test-node.yml with: - nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.31.0' }} + nim_wakunode_image: ${{ inputs.nim_wakunode_image || 'wakuorg/nwaku:v0.35.1' }} test_type: node-optional node_with_nwaku_master: diff --git a/packages/tests/src/lib/dockerode.ts b/packages/tests/src/lib/dockerode.ts index 823a02859b..355e4b84b3 100644 --- a/packages/tests/src/lib/dockerode.ts +++ b/packages/tests/src/lib/dockerode.ts @@ -18,7 +18,7 @@ export default class Dockerode { public containerId?: string; private static network: Docker.Network; - private containerIp: string; + public readonly containerIp: string; private constructor(imageName: string, containerIp: string) { this.docker = new Docker(); @@ -107,7 +107,10 @@ export default class Dockerode { const container = await this.docker.createContainer({ Image: this.IMAGE_NAME, HostConfig: { + NetworkMode: NETWORK_NAME, AutoRemove: true, + Dns: ["8.8.8.8"], + Links: [], PortBindings: { [`${restPort}/tcp`]: [{ HostPort: restPort.toString() }], [`${tcpPort}/tcp`]: [{ HostPort: tcpPort.toString() }], @@ -135,18 +138,19 @@ export default class Dockerode { [`${discv5UdpPort}/udp`]: {} }) }, - Cmd: argsArrayWithIP - }); - await container.start(); - - await Dockerode.network.connect({ - Container: container.id, - EndpointConfig: { - IPAMConfig: { - IPv4Address: this.containerIp + Cmd: argsArrayWithIP, + NetworkingConfig: { + EndpointsConfig: { + [NETWORK_NAME]: { + IPAMConfig: { + IPv4Address: this.containerIp + } + } } } }); + await container.start(); + const logStream = fs.createWriteStream(logPath); container.logs( diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 6eabefc3f4..080f9a8ef2 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -1,6 +1,6 @@ import { DecodedMessage } from "@waku/core"; -import { NetworkConfig } from "@waku/interfaces"; -import { derivePubsubTopicsFromNetworkConfig, Logger } from "@waku/utils"; +import { AutoSharding, NetworkConfig, StaticSharding } from "@waku/interfaces"; +import { contentTopicToShardIndex, Logger } from "@waku/utils"; import { expect } from "chai"; import { DefaultTestPubsubTopic } from "../constants.js"; @@ -29,24 +29,27 @@ export class ServiceNodesFleet { _args?: Args, withoutFilter = false ): Promise { - const serviceNodePromises = Array.from( - { length: nodesToCreate }, - async () => { - const node = new ServiceNode( - makeLogFileName(mochaContext) + - Math.random().toString(36).substring(7) - ); + const nodes: ServiceNode[] = []; - const args = getArgs(networkConfig, _args); - await node.start(args, { - retries: 3 - }); + for (let i = 0; i < nodesToCreate; i++) { + const node = new ServiceNode( + makeLogFileName(mochaContext) + Math.random().toString(36).substring(7) + ); - return node; + const args = getArgs(networkConfig, _args); + + if (nodes[0]) { + const addr = await nodes[0].getExternalMultiaddr(); + args.staticnode = addr ?? args.staticnode; } - ); - const nodes = await Promise.all(serviceNodePromises); + await node.start(args, { + retries: 3 + }); + + nodes.push(node); + } + return new ServiceNodesFleet(nodes, withoutFilter, strictChecking); } @@ -251,16 +254,23 @@ class MultipleNodesMessageCollector { } function getArgs(networkConfig: NetworkConfig, args?: Args): Args { - const pubsubTopics = derivePubsubTopicsFromNetworkConfig(networkConfig); const defaultArgs = { lightpush: true, filter: true, discv5Discovery: true, peerExchange: true, relay: true, - pubsubTopic: pubsubTopics, clusterId: networkConfig.clusterId } as Args; + if ((networkConfig as StaticSharding).shards) { + defaultArgs.shard = (networkConfig as StaticSharding).shards; + } else if ((networkConfig as AutoSharding).contentTopics) { + defaultArgs.contentTopic = (networkConfig as AutoSharding).contentTopics; + defaultArgs.shard = (networkConfig as AutoSharding).contentTopics.map( + (topic) => contentTopicToShardIndex(topic) + ); + } + return { ...defaultArgs, ...args }; } diff --git a/packages/tests/src/lib/message_collector.ts b/packages/tests/src/lib/message_collector.ts index 3343dfc711..0bbc8a4891 100644 --- a/packages/tests/src/lib/message_collector.ts +++ b/packages/tests/src/lib/message_collector.ts @@ -269,8 +269,6 @@ export class MessageCollector { } private getPubsubTopicToUse(pubsubTopic: string | undefined): string { - return ( - pubsubTopic || this.nwaku?.pubsubTopics?.[0] || DefaultTestPubsubTopic - ); + return pubsubTopic || DefaultTestPubsubTopic; } } diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index f387e90493..bba48f343a 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -37,7 +37,7 @@ export async function runNodes( lightpush: true, relay: true, store: true, - pubsubTopic: pubsubTopics, + shard: shardInfo.shards, clusterId: shardInfo.clusterId }, { retries: 3 } diff --git a/packages/tests/src/lib/service_node.ts b/packages/tests/src/lib/service_node.ts index 71e3330c21..60e5f72dc4 100644 --- a/packages/tests/src/lib/service_node.ts +++ b/packages/tests/src/lib/service_node.ts @@ -1,7 +1,7 @@ import type { PeerId } from "@libp2p/interface"; import { peerIdFromString } from "@libp2p/peer-id"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; -import { isDefined } from "@waku/utils"; +import { isDefined, shardInfoToPubsubTopics } from "@waku/utils"; import { Logger } from "@waku/utils"; import pRetry from "p-retry"; import portfinder from "portfinder"; @@ -27,7 +27,7 @@ const WAKU_SERVICE_NODE_PARAMS = const NODE_READY_LOG_LINE = "Node setup complete"; export const DOCKER_IMAGE_NAME = - process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; + process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1"; const LOG_DIR = "./log"; @@ -235,9 +235,15 @@ export class ServiceNode { ); } - public async messages(pubsubTopic?: string): Promise { + public async messages(_pubsubTopic?: string): Promise { + const pubsubTopic = + _pubsubTopic ?? + shardInfoToPubsubTopics({ + clusterId: this.args?.clusterId, + shards: this.args?.shard + })[0]; return this.restCall( - `/relay/v1/messages/${encodeURIComponent(pubsubTopic || this?.args?.pubsubTopic?.[0] || DefaultTestPubsubTopic)}`, + `/relay/v1/messages/${encodeURIComponent(pubsubTopic)}`, "GET", null, async (response) => { @@ -262,7 +268,7 @@ export class ServiceNode { public async sendMessage( message: MessageRpcQuery, - pubsubTopic?: string + _pubsubTopic?: string ): Promise { this.checkProcess(); @@ -270,8 +276,14 @@ export class ServiceNode { message.timestamp = BigInt(new Date().valueOf()) * OneMillion; } + const pubsubTopic = + _pubsubTopic ?? + shardInfoToPubsubTopics({ + clusterId: this.args?.clusterId, + shards: this.args?.shard + })[0]; return this.restCall( - `/relay/v1/messages/${encodeURIComponent(pubsubTopic || this.args?.pubsubTopic?.[0] || DefaultTestPubsubTopic)}`, + `/relay/v1/messages/${encodeURIComponent(pubsubTopic || DefaultTestPubsubTopic)}`, "POST", message, async (response) => response.status === 200 @@ -348,10 +360,6 @@ export class ServiceNode { return `http://127.0.0.1:${this.restPort}`; } - public get pubsubTopics(): string[] { - return this.args?.pubsubTopic ?? []; - } - public async restCall( endpoint: string, method: "GET" | "POST", @@ -377,6 +385,15 @@ export class ServiceNode { } } + public async getExternalMultiaddr(): Promise { + if (!this.docker?.container) { + return undefined; + } + const containerIp = this.docker.containerIp; + const peerId = await this.getPeerId(); + return `/ip4/${containerIp}/tcp/${this.websocketPort}/ws/p2p/${peerId}`; + } + private checkProcess(): void { if (!this.docker?.container) { throw `Container hasn't started`; diff --git a/packages/tests/src/run-tests.js b/packages/tests/src/run-tests.js index b468bdc00e..7de7da2253 100644 --- a/packages/tests/src/run-tests.js +++ b/packages/tests/src/run-tests.js @@ -3,7 +3,7 @@ import { promisify } from "util"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1"; async function main() { try { diff --git a/packages/tests/src/sync-rln-tree.js b/packages/tests/src/sync-rln-tree.js index d69c3d1899..6939c1141e 100644 --- a/packages/tests/src/sync-rln-tree.js +++ b/packages/tests/src/sync-rln-tree.js @@ -7,7 +7,7 @@ import { ServiceNode } from "./lib/index.js"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.31.0"; +const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.35.1"; const containerName = "rln_tree"; async function syncRlnTree() { diff --git a/packages/tests/src/types.ts b/packages/tests/src/types.ts index d82eeaf752..872cadbe5a 100644 --- a/packages/tests/src/types.ts +++ b/packages/tests/src/types.ts @@ -14,7 +14,6 @@ export interface Args { peerExchange?: boolean; discv5Discovery?: boolean; storeMessageDbUrl?: string; - pubsubTopic?: Array; contentTopic?: Array; websocketSupport?: boolean; tcpPort?: number; diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 6053cfdf63..298378d961 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -45,7 +45,6 @@ export async function runMultipleNodes( }; const waku = await createLightNode(wakuOptions); - await waku.start(); if (!waku) { throw new Error("Failed to initialize waku"); diff --git a/packages/tests/tests/enr.node.spec.ts b/packages/tests/tests/enr.node.spec.ts index c4a467d85f..83bd80fd01 100644 --- a/packages/tests/tests/enr.node.spec.ts +++ b/packages/tests/tests/enr.node.spec.ts @@ -6,7 +6,6 @@ import { expect } from "chai"; import { afterEachCustom, - DefaultTestPubsubTopic, makeLogFileName, NOISE_KEY_1, ServiceNode, @@ -30,7 +29,8 @@ describe("ENR Interop: ServiceNode", function () { store: false, filter: false, lightpush: false, - pubsubTopic: [DefaultTestPubsubTopic] + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); @@ -64,7 +64,8 @@ describe("ENR Interop: ServiceNode", function () { store: true, filter: false, lightpush: false, - pubsubTopic: [DefaultTestPubsubTopic] + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); @@ -98,7 +99,8 @@ describe("ENR Interop: ServiceNode", function () { store: true, filter: true, lightpush: true, - pubsubTopic: [DefaultTestPubsubTopic] + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards }); const multiAddrWithId = await nwaku.getMultiaddrWithId(); diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index 2abed950d2..793873f7e5 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -15,7 +15,11 @@ import { createEncoder as createSymEncoder } from "@waku/message-encryption/symmetric"; import { createLightNode } from "@waku/sdk"; -import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; +import { + contentTopicToPubsubTopic, + contentTopicToShardIndex, + Logger +} from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -84,14 +88,15 @@ describe("Waku Message Ephemeral field", function () { beforeEachCustom(this, async () => { nwaku = new ServiceNode(makeLogFileName(this.ctx)); + const contentTopics = [TestContentTopic, AsymContentTopic, SymContentTopic]; await nwaku.start({ filter: true, lightpush: true, store: true, relay: true, - pubsubTopic: [PubsubTopic], - contentTopic: [TestContentTopic, AsymContentTopic, SymContentTopic], - clusterId: ClusterId + contentTopic: contentTopics, + clusterId: ClusterId, + shard: contentTopics.map((t) => contentTopicToShardIndex(t)) }); await nwaku.ensureSubscriptionsAutosharding([ TestContentTopic, diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index c85b67ac11..d333cd47c4 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -146,6 +146,29 @@ const runTests = (strictCheckNodes: boolean): void => { ).to.eq(false); }); + it("Check message with no pubsub topic is not received", async function () { + await waku.filter.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + await delay(400); + + await serviceNodes.nodes[0].restCall( + `/relay/v1/messages/`, + "POST", + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + }, + async (res) => res.status === 200 + ); + + expect(await serviceNodes.messageCollector.waitForMessages(1)).to.eq( + false + ); + }); + it("Check message with no content topic is not received", async function () { await waku.filter.subscribe( [TestDecoder], diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts deleted file mode 100644 index ec7ba216f0..0000000000 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ /dev/null @@ -1,456 +0,0 @@ -import { createDecoder, createEncoder } from "@waku/core"; -import type { - ContentTopicInfo, - LightNode, - ShardInfo, - SingleShardInfo -} from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { - contentTopicToPubsubTopic, - contentTopicToShardIndex, - singleShardInfoToPubsubTopic -} from "@waku/utils"; -import { utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../../src/index.js"; - -import { runNodes } from "./utils.js"; - -describe("Waku Filter V2: Multiple PubsubTopics", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(30000); - let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - - const customPubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 1 - }); - const customPubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 2 - }); - const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; - const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; - const singleShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 }; - const customContentTopic1 = "/test/2/waku-filter"; - const customContentTopic2 = "/test/3/waku-filter"; - const customEncoder1 = createEncoder({ - pubsubTopicShardInfo: singleShardInfo1, - contentTopic: customContentTopic1 - }); - const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1); - const customEncoder2 = createEncoder({ - pubsubTopicShardInfo: singleShardInfo2, - contentTopic: customContentTopic2 - }); - const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2); - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); - }); - - it("Subscribe and receive messages on custom pubsubtopic", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - expect(await messageCollector2.waitForMessages(1)).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2, - expectedMessageText: "M2" - }); - }); - - it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - // Set up and start a new nwaku node with customPubsubTopic1 - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [customPubsubTopic2], - clusterId: 3 - }); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await nwaku2.ensureSubscriptions([customPubsubTopic2]); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - // Making sure that messages are send and reveiced for both subscriptions - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - })) || - !(await messageCollector2.waitForMessages(1, { - pubsubTopic: customPubsubTopic2 - })) - ) { - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - } - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2, - expectedMessageText: "M2" - }); - }); -}); - -describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(30000); - const clusterId = 3; - let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - - const customContentTopic1 = "/waku/2/content/utf8"; - const customContentTopic2 = "/myapp/1/latest/proto"; - const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( - customContentTopic1, - clusterId - ); - const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( - customContentTopic2, - clusterId - ); - const contentTopicInfo: ContentTopicInfo = { - clusterId: clusterId, - contentTopics: [customContentTopic1, customContentTopic2] - }; - const customEncoder1 = createEncoder({ - contentTopic: customContentTopic1, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(customContentTopic1) - } - }); - const customDecoder1 = createDecoder(customContentTopic1, { - clusterId: clusterId, - shard: contentTopicToShardIndex(customContentTopic1) - }); - const customEncoder2 = createEncoder({ - contentTopic: customContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(customContentTopic2) - } - }); - const customDecoder2 = createDecoder(customContentTopic2, { - clusterId: clusterId, - shard: contentTopicToShardIndex(customContentTopic2) - }); - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, contentTopicInfo); - messageCollector = new MessageCollector(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); - }); - - it("Subscribe and receive messages on autosharded pubsubtopic", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1, - expectedMessageText: "M1" - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - const messageCollector2 = new MessageCollector(); - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - }) - ).to.eq(true); - expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 - }) - ).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2, - expectedMessageText: "M2" - }); - }); - - it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - // Set up and start a new nwaku node with customPubsubTopic1 - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [autoshardingPubsubTopic2], - clusterId: clusterId, - contentTopic: [customContentTopic2] - }); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - // Making sure that messages are send and reveiced for both subscriptions - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - })) || - !(await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 - })) - ) { - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - } - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2, - expectedMessageText: "M2" - }); - }); - - it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { - // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` - try { - await waku.filter.subscribe([customDecoder2], messageCollector.callback); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic not configured" - ); - } - }); -}); - -describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(30000); - let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - - const customPubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 1 - }); - const customPubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 2 - }); - const shardInfo = { - clusterId: 3, - shards: [1, 2] - }; - const customContentTopic1 = "/test/2/waku-filter"; - const customContentTopic2 = "/test/3/waku-filter"; - const customEncoder1 = createEncoder({ - pubsubTopic: customPubsubTopic1, - contentTopic: customContentTopic1 - }); - const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); - const customEncoder2 = createEncoder({ - pubsubTopic: customPubsubTopic2, - contentTopic: customContentTopic2 - }); - const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); - }); - - it("Subscribe and receive messages on custom pubsubtopic", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - expect(await messageCollector2.waitForMessages(1)).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2, - expectedMessageText: "M2" - }); - }); - - it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { - await waku.filter.subscribe([customDecoder1], messageCollector.callback); - - // Set up and start a new nwaku node with customPubsubTopic1 - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [customPubsubTopic2], - clusterId: 3 - }); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await nwaku2.ensureSubscriptions([customPubsubTopic2]); - - const messageCollector2 = new MessageCollector(); - - await waku.filter.subscribe([customDecoder2], messageCollector2.callback); - - // Making sure that messages are send and reveiced for both subscriptions - // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 - while ( - !(await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - })) || - !(await messageCollector2.waitForMessages(1, { - pubsubTopic: customPubsubTopic2 - })) - ) { - await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); - } - - messageCollector.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1, - expectedMessageText: "M1" - }); - - messageCollector2.verifyReceivedMessage(0, { - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2, - expectedMessageText: "M2" - }); - }); - - it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { - // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` - try { - await waku.filter.subscribe([customDecoder2], messageCollector.callback); - } catch (error) { - expect((error as Error).message).to.include( - "Pubsub topic not configured" - ); - } - }); -}); diff --git a/packages/tests/tests/filter/single_node/ping.node.spec.ts b/packages/tests/tests/filter/single_node/ping.node.spec.ts deleted file mode 100644 index ea6fafbc00..0000000000 --- a/packages/tests/tests/filter/single_node/ping.node.spec.ts +++ /dev/null @@ -1,129 +0,0 @@ -import { ISubscription, LightNode } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../../src/index.js"; -import { - TestContentTopic, - TestDecoder, - TestEncoder, - TestShardInfo, - validatePingError -} from "../utils.js"; - -import { runNodes } from "./utils.js"; - -describe("Waku Filter V2: Ping", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(10000); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - messageCollector = new MessageCollector(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("Ping on subscribed peer", async function () { - const { subscription, error } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - - // If ping is successfull(node has active subscription) we receive a success status code. - await subscription.ping(); - - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - - // Confirm new messages are received after a ping. - expect(await messageCollector.waitForMessages(2)).to.eq(true); - }); - - it("Ping on peer without subscriptions", async function () { - const { subscription, error } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await subscription.unsubscribe([TestContentTopic]); - await validatePingError(subscription); - }); - - it("Ping on unsubscribed peer", async function () { - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - - await subscription.ping(); - await subscription.unsubscribe([TestContentTopic]); - - // Ping imediately after unsubscribe - await validatePingError(subscription); - }); - - it("Reopen subscription with peer with lost subscription", async function () { - let subscription: ISubscription; - const openSubscription = async (): Promise => { - const result = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (result.error) { - throw result.error; - } - subscription = result.subscription; - }; - - const unsubscribe = async (): Promise => { - await subscription.unsubscribe([TestContentTopic]); - }; - - const pingAndReinitiateSubscription = async (): Promise => { - try { - await subscription.ping(); - } catch (error) { - if ( - error instanceof Error && - error.message.includes("peer has no subscriptions") - ) { - await openSubscription(); - } else { - throw error; - } - } - }; - - // open subscription & ping -> should pass - await openSubscription(); - await pingAndReinitiateSubscription(); - - // unsubscribe & ping -> should fail and reinitiate subscription - await unsubscribe(); - await pingAndReinitiateSubscription(); - - // ping -> should pass as subscription is reinitiated - await pingAndReinitiateSubscription(); - }); -}); diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts deleted file mode 100644 index a0c7c115d1..0000000000 --- a/packages/tests/tests/filter/single_node/push.node.spec.ts +++ /dev/null @@ -1,255 +0,0 @@ -import { LightNode, Protocols } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - delay, - MessageCollector, - ServiceNode, - tearDownNodes, - TEST_STRING, - TEST_TIMESTAMPS -} from "../../../src/index.js"; -import { runNodes } from "../../light-push/utils.js"; -import { - messageText, - TestContentTopic, - TestDecoder, - TestEncoder, - TestPubsubTopic, - TestShardInfo -} from "../utils.js"; - -describe("Waku Filter V2: FilterPush", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(10000); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - messageCollector = new MessageCollector(nwaku); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - TEST_STRING.forEach((testItem) => { - it(`Check received message containing ${testItem.description}`, async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(testItem.value) - }); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: testItem.value, - expectedContentTopic: TestContentTopic - }); - }); - }); - - TEST_TIMESTAMPS.forEach((testItem) => { - it(`Check received message with timestamp: ${testItem} `, async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`, - "POST", - { - contentTopic: TestContentTopic, - payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), - timestamp: testItem, - version: 0 - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - checkTimestamp: false, - expectedContentTopic: TestContentTopic - }); - - // Check if the timestamp matches - const timestamp = messageCollector.getMessage(0).timestamp; - if (testItem == undefined) { - expect(timestamp).to.eq(undefined); - } - if (timestamp !== undefined && timestamp instanceof Date) { - expect(testItem?.toString()).to.contain(timestamp.getTime().toString()); - } - }); - }); - - it("Check message with invalid timestamp is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`, - "POST", - { - contentTopic: TestContentTopic, - payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), - timestamp: "2023-09-06T12:05:38.609Z" - }, - async (res) => res.status === 200 - ); - - // Verify that no message was received - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - it("Check message on other pubsub topic is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent("/othertopic")}`, - "POST", - { - contentTopic: TestContentTopic, - payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), - timestamp: BigInt(Date.now()) * BigInt(1000000) - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - it("Check message with no pubsub topic is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/`, - "POST", - { - contentTopic: TestContentTopic, - payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), - timestamp: BigInt(Date.now()) * BigInt(1000000) - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - it("Check message with no content topic is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`, - "POST", - { - payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), - timestamp: BigInt(Date.now()) * BigInt(1000000) - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - it("Check message with no payload is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`, - "POST", - { - contentTopic: TestContentTopic, - timestamp: BigInt(Date.now()) * BigInt(1000000) - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - it("Check message with non string payload is not received", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await delay(400); - - await nwaku.restCall( - `/relay/v1/messages/${encodeURIComponent(TestPubsubTopic)}`, - "POST", - { - contentTopic: TestContentTopic, - payload: 12345, - timestamp: BigInt(Date.now()) * BigInt(1000000) - }, - async (res) => res.status === 200 - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(false); - }); - - // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done - it.skip("Check message received after jswaku node is restarted", async function () { - // Subscribe and send message - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - - // Restart js-waku node - await waku.stop(); - expect(waku.isStarted()).to.eq(false); - await waku.start(); - expect(waku.isStarted()).to.eq(true); - - // Redo the connection and create a new subscription - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - - // Confirm both messages were received. - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: TestContentTopic - }); - messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2", - expectedContentTopic: TestContentTopic - }); - }); - - // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done - it.skip("Check message received after nwaku node is restarted", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - - // Restart nwaku node - await tearDownNodes(nwaku, []); - await nwaku.start(); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - - // Confirm both messages were received. - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: TestContentTopic - }); - messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2", - expectedContentTopic: TestContentTopic - }); - }); -}); diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts deleted file mode 100644 index 4eb49a7129..0000000000 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ /dev/null @@ -1,470 +0,0 @@ -import { createDecoder, createEncoder } from "@waku/core"; -import { LightNode, Protocols } from "@waku/interfaces"; -import { - ecies, - generatePrivateKey, - generateSymmetricKey, - getPublicKey, - symmetric -} from "@waku/message-encryption"; -import { utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; -import type { Context } from "mocha"; - -import { - afterEachCustom, - beforeEachCustom, - delay, - generateTestData, - MessageCollector, - ServiceNode, - tearDownNodes, - TEST_STRING -} from "../../../src/index.js"; -import { - messagePayload, - messageText, - TestContentTopic, - TestDecoder, - TestEncoder, - TestPubsubTopic, - TestShardInfo -} from "../utils.js"; - -import { runNodes } from "./utils.js"; - -describe("Waku Filter V2: Subscribe: Single Service Node", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(10000); - let waku: LightNode; - let waku2: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - let ctx: Context; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - messageCollector = new MessageCollector(); - await nwaku.ensureSubscriptions([TestPubsubTopic]); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], [waku, waku2]); - }); - - it("Subscribe and receive messages via lightPush", async function () { - const { error } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - - await waku.lightPush.send(TestEncoder, messagePayload); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(1); - }); - - it("Subscribe and receive ecies encrypted messages via lightPush", async function () { - const privateKey = generatePrivateKey(); - const publicKey = getPublicKey(privateKey); - const encoder = ecies.createEncoder({ - contentTopic: TestContentTopic, - publicKey, - pubsubTopic: TestPubsubTopic - }); - const decoder = ecies.createDecoder( - TestContentTopic, - privateKey, - TestPubsubTopic - ); - - const { error } = await waku.filter.subscribe( - [decoder], - messageCollector.callback - ); - if (error) { - throw error; - } - - await waku.lightPush.send(encoder, messagePayload); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedVersion: 1, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(1); - }); - - it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { - const symKey = generateSymmetricKey(); - const encoder = symmetric.createEncoder({ - contentTopic: TestContentTopic, - symKey, - pubsubTopic: TestPubsubTopic - }); - const decoder = symmetric.createDecoder( - TestContentTopic, - symKey, - TestPubsubTopic - ); - - const { error } = await waku.filter.subscribe( - [decoder], - messageCollector.callback - ); - if (error) { - throw error; - } - - await waku.lightPush.send(encoder, messagePayload); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedVersion: 1, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(1); - }); - - it("Subscribe and receive messages via waku relay post", async function () { - const { error } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - - await delay(400); - - // Send a test message using the relay post method. - await nwaku.sendMessage( - ServiceNode.toMessageRpcQuery({ - contentTopic: TestContentTopic, - payload: utf8ToBytes(messageText) - }) - ); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(1); - }); - - it("Subscribe and receive 2 messages on the same topic", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - - await waku.lightPush.send(TestEncoder, messagePayload); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - - // Send another message on the same topic. - const newMessageText = "Filtering still works!"; - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(newMessageText) - }); - - // Verify that the second message was successfully received. - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(1, { - expectedMessageText: newMessageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(2); - }); - - it("Subscribe and receive messages on 2 different content topics", async function () { - // Subscribe to the first content topic and send a message. - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await waku.lightPush.send(TestEncoder, messagePayload); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - - // Modify subscription to include a new content topic and send a message. - const newMessageText = "Filtering still works!"; - const newMessagePayload = { payload: utf8ToBytes(newMessageText) }; - const newContentTopic = "/test/2/waku-filter/default"; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(newEncoder, { - payload: utf8ToBytes(newMessageText) - }); - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(1, { - expectedContentTopic: newContentTopic, - expectedMessageText: newMessageText, - expectedPubsubTopic: TestPubsubTopic - }); - - // Send another message on the initial content topic to verify it still works. - await waku.lightPush.send(TestEncoder, newMessagePayload); - expect(await messageCollector.waitForMessages(3)).to.eq(true); - messageCollector.verifyReceivedMessage(2, { - expectedMessageText: newMessageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - expect((await nwaku.messages()).length).to.eq(3); - }); - - it("Subscribe and receives messages on 20 topics", async function () { - const topicCount = 20; - const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - - // Subscribe to all 20 topics. - for (let i = 0; i < topicCount; i++) { - await waku.filter.subscribe([td.decoders[i]], messageCollector.callback); - } - - // Send a unique message on each topic. - for (let i = 0; i < topicCount; i++) { - await waku.lightPush.send(td.encoders[i], { - payload: utf8ToBytes(`Message for Topic ${i + 1}`) - }); - } - - // Verify that each message was received on the corresponding topic. - expect(await messageCollector.waitForMessages(20)).to.eq(true); - td.contentTopics.forEach((topic, index) => { - messageCollector.verifyReceivedMessage(index, { - expectedContentTopic: topic, - expectedMessageText: `Message for Topic ${index + 1}`, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Subscribe to 100 topics (new limit) at once and receives messages", async function () { - this.timeout(100_000); - const topicCount = 100; - const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - - await waku.filter.subscribe(td.decoders, messageCollector.callback); - - // Send a unique message on each topic. - for (let i = 0; i < topicCount; i++) { - await waku.lightPush.send(td.encoders[i], { - payload: utf8ToBytes(`Message for Topic ${i + 1}`) - }); - } - - // Verify that each message was received on the corresponding topic. - expect(await messageCollector.waitForMessages(topicCount)).to.eq(true); - td.contentTopics.forEach((topic, index) => { - messageCollector.verifyReceivedMessage(index, { - expectedContentTopic: topic, - expectedMessageText: `Message for Topic ${index + 1}`, - expectedPubsubTopic: TestPubsubTopic, - checkTimestamp: false - }); - }); - }); - - it("Error when try to subscribe to more than 101 topics (new limit)", async function () { - const topicCount = 101; - const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - - try { - const { error, results } = await waku.filter.subscribe( - td.decoders, - messageCollector.callback - ); - if (error) { - throw error; - } - const { failures, successes } = results; - if (failures.length === 0 || successes.length > 0) { - throw new Error( - `Subscribe to ${topicCount} topics was successful but was expected to fail with a specific error.` - ); - } - } catch (err) { - if ( - err instanceof Error && - err.message.includes( - `exceeds maximum content topics: ${topicCount - 1}` - ) - ) { - return; - } else { - throw err; - } - } - }); - - it("Overlapping topic subscription", async function () { - // Define two sets of test data with overlapping topics. - const topicCount1 = 2; - const td1 = generateTestData(topicCount1, { pubsubTopic: TestPubsubTopic }); - const topicCount2 = 4; - const td2 = generateTestData(topicCount2, { pubsubTopic: TestPubsubTopic }); - - // Subscribe to the first set of topics. - await waku.filter.subscribe(td1.decoders, messageCollector.callback); - - // Subscribe to the second set of topics which has overlapping topics with the first set. - await waku.filter.subscribe(td2.decoders, messageCollector.callback); - - // Send messages to the first set of topics. - for (let i = 0; i < topicCount1; i++) { - const messageText = `Topic Set 1: Message Number: ${i + 1}`; - await waku.lightPush.send(td1.encoders[i], { - payload: utf8ToBytes(messageText) - }); - } - - // Send messages to the second set of topics. - for (let i = 0; i < topicCount2; i++) { - const messageText = `Topic Set 2: Message Number: ${i + 1}`; - - await waku.lightPush.send(td2.encoders[i], { - payload: utf8ToBytes(messageText) - }); - } - - // Check if all messages were received. - // Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set). - expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq( - true - ); - }); - - it("Refresh subscription", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - - // Resubscribe (refresh) to the same topic and send another message. - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - - // Confirm both messages were received. - expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq( - true - ); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2", - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - - TEST_STRING.forEach((testItem) => { - it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () { - const newContentTopic = testItem.value; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - - await waku.filter.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(newEncoder, messagePayload); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: newContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Add multiple subscription objects on single nwaku node", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - - const newContentTopic = "/test/2/waku-filter/default"; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await waku.filter.subscribe([newDecoder], messageCollector.callback); - - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); - - // Check if both messages were received - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - messageCollector.verifyReceivedMessage(1, { - expectedContentTopic: newContentTopic, - expectedMessageText: "M2", - expectedPubsubTopic: TestPubsubTopic - }); - }); - - it("Subscribe and receive messages from multiple nwaku nodes", async function () { - await waku.filter.subscribe([TestDecoder], messageCollector.callback); - - // Set up and start a new nwaku node - [nwaku2, waku2] = await runNodes(ctx, TestShardInfo); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); - - await nwaku2.ensureSubscriptions([TestPubsubTopic]); - // Send a message using the new subscription - const newContentTopic = "/test/2/waku-filter/default"; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await waku.filter.subscribe([newDecoder], messageCollector.callback); - - // Making sure that messages are send and reveiced for both subscriptions - while (!(await messageCollector.waitForMessages(2))) { - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); - } - - // Check if both messages were received - expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(messageCollector.hasMessage(newContentTopic, "M2")).to.eq(true); - }); -}); diff --git a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts b/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts deleted file mode 100644 index 48241441c6..0000000000 --- a/packages/tests/tests/filter/single_node/unsubscribe.node.spec.ts +++ /dev/null @@ -1,209 +0,0 @@ -import { createDecoder, createEncoder } from "@waku/core"; -import { LightNode } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - generateTestData, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../../src/index.js"; -import { runNodes } from "../../light-push/utils.js"; -import { - messagePayload, - messageText, - TestContentTopic, - TestDecoder, - TestEncoder, - TestPubsubTopic, - TestShardInfo -} from "../utils.js"; - -describe("Waku Filter V2: Unsubscribe", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(10000); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - - messageCollector = new MessageCollector(); - await nwaku.ensureSubscriptions([TestPubsubTopic]); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { - const { subscription, error } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await waku.lightPush.send(TestEncoder, messagePayload); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - - // Unsubscribe from the topic and send again - await subscription.unsubscribe([TestContentTopic]); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(2)).to.eq(false); - - // Check that from 2 messages send only the 1st was received - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - expect(messageCollector.count).to.eq(1); - expect((await nwaku.messages()).length).to.eq(2); - }); - - it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { - // Subscribe to 2 topics and send messages - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - const newContentTopic = "/test/2/waku-filter"; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); - expect(await messageCollector.waitForMessages(2)).to.eq(true); - - // Unsubscribe from the first topic and send again - await subscription.unsubscribe([TestContentTopic]); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); - expect(await messageCollector.waitForMessages(3)).to.eq(true); - - // Check that from 4 messages send 3 were received - expect(messageCollector.count).to.eq(3); - expect((await nwaku.messages()).length).to.eq(4); - }); - - it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { - // Subscribe to 2 topics and send messages - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - const newContentTopic = "/test/2/waku-filter/default"; - const newEncoder = createEncoder({ - contentTopic: newContentTopic, - pubsubTopic: TestPubsubTopic - }); - const newDecoder = createDecoder(newContentTopic, TestPubsubTopic); - await subscription.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); - expect(await messageCollector.waitForMessages(2)).to.eq(true); - - // Unsubscribe from both and send again - await subscription.unsubscribe([TestContentTopic, newContentTopic]); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); - expect(await messageCollector.waitForMessages(3)).to.eq(false); - - // Check that from 4 messages send 2 were received - expect(messageCollector.count).to.eq(2); - expect((await nwaku.messages()).length).to.eq(4); - }); - - it("Unsubscribe topics the node is not subscribed to", async function () { - // Subscribe to 1 topic and send message - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - - expect(messageCollector.count).to.eq(1); - - // Unsubscribe from topics that the node is not not subscribed to and send again - await subscription.unsubscribe([]); - await subscription.unsubscribe(["/test/2/waku-filter/default"]); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - expect(await messageCollector.waitForMessages(2)).to.eq(true); - - // Check that both messages were received - expect(messageCollector.count).to.eq(2); - expect((await nwaku.messages()).length).to.eq(2); - }); - - it("Unsubscribes all - node subscribed to 1 topic", async function () { - const { error, subscription } = await waku.filter.subscribe( - [TestDecoder], - messageCollector.callback - ); - if (error) { - throw error; - } - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); - expect(messageCollector.count).to.eq(1); - - // Unsubscribe from all topics and send again - await subscription.unsubscribeAll(); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); - expect(await messageCollector.waitForMessages(2)).to.eq(false); - - // Check that from 2 messages send only the 1st was received - expect(messageCollector.count).to.eq(1); - expect((await nwaku.messages()).length).to.eq(2); - }); - - it("Unsubscribes all - node subscribed to 10 topics", async function () { - // Subscribe to 10 topics and send message - const topicCount = 10; - const td = generateTestData(topicCount, { pubsubTopic: TestPubsubTopic }); - const { error, subscription } = await waku.filter.subscribe( - td.decoders, - messageCollector.callback - ); - if (error) { - throw error; - } - for (let i = 0; i < topicCount; i++) { - await waku.lightPush.send(td.encoders[i], { - payload: utf8ToBytes(`M${i + 1}`) - }); - } - expect(await messageCollector.waitForMessages(10)).to.eq(true); - - // Unsubscribe from all topics and send again - await subscription.unsubscribeAll(); - for (let i = 0; i < topicCount; i++) { - await waku.lightPush.send(td.encoders[i], { - payload: utf8ToBytes(`M${topicCount + i + 1}`) - }); - } - expect(await messageCollector.waitForMessages(11)).to.eq(false); - - // Check that from 20 messages send only 10 were received - expect(messageCollector.count).to.eq(10); - expect((await nwaku.messages()).length).to.eq(20); - }); -}); diff --git a/packages/tests/tests/filter/single_node/utils.ts b/packages/tests/tests/filter/single_node/utils.ts deleted file mode 100644 index 0280bcb5d2..0000000000 --- a/packages/tests/tests/filter/single_node/utils.ts +++ /dev/null @@ -1,22 +0,0 @@ -import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { Logger } from "@waku/utils"; -import { Context } from "mocha"; - -import { - runNodes as runNodesBuilder, - ServiceNode -} from "../../../src/index.js"; - -export const log = new Logger("test:filter:single_node"); - -export const runNodes = ( - context: Context, - shardInfo: NetworkConfig -): Promise<[ServiceNode, LightNode]> => - runNodesBuilder({ - context, - createNode: createLightNode, - protocols: [Protocols.LightPush, Protocols.Filter], - networkConfig: shardInfo - }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index af281c039c..7c77f33b67 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -7,7 +7,7 @@ import { getPublicKey, symmetric } from "@waku/message-encryption"; -import { utf8ToBytes } from "@waku/sdk"; +import { Protocols, utf8ToBytes } from "@waku/sdk"; import { expect } from "chai"; import { @@ -15,14 +15,19 @@ import { beforeEachCustom, delay, generateTestData, + makeLogFileName, + MessageCollector, runMultipleNodes, + ServiceNode, ServiceNodesFleet, + tearDownNodes, teardownNodesWithRedundancy, TEST_STRING, waitForConnections } from "../../src/index.js"; import { + ClusterId, messagePayload, messageText, TestContentTopic, @@ -103,7 +108,7 @@ const runTests = (strictCheckNodes: boolean): void => { expectedPubsubTopic: TestPubsubTopic }); - await serviceNodes.confirmMessageLength(1); + await serviceNodes.confirmMessageLength(2); }); it("Subscribe and receive symmetrically encrypted messages via lightPush", async function () { @@ -136,7 +141,7 @@ const runTests = (strictCheckNodes: boolean): void => { expectedPubsubTopic: TestPubsubTopic }); - await serviceNodes.confirmMessageLength(1); + await serviceNodes.confirmMessageLength(2); }); it("Subscribe and receive messages via waku relay post", async function () { @@ -532,6 +537,98 @@ const runTests = (strictCheckNodes: boolean): void => { expectedContentTopic: TestContentTopic }); }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await waku.filter.subscribe( + [TestDecoder], + serviceNodes.messageCollector.callback + ); + + // Set up and start a new nwaku node with customPubsubTopic1 + const nwaku2 = new ServiceNode(makeLogFileName(this) + "3"); + + try { + const customContentTopic = "/test/4/waku-filter/default"; + const customDecoder = createDecoder(customContentTopic, { + clusterId: ClusterId, + shard: 4 + }); + const customEncoder = createEncoder({ + contentTopic: customContentTopic, + pubsubTopicShardInfo: { clusterId: ClusterId, shard: 4 } + }); + + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + clusterId: ClusterId, + shard: [4] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.Filter, Protocols.LightPush]); + + await nwaku2.ensureSubscriptions([customDecoder.pubsubTopic]); + + const messageCollector2 = new MessageCollector(); + + await waku.filter.subscribe( + [customDecoder], + messageCollector2.callback + ); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: TestDecoder.pubsubTopic + })) || + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: customDecoder.pubsubTopic + })) + ) { + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder, { + payload: utf8ToBytes("M2") + }); + } + + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: TestDecoder.contentTopic, + expectedPubsubTopic: TestDecoder.pubsubTopic, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customDecoder.contentTopic, + expectedPubsubTopic: customDecoder.pubsubTopic, + expectedMessageText: "M2" + }); + } catch (e) { + await tearDownNodes([nwaku2], []); + } + }); + + it("Should fail to subscribe with decoder with wrong shard", async function () { + const wrongDecoder = createDecoder(TestDecoder.contentTopic, { + clusterId: ClusterId, + shard: 5 + }); + + // this subscription object is set up with the `customPubsubTopic1` but we're passing it a Decoder with the `customPubsubTopic2` + try { + await waku.filter.subscribe( + [wrongDecoder], + serviceNodes.messageCollector.callback + ); + } catch (error) { + expect((error as Error).message).to.include( + `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance.` + ); + } + }); }); }; diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts new file mode 100644 index 0000000000..ac82a0cab5 --- /dev/null +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -0,0 +1,141 @@ +import { createEncoder } from "@waku/core"; +import { LightNode, Protocols } from "@waku/interfaces"; +import { contentTopicToPubsubTopic } from "@waku/utils"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + MessageCollector, + runMultipleNodes, + ServiceNode, + ServiceNodesFleet, + tearDownNodes, + teardownNodesWithRedundancy +} from "../../src/index.js"; + +import { ClusterId, TestEncoder } from "./utils.js"; + +describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { + this.timeout(30000); + const numServiceNodes = 2; + + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + const customEncoder2 = createEncoder({ + contentTopic: "/test/2/waku-light-push/utf8", + pubsubTopic: contentTopicToPubsubTopic( + "/test/2/waku-light-push/utf8", + ClusterId + ) + }); + + beforeEachCustom(this, async () => { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { + clusterId: ClusterId, + contentTopics: [TestEncoder.contentTopic, customEncoder2.contentTopic] + }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + false + ); + }); + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + const pushResponse1 = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M1") + }); + const pushResponse2 = await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + + expect(pushResponse1.successes.length).to.eq(numServiceNodes); + expect(pushResponse2.successes.length).to.eq(numServiceNodes); + + const messageCollector1 = new MessageCollector(serviceNodes.nodes[0]); + const messageCollector2 = new MessageCollector(serviceNodes.nodes[1]); + + expect( + await messageCollector1.waitForMessages(1, { + pubsubTopic: TestEncoder.pubsubTopic + }) + ).to.eq(true); + + expect( + await messageCollector2.waitForMessages(1, { + pubsubTopic: customEncoder2.pubsubTopic + }) + ).to.eq(true); + + messageCollector1.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestEncoder.contentTopic, + expectedPubsubTopic: TestEncoder.pubsubTopic + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customEncoder2.contentTopic, + expectedPubsubTopic: customEncoder2.pubsubTopic + }); + }); + + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubsubTopic + const nwaku2 = new ServiceNode(makeLogFileName(this) + "3"); + + try { + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + clusterId: ClusterId, + shard: [2] + }); + await nwaku2.ensureSubscriptionsAutosharding([ + customEncoder2.pubsubTopic + ]); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waku.waitForPeers([Protocols.LightPush]); + + const messageCollector2 = new MessageCollector(nwaku2); + + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M1") + }); + await waku.lightPush.send(customEncoder2, { + payload: utf8ToBytes("M2") + }); + + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: TestEncoder.pubsubTopic + }); + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customEncoder2.contentTopic + }); + + serviceNodes.messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestEncoder.contentTopic, + expectedPubsubTopic: TestEncoder.pubsubTopic + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: customEncoder2.contentTopic, + expectedPubsubTopic: customEncoder2.pubsubTopic + }); + } catch (e) { + await tearDownNodes([nwaku2], []); + } + }); +}); diff --git a/packages/tests/tests/light-push/single_node/index.node.spec.ts b/packages/tests/tests/light-push/single_node/index.node.spec.ts deleted file mode 100644 index 939f9500d2..0000000000 --- a/packages/tests/tests/light-push/single_node/index.node.spec.ts +++ /dev/null @@ -1,274 +0,0 @@ -import { createEncoder } from "@waku/core"; -import { IRateLimitProof, LightNode, ProtocolError } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - generateRandomUint8Array, - MessageCollector, - ServiceNode, - tearDownNodes, - TEST_STRING -} from "../../../src/index.js"; -import { - messagePayload, - messageText, - runNodes, - TestContentTopic, - TestEncoder, - TestPubsubTopic, - TestShardInfo -} from "../utils.js"; - -describe("Waku Light Push: Single Node", function () { - // Set the timeout for all tests in this suite. Can be overwritten at test level - this.timeout(15000); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, TestShardInfo); - messageCollector = new MessageCollector(nwaku); - - await nwaku.ensureSubscriptions([TestPubsubTopic]); - }); - - afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - TEST_STRING.forEach((testItem) => { - it(`Push message with ${testItem.description} payload`, async function () { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(testItem.value) - }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: testItem.value, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Push 30 different messages", async function () { - const generateMessageText = (index: number): string => `M${index}`; - - for (let i = 0; i < 30; i++) { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(generateMessageText(i)) - }); - expect(pushResponse.successes.length).to.eq(1); - } - - expect( - await messageCollector.waitForMessages(30, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - - for (let i = 0; i < 30; i++) { - messageCollector.verifyReceivedMessage(i, { - expectedMessageText: generateMessageText(i), - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - } - }); - - it("Throws when trying to push message with empty payload", async function () { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: new Uint8Array() - }); - - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.EMPTY_PAYLOAD - ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); - }); - - TEST_STRING.forEach((testItem) => { - it(`Push message with content topic containing ${testItem.description}`, async function () { - const customEncoder = createEncoder({ - contentTopic: testItem.value, - pubsubTopic: TestPubsubTopic - }); - const pushResponse = await waku.lightPush.send( - customEncoder, - messagePayload - ); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: testItem.value, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Fails to push message with empty content topic", async function () { - try { - createEncoder({ contentTopic: "" }); - expect.fail("Expected an error but didn't get one"); - } catch (error) { - expect((error as Error).message).to.equal( - "Content topic must be specified" - ); - } - }); - - it("Push message with meta", async function () { - const customTestEncoder = createEncoder({ - contentTopic: TestContentTopic, - metaSetter: () => new Uint8Array(10), - pubsubTopic: TestPubsubTopic - }); - - const pushResponse = await waku.lightPush.send( - customTestEncoder, - messagePayload - ); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - - it("Fails to push message with large meta", async function () { - const customTestEncoder = createEncoder({ - contentTopic: TestContentTopic, - pubsubTopic: TestPubsubTopic, - metaSetter: () => new Uint8Array(105024) // see the note below *** - }); - - // *** note: this test used 10 ** 6 when `nwaku` node had MaxWakuMessageSize == 1MiB ( 1*2^20 .) - // `nwaku` establishes the max lightpush msg size as `const MaxRpcSize* = MaxWakuMessageSize + 64 * 1024` - // see: https://github.com/waku-org/nwaku/blob/07beea02095035f4f4c234ec2dec1f365e6955b8/waku/waku_lightpush/rpc_codec.nim#L15 - // In the PR https://github.com/waku-org/nwaku/pull/2298 we reduced the MaxWakuMessageSize - // from 1MiB to 150KiB. Therefore, the 105024 number comes from substracting ( 1*2^20 - 150*2^10 ) - // to the original 10^6 that this test had when MaxWakuMessageSize == 1*2^20 - - const pushResponse = await waku.lightPush.send( - customTestEncoder, - messagePayload - ); - - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.REMOTE_PEER_REJECTED - ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); - }); - - it("Push message with rate limit", async function () { - const rateLimitProof: IRateLimitProof = { - proof: utf8ToBytes("proofData"), - merkleRoot: utf8ToBytes("merkleRootData"), - epoch: utf8ToBytes("epochData"), - shareX: utf8ToBytes("shareXData"), - shareY: utf8ToBytes("shareYData"), - nullifier: utf8ToBytes("nullifierData"), - rlnIdentifier: utf8ToBytes("rlnIdentifierData") - }; - - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(messageText), - rateLimitProof: rateLimitProof - }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - - [ - Date.now() - 3600000 * 24 * 356, - Date.now() - 3600000, - Date.now() + 3600000 - ].forEach((testItem) => { - it(`Push message with custom timestamp: ${testItem}`, async function () { - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(messageText), - timestamp: new Date(testItem) - }); - expect(pushResponse.successes.length).to.eq(1); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedTimestamp: testItem, - expectedContentTopic: TestContentTopic, - expectedPubsubTopic: TestPubsubTopic - }); - }); - }); - - it("Push message equal or less that 1MB", async function () { - const bigPayload = generateRandomUint8Array(65536); - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: bigPayload - }); - expect(pushResponse.successes.length).to.greaterThan(0); - }); - - it("Fails to push message bigger that 1MB", async function () { - const MB = 1024 ** 2; - - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: generateRandomUint8Array(MB + 65536) - }); - expect(pushResponse.successes.length).to.eq(0); - expect(pushResponse.failures?.map((failure) => failure.error)).to.include( - ProtocolError.SIZE_TOO_BIG - ); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: TestPubsubTopic - }) - ).to.eq(false); - }); -}); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts deleted file mode 100644 index 02e6c7668a..0000000000 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ /dev/null @@ -1,460 +0,0 @@ -import type { PeerId } from "@libp2p/interface"; -import { createEncoder } from "@waku/core"; -import { - ContentTopicInfo, - LightNode, - NetworkConfig, - Protocols, - ShardInfo, - SingleShardInfo -} from "@waku/interfaces"; -import { - contentTopicToPubsubTopic, - contentTopicToShardIndex, - pubsubTopicToSingleShardInfo, - singleShardInfoToPubsubTopic -} from "@waku/utils"; -import { utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import { Context } from "mocha"; - -import { - afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes -} from "../../../src/index.js"; -import { messageText, runNodes } from "../utils.js"; - -describe("Waku Light Push : Multiple PubsubTopics", function () { - this.timeout(30000); - let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - - const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; - const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; - const singleShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 }; - - const customPubsubTopic1 = singleShardInfoToPubsubTopic(singleShardInfo1); - const customPubsubTopic2 = singleShardInfoToPubsubTopic(singleShardInfo2); - const customContentTopic1 = "/test/2/waku-light-push/utf8"; - const customContentTopic2 = "/test/3/waku-light-push/utf8"; - const customEncoder1 = createEncoder({ - pubsubTopicShardInfo: singleShardInfo1, - contentTopic: customContentTopic1 - }); - const customEncoder2 = createEncoder({ - pubsubTopicShardInfo: singleShardInfo2, - contentTopic: customContentTopic2 - }); - - let node1PeerId: PeerId; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); - }); - - it("Push message on custom pubsubTopic", async function () { - const pushResponse = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes(messageText) - }); - - expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: customContentTopic1 - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - const pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - const pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - - const messageCollector2 = new MessageCollector(nwaku); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - }) - ).to.eq(true); - - expect( - await messageCollector2.waitForMessages(1, { - pubsubTopic: customPubsubTopic2 - }) - ).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2 - }); - }); - - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], - clusterId: singleShardInfo2.clusterId - }); - await nwaku2.ensureSubscriptions([ - singleShardInfoToPubsubTopic(singleShardInfo2) - ]); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const messageCollector2 = new MessageCollector(nwaku2); - - await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - - await messageCollector.waitForMessages(1, { - pubsubTopic: customPubsubTopic1 - }); - - await messageCollector2.waitForMessages(1, { - pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) - }); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: customPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) - }); - }); -}); - -describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { - this.timeout(30000); - let waku: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - - const clusterId = 4; - const customContentTopic1 = "/waku/2/content/test.js"; - const customContentTopic2 = "/myapp/1/latest/proto"; - const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( - customContentTopic1, - clusterId - ); - const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( - customContentTopic2, - clusterId - ); - const shardInfo: ContentTopicInfo = { - clusterId, - contentTopics: [customContentTopic1, customContentTopic2] - }; - const customEncoder1 = createEncoder({ - contentTopic: customContentTopic1, - pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic1) - }); - const customEncoder2 = createEncoder({ - contentTopic: customContentTopic2, - pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(autoshardingPubsubTopic2) - }); - - let node1PeerId: PeerId; - - beforeEachCustom(this, async () => { - [nwaku, waku] = await runNodes(this.ctx, shardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], waku); - }); - - it("Push message on custom pubsubTopic", async function () { - const pushResponse = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes(messageText) - }); - - expect(pushResponse.failures).to.be.empty; - expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); - - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: customContentTopic1 - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - const pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - const pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - - const messageCollector2 = new MessageCollector(nwaku); - - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - }) - ).to.eq(true); - - expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 - }) - ).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 - }); - }); - - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - filter: true, - lightpush: true, - relay: true, - pubsubTopic: [autoshardingPubsubTopic2], - clusterId: shardInfo.clusterId - }); - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const messageCollector2 = new MessageCollector(nwaku2); - - await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 - }); - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 - }); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 - }); - }); -}); - -describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { - this.timeout(30000); - let waku: LightNode; - let waku2: LightNode; - let nwaku: ServiceNode; - let nwaku2: ServiceNode; - let messageCollector: MessageCollector; - let ctx: Context; - - const clusterId = 3; - const customContentTopic1 = "/waku/2/content/utf8"; - const customContentTopic2 = "/myapp/1/latest/proto"; - const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( - customContentTopic1, - clusterId - ); - const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( - customContentTopic2, - clusterId - ); - - const shardInfo1 = { - clusterId, - shards: [contentTopicToShardIndex(customContentTopic1)] - }; - const customEncoder1 = createEncoder({ - contentTopic: customContentTopic1, - pubsubTopicShardInfo: shardInfo1 - }); - - const shardInfo2 = { - clusterId, - shards: [contentTopicToShardIndex(customContentTopic2)] - }; - const customEncoder2 = createEncoder({ - contentTopic: customContentTopic2, - pubsubTopicShardInfo: shardInfo2 - }); - - const testShardInfo: NetworkConfig = { - clusterId, - shards: [ - contentTopicToShardIndex(customContentTopic1), - contentTopicToShardIndex(customContentTopic2) - ] - }; - - let node1PeerId: PeerId; - - beforeEachCustom(this, async () => { - ctx = this.ctx; - [nwaku, waku] = await runNodes(ctx, testShardInfo); - messageCollector = new MessageCollector(nwaku); - node1PeerId = await nwaku.getPeerId(); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku, nwaku2], [waku, waku2]); - }); - - it("Push message on custom pubsubTopic", async function () { - const pushResponse = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes(messageText) - }); - - expect(pushResponse.successes[0].toString()).to.eq(node1PeerId.toString()); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 - }) - ).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: customContentTopic1 - }); - }); - - it("Subscribe and receive messages on 2 different pubsubtopics", async function () { - const pushResponse1 = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - const pushResponse2 = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - expect(pushResponse1.successes[0].toString()).to.eq(node1PeerId.toString()); - expect(pushResponse2.successes[0].toString()).to.eq(node1PeerId.toString()); - - const messageCollector2 = new MessageCollector(nwaku); - - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 - }) - ).to.eq(true); - - expect( - await messageCollector2.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 - }) - ).to.eq(true); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 - }); - }); - - it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { - // Set up and start a new nwaku node with Default PubsubTopic - [nwaku2] = await runNodes(ctx, shardInfo2); - - await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const messageCollector2 = new MessageCollector(nwaku2); - - const { failures: f1 } = await waku.lightPush.send(customEncoder1, { - payload: utf8ToBytes("M1") - }); - const { failures: f2 } = await waku.lightPush.send(customEncoder2, { - payload: utf8ToBytes("M2") - }); - - expect(f1).to.be.empty; - expect(f2).to.be.empty; - - await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 - }); - await messageCollector2.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 - }); - - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1", - expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 - }); - messageCollector2.verifyReceivedMessage(0, { - expectedMessageText: "M2", - expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 - }); - }); -}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 43f1a6dbb9..b77f75a9fa 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -1,8 +1,11 @@ import { createEncoder } from "@waku/core"; +import { LightNode, NetworkConfig, Protocols } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; +import { createLightNode } from "@waku/sdk"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; +import { Context } from "mocha"; -import { runNodes } from "../filter/single_node/utils.js"; +import { runNodes as runNodesBuilder, ServiceNode } from "../../src/index.js"; // Constants for test configuration. export const log = new Logger("test:lightpush"); @@ -23,4 +26,13 @@ export const TestEncoder = createEncoder({ export const messageText = "Light Push works!"; export const messagePayload = { payload: utf8ToBytes(messageText) }; -export { runNodes }; +export const runNodes = ( + context: Context, + shardInfo: NetworkConfig +): Promise<[ServiceNode, LightNode]> => + runNodesBuilder({ + context, + createNode: createLightNode, + protocols: [Protocols.LightPush, Protocols.Filter], + networkConfig: shardInfo + }); diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index 80625e2bb3..ee954f09de 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -2,7 +2,6 @@ import { MetadataCodec } from "@waku/core"; import type { LightNode, ShardInfo } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { decodeRelayShard } from "@waku/utils"; -import { shardInfoToPubsubTopics } from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; @@ -42,7 +41,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) + shard: shardInfo.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); @@ -89,7 +88,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + shard: shardInfo1.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); @@ -136,7 +135,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + shard: shardInfo1.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); @@ -174,7 +173,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo1.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo1) + shard: shardInfo1.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); @@ -209,7 +208,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) + shard: shardInfo.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); @@ -245,7 +244,7 @@ describe("Metadata Protocol", function () { discv5Discovery: true, peerExchange: true, clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) + shard: shardInfo.shards }); const nwaku1Ma = await nwaku1.getMultiaddrWithId(); diff --git a/packages/tests/tests/peer-exchange/index.spec.ts b/packages/tests/tests/peer-exchange/index.spec.ts index 9cd019e1a9..478892de9e 100644 --- a/packages/tests/tests/peer-exchange/index.spec.ts +++ b/packages/tests/tests/peer-exchange/index.spec.ts @@ -32,13 +32,15 @@ describe("Peer Exchange", function () { nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); await nwaku1.start({ - pubsubTopic: [DefaultTestPubsubTopic], + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards, discv5Discovery: true, peerExchange: true, relay: true }); await nwaku2.start({ - pubsubTopic: [DefaultTestPubsubTopic], + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: (await nwaku1.info()).enrUri, @@ -130,7 +132,8 @@ describe("Peer Exchange", function () { nwaku3 = new ServiceNode(makeLogFileName(this) + "3"); await nwaku3.start({ - pubsubTopic: [DefaultTestPubsubTopic], + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: (await nwaku1.info()).enrUri, diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts index 57be183659..825ef7fb47 100644 --- a/packages/tests/tests/peer-exchange/query.spec.ts +++ b/packages/tests/tests/peer-exchange/query.spec.ts @@ -24,7 +24,13 @@ import { export const log = new Logger("test:pe"); -const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })]; +const ShardInfo = { clusterId: 0, shards: [2] }; +const pubsubTopic = [ + singleShardInfoToPubsubTopic({ + clusterId: ShardInfo.clusterId, + shard: ShardInfo.shards[0] + }) +]; describe("Peer Exchange Query", function () { this.timeout(30_000); @@ -47,21 +53,24 @@ describe("Peer Exchange Query", function () { nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3"); await nwaku1.start({ - pubsubTopic: pubsubTopic, + shard: ShardInfo.shards, + clusterId: ShardInfo.clusterId, discv5Discovery: true, peerExchange: true, relay: true }); nwaku1PeerId = await nwaku1.getPeerId(); await nwaku2.start({ - pubsubTopic: pubsubTopic, + shard: ShardInfo.shards, + clusterId: ShardInfo.clusterId, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: (await nwaku1.info()).enrUri, relay: true }); await nwaku3.start({ - pubsubTopic: pubsubTopic, + shard: ShardInfo.shards, + clusterId: ShardInfo.clusterId, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: (await nwaku2.info()).enrUri, diff --git a/packages/tests/tests/sharding/auto_sharding.spec.ts b/packages/tests/tests/sharding/auto_sharding.spec.ts index 4d02260761..6fb522376e 100644 --- a/packages/tests/tests/sharding/auto_sharding.spec.ts +++ b/packages/tests/tests/sharding/auto_sharding.spec.ts @@ -1,4 +1,4 @@ -import { LightNode, ProtocolError, Protocols } from "@waku/interfaces"; +import { LightNode, ProtocolError } from "@waku/interfaces"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic, @@ -8,11 +8,9 @@ import { expect } from "chai"; import { afterEachCustom, - beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy } from "../../src/index.js"; const ContentTopic = "/waku/2/content/test.js"; @@ -21,43 +19,104 @@ const ContentTopic2 = "/myapp/1/latest/proto"; describe("Autosharding: Running Nodes", function () { this.timeout(50000); const clusterId = 10; - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; + const numServiceNodes = 2; - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + let waku: LightNode | undefined = undefined; + let serviceNodes: ServiceNodesFleet | undefined = undefined; afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); + if (serviceNodes) { + await teardownNodesWithRedundancy(serviceNodes, waku ?? []); + } }); - describe("Different clusters and topics", function () { - // js-waku allows autosharding for cluster IDs different than 1 - it("Cluster ID 0 - Default/Global Cluster", async function () { - const clusterId = 0; - const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, + // js-waku allows autosharding for cluster IDs different than 1 + it("Cluster ID 0 - Default/Global Cluster", async function () { + const clusterId = 0; + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [ContentTopic] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: clusterId, - pubsubTopic: pubsubTopics - }); + shard: contentTopicToShardIndex(ContentTopic) + } + }); - await nwaku.ensureSubscriptions(pubsubTopics); + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [ContentTopic] - } - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + expect(request.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder.pubsubTopic + }) + ).to.eq(true); + }); + + it("Non TWN Cluster", async function () { + const clusterId = 5; + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [ContentTopic] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic) + } + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder.pubsubTopic + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random ContentTopic + const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 + const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10 + const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999 + const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings + const encoding = + encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding + const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`; + + it(`random auto sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [ContentTopic] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); const encoder = createEncoder({ contentTopic: ContentTopic, @@ -71,256 +130,129 @@ describe("Autosharding: Running Nodes", function () { payload: utf8ToBytes("Hello World") }); - expect(request.successes.length).to.eq(1); + expect(request.successes.length).to.eq(numServiceNodes); expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder.pubsubTopic }) ).to.eq(true); }); + } - it("Non TWN Cluster", async function () { - const clusterId = 5; - const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, + // TODO: replace with unit tests + it("Wrong topic", async function () { + const wrongTopic = "wrong_format"; + try { + contentTopicToPubsubTopic(wrongTopic, clusterId); + throw new Error("Wrong topic should've thrown an error"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("Content topic format is invalid") + ) { + throw err; + } + } + }); + + it("configure the node with multiple content topics", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [ContentTopic, ContentTopic2] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder1 = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: { clusterId: clusterId, - pubsubTopic: pubsubTopics - }); - - await nwaku.ensureSubscriptions(pubsubTopics); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [ContentTopic] - } - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) - } - }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); + shard: contentTopicToShardIndex(ContentTopic) + } }); - const numTest = 10; - for (let i = 0; i < numTest; i++) { - // Random ContentTopic - const applicationName = `app${Math.floor(Math.random() * 100)}`; // Random application name app0 to app99 - const version = Math.floor(Math.random() * 10) + 1; // Random version between 1 and 10 - const topicName = `topic${Math.floor(Math.random() * 1000)}`; // Random topic name topic0 to topic999 - const encodingList = ["proto", "json", "xml", "test.js", "utf8"]; // Potential encodings - const encoding = - encodingList[Math.floor(Math.random() * encodingList.length)]; // Random encoding - const ContentTopic = `/${applicationName}/${version}/${topicName}/${encoding}`; + const encoder2 = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); - it(`random auto sharding ${ - i + 1 - } - Cluster ID: ${clusterId}, Content Topic: ${ContentTopic}`, async function () { - const pubsubTopics = [ - contentTopicToPubsubTopic(ContentTopic, clusterId) - ]; + const request1 = await waku.lightPush.send(encoder1, { + payload: utf8ToBytes("Hello World") + }); + expect(request1.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder1.pubsubTopic + }) + ).to.eq(true); - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); + const request2 = await waku.lightPush.send(encoder2, { + payload: utf8ToBytes("Hello World") + }); + expect(request2.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder2.pubsubTopic + }) + ).to.eq(true); + }); - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [ContentTopic] - } - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); + it("using a protocol with unconfigured pubsub topic should fail", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, contentTopics: [ContentTopic] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) - } - }); + // use a content topic that is not configured + const encoder = createEncoder({ + contentTopic: ContentTopic2, + pubsubTopicShardInfo: { + clusterId: clusterId, + shard: contentTopicToShardIndex(ContentTopic2) + } + }); - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); + const { successes, failures } = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); - }); + if (successes.length > 0 || failures?.length === 0) { + throw new Error("The request should've thrown an error"); } - it("Wrong topic", async function () { - const wrongTopic = "wrong_format"; - try { - contentTopicToPubsubTopic(wrongTopic, clusterId); - throw new Error("Wrong topic should've thrown an error"); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes("Content topic format is invalid") - ) { - throw err; - } - } - }); + const errors = failures?.map((failure) => failure.error); + expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); - describe("Others", function () { - it("configure the node with multiple content topics", async function () { - const pubsubTopics = [ - contentTopicToPubsubTopic(ContentTopic, clusterId), - contentTopicToPubsubTopic(ContentTopic2, clusterId) - ]; - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic, ContentTopic2] - }); - + it("start node with empty content topic", async function () { + try { waku = await createLightNode({ networkConfig: { clusterId: clusterId, - // For autosharding, we configure multiple pubsub topics by using two content topics that hash to different shards - contentTopics: [ContentTopic, ContentTopic2] + contentTopics: [] } }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const encoder1 = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic) - } - }); - - const encoder2 = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const request1 = await waku.lightPush.send(encoder1, { - payload: utf8ToBytes("Hello World") - }); - expect(request1.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); - - const request2 = await waku.lightPush.send(encoder2, { - payload: utf8ToBytes("Hello World") - }); - expect(request2.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: ContentTopic - }) - ).to.eq(true); - }); - - it("using a protocol with unconfigured pubsub topic should fail", async function () { - const pubsubTopics = [contentTopicToPubsubTopic(ContentTopic, clusterId)]; - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: pubsubTopics, - contentTopic: [ContentTopic] - }); - - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [ContentTopic] - } - }); - await waku.start(); - - // use a content topic that is not configured - const encoder = createEncoder({ - contentTopic: ContentTopic2, - pubsubTopicShardInfo: { - clusterId: clusterId, - shard: contentTopicToShardIndex(ContentTopic2) - } - }); - - const { successes, failures } = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - if (successes.length > 0 || failures?.length === 0) { - throw new Error("The request should've thrown an error"); + throw new Error( + "Starting the node with no content topic should've thrown an error" + ); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "Invalid content topics configuration: please provide at least one content topic" + ) + ) { + throw err; } - - const errors = failures?.map((failure) => failure.error); - expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); - }); - - it("start node with empty content topic", async function () { - try { - waku = await createLightNode({ - networkConfig: { - clusterId: clusterId, - contentTopics: [] - } - }); - throw new Error( - "Starting the node with no content topic should've thrown an error" - ); - } catch (err) { - if ( - !(err instanceof Error) || - !err.message.includes( - "Invalid content topics configuration: please provide at least one content topic" - ) - ) { - throw err; - } - } - }); + } }); }); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index c912f475ae..4efd6ad922 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -10,6 +10,7 @@ import { } from "@waku/sdk"; import { contentTopicToPubsubTopic, + contentTopicToShardIndex, singleShardInfoToPubsubTopic } from "@waku/utils"; import chai, { expect } from "chai"; @@ -57,7 +58,6 @@ describe("Static Sharding: Peer Management", function () { const shardInfo: ShardInfo = { clusterId: clusterId, shards: [2] }; await nwaku1.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, relay: true, @@ -68,7 +68,6 @@ describe("Static Sharding: Peer Management", function () { const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, @@ -80,7 +79,6 @@ describe("Static Sharding: Peer Management", function () { const enr2 = (await nwaku2.info()).enrUri; await nwaku3.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, @@ -133,13 +131,9 @@ describe("Static Sharding: Peer Management", function () { singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 2 }) ]; const shardInfoToDial: ShardInfo = { clusterId: clusterId, shards: [2] }; - const pubsubTopicsToIgnore = [ - singleShardInfoToPubsubTopic({ clusterId: clusterId, shard: 1 }) - ]; // this service node is not subscribed to the shard await nwaku1.start({ - pubsubTopic: pubsubTopicsToIgnore, relay: true, discv5Discovery: true, peerExchange: true, @@ -150,7 +144,6 @@ describe("Static Sharding: Peer Management", function () { const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - pubsubTopic: pubsubTopicsToDial, relay: true, discv5Discovery: true, peerExchange: true, @@ -162,7 +155,6 @@ describe("Static Sharding: Peer Management", function () { const enr2 = (await nwaku2.info()).enrUri; await nwaku3.start({ - pubsubTopic: pubsubTopicsToDial, relay: true, discv5Discovery: true, peerExchange: true, @@ -213,6 +205,7 @@ describe("Static Sharding: Peer Management", function () { describe("Autosharding: Peer Management", function () { const ContentTopic = "/myapp/1/latest/proto"; const clusterId = 8; + const Shard = [contentTopicToShardIndex(ContentTopic)]; describe("Peer Exchange", function () { let waku: LightNode; @@ -243,35 +236,35 @@ describe("Autosharding: Peer Management", function () { }; await nwaku1.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, relay: true, clusterId: clusterId, + shard: Shard, contentTopic: [ContentTopic] }); const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, relay: true, clusterId: clusterId, + shard: Shard, contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; await nwaku3.start({ - pubsubTopic: pubsubTopics, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, relay: true, clusterId: clusterId, + shard: Shard, contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); @@ -322,38 +315,37 @@ describe("Autosharding: Peer Management", function () { clusterId: clusterId, contentTopics: [ContentTopic] }; - const pubsubTopicsToIgnore = [contentTopicToPubsubTopic(ContentTopic, 3)]; // this service node is not subscribed to the shard await nwaku1.start({ - pubsubTopic: pubsubTopicsToIgnore, relay: true, discv5Discovery: true, peerExchange: true, - clusterId: 3 + clusterId: 3, + shard: Shard }); const enr1 = (await nwaku1.info()).enrUri; await nwaku2.start({ - pubsubTopic: pubsubTopicsToDial, relay: true, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr1, clusterId: clusterId, + shard: Shard, contentTopic: [ContentTopic] }); const enr2 = (await nwaku2.info()).enrUri; await nwaku3.start({ - pubsubTopic: pubsubTopicsToDial, relay: true, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: enr2, clusterId: clusterId, + shard: Shard, contentTopic: [ContentTopic] }); const nwaku3Ma = await nwaku3.getMultiaddrWithId(); diff --git a/packages/tests/tests/sharding/static_sharding.spec.ts b/packages/tests/tests/sharding/static_sharding.spec.ts index 0178dac497..776c106bf6 100644 --- a/packages/tests/tests/sharding/static_sharding.spec.ts +++ b/packages/tests/tests/sharding/static_sharding.spec.ts @@ -1,10 +1,4 @@ -import { - LightNode, - ProtocolError, - Protocols, - ShardInfo, - SingleShardInfo -} from "@waku/interfaces"; +import { LightNode, ProtocolError, SingleShardInfo } from "@waku/interfaces"; import { createEncoder, createLightNode, utf8ToBytes } from "@waku/sdk"; import { shardInfoToPubsubTopics, @@ -16,172 +10,134 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, - makeLogFileName, - MessageCollector, - ServiceNode, - tearDownNodes + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy } from "../../src/index.js"; const ContentTopic = "/waku/2/content/test.js"; describe("Static Sharding: Running Nodes", function () { this.timeout(15_000); - let waku: LightNode; - let nwaku: ServiceNode; - let messageCollector: MessageCollector; + const numServiceNodes = 2; - beforeEachCustom(this, async () => { - nwaku = new ServiceNode(makeLogFileName(this.ctx)); - messageCollector = new MessageCollector(nwaku); - }); + let waku: LightNode | undefined = undefined; + let serviceNodes: ServiceNodesFleet | undefined = undefined; afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); - }); - - describe("Different clusters and shards", function () { - it("shard 0", async function () { - const singleShardInfo = { clusterId: 0, shard: 0 }; - const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo - }); - expect(encoder.pubsubTopic).to.eq( - singleShardInfoToPubsubTopic(singleShardInfo) - ); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] - }) - ).to.eq(true); - }); - - // dedicated test for Default Cluster ID 0 - it("Cluster ID 0 - Default/Global Cluster", async function () { - const singleShardInfo = { clusterId: 0, shard: 1 }; - const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - await nwaku.ensureSubscriptions(shardInfoToPubsubTopics(shardInfo)); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo - }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] - }) - ).to.eq(true); - }); - - const numTest = 10; - for (let i = 0; i < numTest; i++) { - // Random clusterId between 2 and 1000 - const clusterId = Math.floor(Math.random() * 999) + 2; - - // Random shardId between 1 and 1000 - const shardId = Math.floor(Math.random() * 1000) + 1; - - it(`random static sharding ${ - i + 1 - } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { - afterEach(async () => { - await tearDownNodes(nwaku, waku); - }); - - const singleShardInfo = { clusterId: clusterId, shard: shardId }; - const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); - - waku = await createLightNode({ - networkConfig: shardInfo - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - - const encoder = createEncoder({ - contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo - }); - - const request = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello World") - }); - - expect(request.successes.length).to.eq(1); - expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] - }) - ).to.eq(true); - }); + if (serviceNodes) { + await teardownNodesWithRedundancy(serviceNodes, waku ?? []); } }); + it("shard 0", async function () { + const singleShardInfo = { clusterId: 0, shard: 0 }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + expect(encoder.pubsubTopic).to.eq( + singleShardInfoToPubsubTopic(singleShardInfo) + ); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: encoder.pubsubTopic + }) + ).to.eq(true); + }); + + // dedicated test for Default Cluster ID 0 + it("Cluster ID 0 - Default/Global Cluster", async function () { + const singleShardInfo = { clusterId: 0, shard: 1 }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + + const numTest = 10; + for (let i = 0; i < numTest; i++) { + // Random clusterId between 2 and 1000 + const clusterId = Math.floor(Math.random() * 999) + 2; + + // Random shardId between 1 and 1000 + const shardId = Math.floor(Math.random() * 1000) + 1; + + it(`random static sharding ${ + i + 1 + } - Cluster ID: ${clusterId}, Shard ID: ${shardId}`, async function () { + const singleShardInfo = { clusterId: clusterId, shard: shardId }; + const shardInfo = singleShardInfosToShardInfo([singleShardInfo]); + + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + shardInfo, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); + + const encoder = createEncoder({ + contentTopic: ContentTopic, + pubsubTopicShardInfo: singleShardInfo + }); + + const request = await waku.lightPush.send(encoder, { + payload: utf8ToBytes("Hello World") + }); + + expect(request.successes.length).to.eq(numServiceNodes); + expect( + await serviceNodes.messageCollector.waitForMessages(1, { + pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + }) + ).to.eq(true); + }); + } + describe("Others", function () { const clusterId = 2; - let shardInfo: ShardInfo; - const shardInfoFirstShard: ShardInfo = { - clusterId: clusterId, - shards: [2] - }; - const shardInfoBothShards: ShardInfo = { - clusterId: clusterId, - shards: [2, 3] - }; const singleShardInfo1: SingleShardInfo = { clusterId: clusterId, shard: 2 @@ -192,32 +148,23 @@ describe("Static Sharding: Running Nodes", function () { }; beforeEachCustom(this, async () => { - shardInfo = { - clusterId: clusterId, - shards: [2] - }; - - await nwaku.start({ - store: true, - lightpush: true, - relay: true, - clusterId: clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo) - }); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + { clusterId, shards: [2, 3] }, + { lightpush: true, filter: true }, + false, + numServiceNodes, + true + ); }); afterEachCustom(this, async () => { - await tearDownNodes(nwaku, waku); + if (serviceNodes) { + await teardownNodesWithRedundancy(serviceNodes, waku ?? []); + } }); it("configure the node with multiple pubsub topics", async function () { - waku = await createLightNode({ - networkConfig: shardInfoBothShards - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.waitForPeers([Protocols.LightPush]); - const encoder1 = createEncoder({ contentTopic: ContentTopic, pubsubTopicShardInfo: singleShardInfo1 @@ -228,49 +175,53 @@ describe("Static Sharding: Running Nodes", function () { pubsubTopicShardInfo: singleShardInfo2 }); - const request1 = await waku.lightPush.send(encoder1, { + const request1 = await waku?.lightPush.send(encoder1, { payload: utf8ToBytes("Hello World2") }); - expect(request1.successes.length).to.eq(1); + + expect(request1?.successes.length).to.eq(numServiceNodes); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + await serviceNodes?.messageCollector.waitForMessages(1, { + pubsubTopic: encoder1.pubsubTopic }) ).to.eq(true); - const request2 = await waku.lightPush.send(encoder2, { + const request2 = await waku?.lightPush.send(encoder2, { payload: utf8ToBytes("Hello World3") }); - expect(request2.successes.length).to.eq(1); + + expect(request2?.successes.length).to.eq(numServiceNodes); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: shardInfoToPubsubTopics(shardInfo)[0] + await serviceNodes?.messageCollector.waitForMessages(1, { + pubsubTopic: encoder2.pubsubTopic }) ).to.eq(true); }); it("using a protocol with unconfigured pubsub topic should fail", async function () { this.timeout(15_000); - waku = await createLightNode({ - networkConfig: shardInfoFirstShard - }); - await waku.start(); // use a pubsub topic that is not configured const encoder = createEncoder({ contentTopic: ContentTopic, - pubsubTopicShardInfo: singleShardInfo2 + pubsubTopicShardInfo: { + clusterId, + shard: 4 + } }); - const { successes, failures } = await waku.lightPush.send(encoder, { + const request = await waku?.lightPush.send(encoder, { payload: utf8ToBytes("Hello World") }); - if (successes.length > 0 || failures?.length === 0) { + if ( + (request?.successes.length || 0) > 0 || + request?.failures?.length === 0 + ) { throw new Error("The request should've thrown an error"); } - const errors = failures?.map((failure) => failure.error); + const errors = request?.failures?.map((failure) => failure.error); expect(errors).to.include(ProtocolError.TOPIC_NOT_CONFIGURED); }); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 99878acb96..3788fb62b1 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -109,8 +109,8 @@ describe("Waku Store, custom pubsub topic", function () { nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); await nwaku2.start({ store: true, - pubsubTopic: [TestDecoder2.pubsubTopic], clusterId: TestShardInfo.clusterId, + shard: TestShardInfo.shards, relay: true }); await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]); @@ -153,7 +153,8 @@ describe("Waku Store, custom pubsub topic", function () { }); }); -describe("Waku Store (Autosharding), custom pubsub topic", function () { +// TODO: blocked by https://github.com/waku-org/nwaku/issues/3362 +describe.skip("Waku Store (Autosharding), custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; let nwaku: ServiceNode; @@ -162,6 +163,7 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { const customContentTopic1 = "/waku/2/content/utf8"; const customContentTopic2 = "/myapp/1/latest/proto"; const clusterId = 5; + const Shard2 = [1]; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, clusterId @@ -244,10 +246,10 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); await nwaku2.start({ store: true, - pubsubTopic: [autoshardingPubsubTopic2], contentTopic: [customContentTopic2], relay: true, - clusterId + clusterId, + shard: Shard2 }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); @@ -368,9 +370,9 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { nwaku2 = new ServiceNode(makeLogFileName(this) + "2"); await nwaku2.start({ store: true, - pubsubTopic: [TestDecoder2.pubsubTopic], relay: true, - clusterId: TestShardInfo.clusterId + clusterId: TestShardInfo.clusterId, + shard: TestShardInfo.shards }); await nwaku2.ensureSubscriptions([TestDecoder2.pubsubTopic]); diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index ee2765b199..a262e8f041 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -13,7 +13,7 @@ import { tearDownNodes } from "../src/index.js"; -import { runNodes } from "./filter/single_node/utils.js"; +import { runNodes } from "./light-push/utils.js"; chai.use(chaiAsPromised); diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index a324b15459..e68cbe347b 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -50,7 +50,8 @@ describe("Wait for remote peer", function () { store: false, filter: false, lightpush: false, - pubsubTopic: [DefaultTestPubsubTopic] + clusterId: DefaultTestShardInfo.clusterId, + shard: DefaultTestShardInfo.shards }); const multiAddrWithId = await nwaku.getMultiaddrWithId();