From b42601d156ec07025df4a6816e794faa8d87b142 Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Wed, 13 Sep 2023 00:31:35 +0300 Subject: [PATCH] chore: new filter tests (#1552) * add bulk of tests * refactored and improved tests * add more comments * fixes after CI run * split filter tests into mulitple suites * split filter tests into mulitple suites --- .cspell.json | 9 +- package-lock.json | 36 ++ packages/tests/package.json | 1 + packages/tests/src/constants.ts | 26 ++ packages/tests/src/node/node.ts | 2 +- packages/tests/tests/filter.node.spec.ts | 213 ----------- .../tests/tests/filter/filter_test_utils.ts | 254 +++++++++++++ packages/tests/tests/filter/ping.node.spec.ts | 64 ++++ packages/tests/tests/filter/push.node.spec.ts | 282 ++++++++++++++ .../tests/tests/filter/subscribe.node.spec.ts | 345 ++++++++++++++++++ .../tests/filter/unsubscribe.node.spec.ts | 160 ++++++++ 11 files changed, 1176 insertions(+), 216 deletions(-) delete mode 100644 packages/tests/tests/filter.node.spec.ts create mode 100644 packages/tests/tests/filter/filter_test_utils.ts create mode 100644 packages/tests/tests/filter/ping.node.spec.ts create mode 100644 packages/tests/tests/filter/push.node.spec.ts create mode 100644 packages/tests/tests/filter/subscribe.node.spec.ts create mode 100644 packages/tests/tests/filter/unsubscribe.node.spec.ts diff --git a/.cspell.json b/.cspell.json index 5f5de115bf..5eb847a036 100644 --- a/.cspell.json +++ b/.cspell.json @@ -117,7 +117,9 @@ "webfonts", "websockets", "wifi", - "xsalsa20" + "xsalsa20", + "Привет", + "مرحبا" ], "flagWords": [], "ignorePaths": [ @@ -142,5 +144,8 @@ "pattern": "//dns4/.*/" } ], - "ignoreRegExpList": ["import", "multiaddr"] + "ignoreRegExpList": [ + "import", + "multiaddr" + ] } diff --git a/package-lock.json b/package-lock.json index faa74d788f..afe6c2fa7c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -27388,6 +27388,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "p-retry": "^6.0.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", "sinon": "^15.2.0", @@ -27420,6 +27421,11 @@ "node": ">=16" } }, + "packages/tests/node_modules/@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "packages/tests/node_modules/@typescript-eslint/eslint-plugin": { "version": "5.62.0", "dev": true, @@ -27627,6 +27633,21 @@ "url": "https://github.com/sponsors/sindresorhus" } }, + "packages/tests/node_modules/p-retry": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-6.0.0.tgz", + "integrity": "sha512-6NuuXu8Upembd4sNdo4PRbs+M6aHgBTrFE6lkH0YKjVzne3cDW4gkncB98ty/bkMxLxLVNeD5bX9FyWjM7WZ+A==", + "dependencies": { + "@types/retry": "0.12.2", + "retry": "^0.13.1" + }, + "engines": { + "node": ">=16.17" + }, + "funding": { + "url": "https://github.com/sponsors/sindresorhus" + } + }, "packages/utils": { "name": "@waku/utils", "version": "0.0.11", @@ -31465,6 +31486,7 @@ "libp2p": "^0.46.9", "mocha": "^10.2.0", "npm-run-all": "^4.1.5", + "p-retry": "^6.0.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", "sinon": "^15.2.0", @@ -31472,6 +31494,11 @@ "typescript": "^5.0.4" }, "dependencies": { + "@types/retry": { + "version": "0.12.2", + "resolved": "https://registry.npmjs.org/@types/retry/-/retry-0.12.2.tgz", + "integrity": "sha512-XISRgDJ2Tc5q4TRqvgJtzsRkFYNJzZrhTdtMoGVBttwzzQJkPnS3WWTFc7kuDRoPtPakl+T+OfdEUjYJj7Jbow==" + }, "@typescript-eslint/eslint-plugin": { "version": "5.62.0", "dev": true, @@ -31570,6 +31597,15 @@ "merge2": "^1.4.1", "slash": "^3.0.0" } + }, + "p-retry": { + "version": "6.0.0", + "resolved": "https://registry.npmjs.org/p-retry/-/p-retry-6.0.0.tgz", + "integrity": "sha512-6NuuXu8Upembd4sNdo4PRbs+M6aHgBTrFE6lkH0YKjVzne3cDW4gkncB98ty/bkMxLxLVNeD5bX9FyWjM7WZ+A==", + "requires": { + "@types/retry": "0.12.2", + "retry": "^0.13.1" + } } } }, diff --git a/packages/tests/package.json b/packages/tests/package.json index 55f82b4838..269af9f644 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -60,6 +60,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "p-retry": "^6.0.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", "sinon": "^15.2.0", diff --git a/packages/tests/src/constants.ts b/packages/tests/src/constants.ts index 82dc1e01fd..1efa573f8b 100644 --- a/packages/tests/src/constants.ts +++ b/packages/tests/src/constants.ts @@ -34,3 +34,29 @@ export const NOISE_KEY_3 = new Uint8Array( return b; })() ); + +export const TEST_STRING = [ + { description: "short", value: "hi" }, + { description: "long", value: "A".repeat(10000) }, + { description: "numeric", value: "1234567890" }, + { description: "special chars", value: "!@#$%^&*()_+" }, + { description: "Chinese", value: "你好" }, + { description: "Arabic", value: "مرحبا" }, + { description: "Russian", value: "Привет" }, + { description: "SQL Injection", value: "'; DROP TABLE users; --" }, + { description: "Script", value: '' }, + { description: "XML", value: "Some content" }, + { description: "Basic HTML tag", value: "

Heading

" }, + { description: "JSON", value: '{"user":"admin","password":"123456"}' }, + { description: "shell command", value: "`rm -rf /`" }, + { description: "escaped characters", value: "\\n\\t\\0" }, + { description: "unicode special characters", value: "\u202Ereverse" } +]; + +export const TEST_TIMESTAMPS = [ + BigInt(Date.now()) * BigInt(1000000), + Date.now(), + 1649153314, + 1949153314000, + undefined +]; diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index 10fdffab67..d03f7b2c1d 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -348,7 +348,7 @@ export class NimGoNode { return `http://127.0.0.1:${this.rpcPort}/`; } - private async rpcCall( + async rpcCall( method: string, params: Array ): Promise { diff --git a/packages/tests/tests/filter.node.spec.ts b/packages/tests/tests/filter.node.spec.ts deleted file mode 100644 index 9155bfd91a..0000000000 --- a/packages/tests/tests/filter.node.spec.ts +++ /dev/null @@ -1,213 +0,0 @@ -import { - createDecoder, - createEncoder, - DecodedMessage, - DefaultPubSubTopic, - waitForRemotePeer -} from "@waku/core"; -import type { IFilterSubscription, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import debug from "debug"; - -import { - delay, - makeLogFileName, - NimGoNode, - NOISE_KEY_1 -} from "../src/index.js"; - -const log = debug("waku:test"); - -const TestContentTopic = "/test/1/waku-filter"; -const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); -const TestDecoder = createDecoder(TestContentTopic); - -describe("Waku Filter: V2", () => { - let waku: LightNode; - let nwaku: NimGoNode; - - let subscription: IFilterSubscription; - - afterEach(async function () { - !!nwaku && - nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - beforeEach(async function () { - this.timeout(15000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ - filter: true, - lightpush: true, - relay: true - }); - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - subscription = await waku.filter.createSubscription(); - }); - - it("creates a subscription", async function () { - this.timeout(10000); - - let messageCount = 0; - const messageText = "Filtering works!"; - const message = { payload: utf8ToBytes(messageText) }; - - const callback = (msg: DecodedMessage): void => { - log("Got a message"); - messageCount++; - expect(msg.contentTopic).to.eq(TestContentTopic); - expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic); - expect(bytesToUtf8(msg.payload)).to.eq(messageText); - }; - - await subscription.subscribe([TestDecoder], callback); - - await waku.lightPush.send(TestEncoder, message); - while (messageCount === 0) { - await delay(250); - } - expect(messageCount).to.eq(1); - }); - - it("modifies subscription", async function () { - this.timeout(10000); - - let messageCount = 0; - const messageText = "Filtering works!"; - const message = { payload: utf8ToBytes(messageText) }; - - const callback = (msg: DecodedMessage): void => { - log("Got a message"); - messageCount++; - expect(msg.contentTopic).to.eq(TestContentTopic); - expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic); - expect(bytesToUtf8(msg.payload)).to.eq(messageText); - }; - - await subscription.subscribe([TestDecoder], callback); - - await delay(200); - - await waku.lightPush.send(TestEncoder, message); - while (messageCount === 0) { - await delay(250); - } - expect(messageCount).to.eq(1); - - // Modify subscription - messageCount = 0; - const newMessageText = "Filtering still works!"; - const newMessage = { payload: utf8ToBytes(newMessageText) }; - - const newContentTopic = "/test/2/waku-filter"; - const newEncoder = createEncoder({ contentTopic: newContentTopic }); - const newDecoder = createDecoder(newContentTopic); - const newCallback = (msg: DecodedMessage): void => { - log("Got a message"); - messageCount++; - expect(msg.contentTopic).to.eq(newContentTopic); - expect(msg.pubSubTopic).to.eq(DefaultPubSubTopic); - expect(bytesToUtf8(msg.payload)).to.eq(newMessageText); - }; - - await subscription.subscribe([newDecoder], newCallback); - - await waku.lightPush.send(newEncoder, newMessage); - while (messageCount === 0) { - await delay(250); - } - expect(messageCount).to.eq(1); - }); - - it("handles multiple messages", async function () { - this.timeout(10000); - - let messageCount = 0; - const callback = (msg: DecodedMessage): void => { - messageCount++; - expect(msg.contentTopic).to.eq(TestContentTopic); - }; - await subscription.subscribe(TestDecoder, callback); - - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering works!") - }); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("Filtering still works!") - }); - while (messageCount < 2) { - await delay(250); - } - expect(messageCount).to.eq(2); - }); - - it("unsubscribes", async function () { - let messageCount = 0; - const callback = (): void => { - messageCount++; - }; - await subscription.subscribe([TestDecoder], callback); - - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should be received") - }); - await delay(100); - await subscription.unsubscribe([TestContentTopic]); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should not be received") - }); - await delay(100); - expect(messageCount).to.eq(1); - }); - - it("ping", async function () { - let messageCount = 0; - const callback = (): void => { - messageCount++; - }; - await subscription.subscribe([TestDecoder], callback); - - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should be received") - }); - await delay(100); - await subscription.ping(); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should also be received") - }); - await delay(100); - expect(messageCount).to.eq(2); - }); - - it("unsubscribes all", async function () { - let messageCount = 0; - const callback = (): void => { - messageCount++; - }; - await subscription.subscribe([TestDecoder], callback); - - await delay(200); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should be received") - }); - await delay(100); - await subscription.unsubscribeAll(); - await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes("This should not be received") - }); - await delay(100); - expect(messageCount).to.eq(1); - }); -}); diff --git a/packages/tests/tests/filter/filter_test_utils.ts b/packages/tests/tests/filter/filter_test_utils.ts new file mode 100644 index 0000000000..7037063da9 --- /dev/null +++ b/packages/tests/tests/filter/filter_test_utils.ts @@ -0,0 +1,254 @@ +import { + createDecoder, + createEncoder, + DecodedMessage, + Decoder, + DefaultPubSubTopic, + Encoder, + waitForRemotePeer +} from "@waku/core"; +import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +// import { createLightNode } from "@waku/sdk"; +import { createLightNode } from "@waku/sdk"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import debug from "debug"; +import { Context } from "mocha"; +import pRetry from "p-retry"; + +import { + delay, + makeLogFileName, + NimGoNode, + NOISE_KEY_1 +} from "../../src/index.js"; + +// Constants for test configuration. +export const log = debug("waku:test:filter"); +export const TestContentTopic = "/test/1/waku-filter"; +export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +export const TestDecoder = createDecoder(TestContentTopic); +export const messageText = "Filtering works!"; +export const messagePayload = { payload: utf8ToBytes(messageText) }; + +/** + * Class responsible for collecting messages. + * It provides utility methods to interact with the collected messages, + * and offers a way to wait for incoming messages. + */ +export class MessageCollector { + list: Array = []; + + // Callback to handle incoming messages. + callback = (msg: DecodedMessage): void => { + log("Got a message"); + this.list.push(msg); + }; + + get count(): number { + return this.list.length; + } + + getMessage(index: number): DecodedMessage { + return this.list[index]; + } + + async waitForMessages( + numMessages: number, + timeoutDuration: number = 400 + ): Promise { + const startTime = Date.now(); + + while (this.count < numMessages) { + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + await delay(10); + } + + return true; + } + + // Verifies a received message against expected values. + verifyReceivedMessage(options: { + index: number; + expectedContentTopic?: string; + expectedPubSubTopic?: string; + expectedMessageText?: string | Uint8Array; + expectedVersion?: number; + expectedMeta?: Uint8Array; + expectedEphemeral?: boolean; + checkTimestamp?: boolean; // Used to determine if we need to check the timestamp + }): void { + expect(this.list.length).to.be.greaterThan( + options.index, + `The message list does not have a message at index ${options.index}` + ); + + const message = this.getMessage(options.index); + expect(message.contentTopic).to.eq( + options.expectedContentTopic || TestContentTopic, + `Message content topic mismatch. Expected: ${ + options.expectedContentTopic || TestContentTopic + }. Got: ${message.contentTopic}` + ); + + expect(message.pubSubTopic).to.eq( + options.expectedPubSubTopic || DefaultPubSubTopic, + `Message pub/sub topic mismatch. Expected: ${ + options.expectedPubSubTopic || DefaultPubSubTopic + }. Got: ${message.pubSubTopic}` + ); + + expect(bytesToUtf8(message.payload)).to.eq( + options.expectedMessageText || messageText, + `Message text mismatch. Expected: ${ + options.expectedMessageText || messageText + }. Got: ${bytesToUtf8(message.payload)}` + ); + + expect(message.version).to.eq( + options.expectedVersion || 0, + `Message version mismatch. Expected: ${ + options.expectedVersion || 0 + }. Got: ${message.version}` + ); + + const shouldCheckTimestamp = + options.checkTimestamp !== undefined ? options.checkTimestamp : true; + if (shouldCheckTimestamp && message.timestamp) { + const now = Date.now(); + const tenSecondsAgo = now - 10000; + expect(message.timestamp.getTime()).to.be.within( + tenSecondsAgo, + now, + `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}` + ); + } + + expect([ + options.expectedMeta, + undefined, + new Uint8Array(0) + ]).to.deep.include( + message.meta, + `Message meta mismatch. Expected: ${ + options.expectedMeta + ? JSON.stringify(options.expectedMeta) + : "undefined or " + JSON.stringify(new Uint8Array(0)) + }. Got: ${JSON.stringify(message.meta)}` + ); + + expect(message.ephemeral).to.eq( + options.expectedEphemeral !== undefined + ? options.expectedEphemeral + : false, + `Message ephemeral value mismatch. Expected: ${ + options.expectedEphemeral !== undefined + ? options.expectedEphemeral + : false + }. Got: ${message.ephemeral}` + ); + } +} + +// Utility to generate test data for multiple topics tests. +export function generateTestData(topicCount: number): { + contentTopics: string[]; + encoders: Encoder[]; + decoders: Decoder[]; +} { + const contentTopics = Array.from( + { length: topicCount }, + (_, i) => `/test/${i + 1}/waku-multi` + ); + const encoders = contentTopics.map((topic) => + createEncoder({ contentTopic: topic }) + ); + const decoders = contentTopics.map((topic) => createDecoder(topic)); + return { + contentTopics, + encoders, + decoders + }; +} + +// Utility to validate errors related to pings in the subscription. +export async function validatePingError( + subscription: IFilterSubscription +): Promise { + try { + await subscription.ping(); + throw new Error( + "Ping was successful but was expected to fail with a specific error." + ); + } catch (err) { + if ( + err instanceof Error && + err.message.includes("peer has no subscriptions") + ) { + return; + } else { + throw err; + } + } +} + +interface SetupReturn { + nwaku: NimGoNode; + waku: LightNode; + subscription: IFilterSubscription; + messageCollector: MessageCollector; +} + +// Setup before each test to initialize nodes and message collector. +export async function setupNodes(currentTest: Context): Promise { + const nwaku = new NimGoNode(makeLogFileName(currentTest)); + // Sometimes the node setup fails, when that happens we retry it max 3 times. + await pRetry( + async () => { + try { + await nwaku.start({ + filter: true, + lightpush: true, + relay: true + }); + } catch (error) { + log("nwaku node failed to start:", error); + throw error; + } + }, + { retries: 3 } + ); + + let waku: LightNode | undefined; + try { + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }); + await waku.start(); + } catch (error) { + log("jswaku node failed to start:", error); + } + if (waku) { + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + const subscription = await waku.filter.createSubscription(); + const messageCollector = new MessageCollector(); + + return { nwaku, waku, subscription, messageCollector }; + } else { + throw new Error("Failed to initialize waku"); + } +} + +export function tearDownNodes( + nwaku: NimGoNode, + waku: LightNode, + nwaku2?: NimGoNode +): void { + !!nwaku && nwaku.stop().catch((e) => log("Nwaku failed to stop", e)); + !!nwaku2 && nwaku2.stop().catch((e) => log("Nwaku2 failed to stop", e)); + !!waku && waku.stop().catch((e) => log("Waku failed to stop", e)); +} diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts new file mode 100644 index 0000000000..7097800f05 --- /dev/null +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -0,0 +1,64 @@ +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { NimGoNode } from "../../src/index.js"; + +import { + MessageCollector, + setupNodes, + tearDownNodes, + TestContentTopic, + TestDecoder, + TestEncoder, + validatePingError +} from "./filter_test_utils.js"; + +describe("Waku Filter V2: Ping", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let nwaku: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + this.afterEach(async function () { + tearDownNodes(nwaku, waku); + }); + + this.beforeEach(async function () { + this.timeout(15000); + const setup = await setupNodes(this); + nwaku = setup.nwaku; + waku = setup.waku; + subscription = setup.subscription; + messageCollector = setup.messageCollector; + }); + + it("Ping on subscribed peer", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + // If ping is successfull(node has active subscription) we receive a success status code. + await subscription.ping(); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm new messages are received after a ping. + expect(await messageCollector.waitForMessages(2)).to.eq(true); + }); + + it("Ping on peer without subscriptions", async function () { + await validatePingError(subscription); + }); + + it("Ping on unsubscribed peer", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await subscription.ping(); + await subscription.unsubscribe([TestContentTopic]); + + // Ping imediately after unsubscribe + await validatePingError(subscription); + }); +}); diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts new file mode 100644 index 0000000000..43609314eb --- /dev/null +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -0,0 +1,282 @@ +import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core"; +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + delay, + NimGoNode, + TEST_STRING, + TEST_TIMESTAMPS +} from "../../src/index.js"; + +import { + MessageCollector, + messageText, + setupNodes, + tearDownNodes, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./filter_test_utils.js"; + +describe("Waku Filter V2: FilterPush", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let nwaku: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + this.afterEach(async function () { + tearDownNodes(nwaku, waku); + }); + + this.beforeEach(async function () { + this.timeout(15000); + const setup = await setupNodes(this); + nwaku = setup.nwaku; + waku = setup.waku; + subscription = setup.subscription; + messageCollector = setup.messageCollector; + }); + + TEST_STRING.forEach((testItem) => { + it(`Check received message containing ${testItem.description}`, async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: testItem.value + }); + }); + }); + + TEST_TIMESTAMPS.forEach((testItem) => { + it(`Check received message with timestamp: ${testItem} `, async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: testItem + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + checkTimestamp: false + }); + + // Check if the timestamp matches + const timestamp = messageCollector.getMessage(0).timestamp; + if (testItem == undefined) { + expect(timestamp).to.eq(undefined); + } + if (timestamp !== undefined) { + expect(testItem?.toString()).to.contain(timestamp.getTime().toString()); + } + }); + }); + + it("Check received message with invalid timestamp is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: "2023-09-06T12:05:38.609Z" + } + ]); + + // Verify that no message was received + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message on other pubsub topic is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + "DefaultPubSubTopic", + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message with no pubsub topic is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message with no content topic is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message with no payload is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + contentTopic: TestContentTopic, + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + // For go-waku the message is received (it is possible to send a message with no payload) + if (nwaku.type() == "go-waku") { + expect(await messageCollector.waitForMessages(1)).to.eq(true); + } else { + expect(await messageCollector.waitForMessages(1)).to.eq(false); + } + }); + + it("Check received message with non string payload is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + contentTopic: TestContentTopic, + payload: 12345, + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message with extra parameter is not received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + "extraField", + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000) + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Check received message with extra option is received", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await delay(400); + + await nwaku.rpcCall("post_waku_v2_relay_v1_message", [ + DefaultPubSubTopic, + { + contentTopic: TestContentTopic, + payload: Buffer.from(utf8ToBytes(messageText)).toString("base64"), + timestamp: BigInt(Date.now()) * BigInt(1000000), + extraOption: "extraOption" + } + ]); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ index: 0 }); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check received message received after jswaku node is restarted", async function () { + // Subscribe and send message + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + // Restart js-waku node + await waku.stop(); + expect(waku.isStarted()).to.eq(false); + await waku.start(); + expect(waku.isStarted()).to.eq(true); + + // Redo the connection and create a new subscription + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + subscription = await waku.filter.createSubscription(); + await subscription.subscribe([TestDecoder], messageCollector.callback); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: "M1" + }); + messageCollector.verifyReceivedMessage({ + index: 1, + expectedMessageText: "M2" + }); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check received message received after nwaku node is restarted", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + // Restart nwaku node + await nwaku.stop(); + await nwaku.start(); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: "M1" + }); + messageCollector.verifyReceivedMessage({ + index: 1, + expectedMessageText: "M2" + }); + }); +}); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts new file mode 100644 index 0000000000..d8d57595b3 --- /dev/null +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -0,0 +1,345 @@ +import { + createDecoder, + createEncoder, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + delay, + makeLogFileName, + NimGoNode, + TEST_STRING +} from "../../src/index.js"; + +import { + generateTestData, + MessageCollector, + messagePayload, + messageText, + setupNodes, + tearDownNodes, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./filter_test_utils.js"; + +describe("Waku Filter V2: Subscribe", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + this.afterEach(async function () { + tearDownNodes(nwaku, waku, nwaku2); + }); + + this.beforeEach(async function () { + this.timeout(15000); + const setup = await setupNodes(this); + nwaku = setup.nwaku; + waku = setup.waku; + subscription = setup.subscription; + messageCollector = setup.messageCollector; + }); + + it("Subscribe and receive messages via lightPush", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ index: 0 }); + expect((await nwaku.messages()).length).to.eq(1); + }); + + it("Subscribe and receive messages via waku relay post", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + + await delay(400); + + // Send a test message using the relay post method. + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + contentTopic: TestContentTopic, + payload: utf8ToBytes(messageText) + }) + ); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ index: 0 }); + expect((await nwaku.messages()).length).to.eq(1); + }); + + it("Subscribe and receive 2 messages on the same topic", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + + await waku.lightPush.send(TestEncoder, messagePayload); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ index: 0 }); + + // Send another message on the same topic. + const newMessageText = "Filtering still works!"; + await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(newMessageText) + }); + + // Verify that the second message was successfully received. + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + expectedMessageText: newMessageText, + index: 1 + }); + expect((await nwaku.messages()).length).to.eq(2); + }); + + it("Subscribe and receive messages on 2 different content topics", async function () { + // Subscribe to the first content topic and send a message. + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ index: 0 }); + + // Modify subscription to include a new content topic and send a message. + const newMessageText = "Filtering still works!"; + const newMessagePayload = { payload: utf8ToBytes(newMessageText) }; + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(newEncoder, { + payload: utf8ToBytes(newMessageText) + }); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + expectedContentTopic: newContentTopic, + expectedMessageText: newMessageText, + index: 1 + }); + + // Send another message on the initial content topic to verify it still works. + await waku.lightPush.send(TestEncoder, newMessagePayload); + expect(await messageCollector.waitForMessages(3)).to.eq(true); + messageCollector.verifyReceivedMessage({ + expectedMessageText: newMessageText, + index: 2 + }); + expect((await nwaku.messages()).length).to.eq(3); + }); + + it("Subscribe and receives messages on 20 topics", async function () { + const topicCount = 20; + const td = generateTestData(topicCount); + + // Subscribe to all 20 topics. + for (let i = 0; i < topicCount; i++) { + await subscription.subscribe([td.decoders[i]], messageCollector.callback); + } + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect(await messageCollector.waitForMessages(20)).to.eq(true); + td.contentTopics.forEach((topic, index) => { + messageCollector.verifyReceivedMessage({ + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + index: index + }); + }); + }); + + it("Subscribe to 30 topics at once and receives messages", async function () { + const topicCount = 30; + const td = generateTestData(topicCount); + + // Subscribe to all 30 topics. + await subscription.subscribe(td.decoders, messageCollector.callback); + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect(await messageCollector.waitForMessages(30)).to.eq(true); + td.contentTopics.forEach((topic, index) => { + messageCollector.verifyReceivedMessage({ + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + index: index + }); + }); + }); + + it("Error when try to subscribe to more than 30 topics", async function () { + const topicCount = 31; + const td = generateTestData(topicCount); + + // Attempt to subscribe to 31 topics + try { + await subscription.subscribe(td.decoders, messageCollector.callback); + throw new Error( + "Subscribe to 31 topics was successful but was expected to fail with a specific error." + ); + } catch (err) { + if ( + err instanceof Error && + err.message.includes("exceeds maximum content topics: 30") + ) { + return; + } else { + throw err; + } + } + }); + + it("Overlapping topic subscription", async function () { + // Define two sets of test data with overlapping topics. + const topicCount1 = 2; + const td1 = generateTestData(topicCount1); + const topicCount2 = 4; + const td2 = generateTestData(topicCount2); + + // Subscribe to the first set of topics. + await subscription.subscribe(td1.decoders, messageCollector.callback); + + // Subscribe to the second set of topics which has overlapping topics with the first set. + await subscription.subscribe(td2.decoders, messageCollector.callback); + + // Send messages to the first set of topics. + for (let i = 0; i < topicCount1; i++) { + const messageText = `Message for Topic ${i + 1}`; + await waku.lightPush.send(td1.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Send messages to the second set of topics. + for (let i = 0; i < topicCount2; i++) { + const messageText = `Message for Topic ${i + 1}`; + await waku.lightPush.send(td2.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Check if all messages were received. + // Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set). + expect(await messageCollector.waitForMessages(6)).to.eq(true); + }); + + it("Refresh subscription", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + // Resubscribe (refresh) to the same topic and send another message. + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + // Confirm both messages were received. + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: "M1" + }); + messageCollector.verifyReceivedMessage({ + index: 1, + expectedMessageText: "M2" + }); + }); + + TEST_STRING.forEach((testItem) => { + it(`Subscribe to topic containing ${testItem.description} and receive message`, async function () { + const newContentTopic = testItem.value; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + + await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(newEncoder, messagePayload); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedContentTopic: newContentTopic + }); + }); + }); + + it("Add multiple subscription objects on single nwaku node", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + // Create a second subscription on a different topic + const subscription2 = await waku.filter.createSubscription(); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription2.subscribe([newDecoder], messageCollector.callback); + + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + + // Check if both messages were received + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: "M1" + }); + messageCollector.verifyReceivedMessage({ + index: 1, + expectedContentTopic: newContentTopic, + expectedMessageText: "M2" + }); + }); + + // this test fail 50% of times with messageCount being 1. Seems like a message is lost somehow + it.skip("Subscribe and receive messages from multiple nwaku nodes", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + // Set up and start a new nwaku node + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ filter: true, lightpush: true, relay: true }); + + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + const subscription2 = await waku.filter.createSubscription( + DefaultPubSubTopic, + await nwaku2.getPeerId() + ); + + // Send a message using the new subscription + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription2.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + + // Check if both messages were received + expect(await messageCollector.waitForMessages(2)).to.eq(true); + messageCollector.verifyReceivedMessage({ + index: 0, + expectedMessageText: "M1" + }); + messageCollector.verifyReceivedMessage({ + index: 1, + expectedContentTopic: newContentTopic, + expectedMessageText: "M2" + }); + }); +}); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts new file mode 100644 index 0000000000..6e2b7355b5 --- /dev/null +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -0,0 +1,160 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { NimGoNode } from "../../src/index.js"; + +import { + generateTestData, + MessageCollector, + messagePayload, + setupNodes, + tearDownNodes, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./filter_test_utils.js"; + +describe("Waku Filter V2: Unsubscribe", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(10000); + let waku: LightNode; + let nwaku: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + this.afterEach(async function () { + tearDownNodes(nwaku, waku); + }); + + this.beforeEach(async function () { + this.timeout(15000); + const setup = await setupNodes(this); + nwaku = setup.nwaku; + waku = setup.waku; + subscription = setup.subscription; + messageCollector = setup.messageCollector; + }); + + it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, messagePayload); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + // Unsubscribe from the topic and send again + await subscription.unsubscribe([TestContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(2)).to.eq(false); + + // Check that from 2 messages send only the 1st was received + messageCollector.verifyReceivedMessage({ index: 0 }); + expect(messageCollector.count).to.eq(1); + expect((await nwaku.messages()).length).to.eq(2); + }); + + it("Unsubscribe 1 topic - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await subscription.subscribe([TestDecoder], messageCollector.callback); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + + // Unsubscribe from the first topic and send again + await subscription.unsubscribe([TestContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await messageCollector.waitForMessages(3)).to.eq(true); + + // Check that from 4 messages send 3 were received + expect(messageCollector.count).to.eq(3); + expect((await nwaku.messages()).length).to.eq(4); + }); + + it("Unsubscribe 2 topics - node subscribed to 2 topics", async function () { + // Subscribe to 2 topics and send messages + await subscription.subscribe([TestDecoder], messageCollector.callback); + const newContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + + // Unsubscribe from both and send again + await subscription.unsubscribe([TestContentTopic, newContentTopic]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M4") }); + expect(await messageCollector.waitForMessages(3)).to.eq(false); + + // Check that from 4 messages send 2 were received + expect(messageCollector.count).to.eq(2); + expect((await nwaku.messages()).length).to.eq(4); + }); + + it("Unsubscribe topics the node is not subscribed to", async function () { + // Subscribe to 1 topic and send message + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + expect(messageCollector.count).to.eq(1); + + // Unsubscribe from topics that the node is not not subscribed to and send again + await subscription.unsubscribe([]); + await subscription.unsubscribe(["/test/2/waku-filter"]); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + + // Check that both messages were received + expect(messageCollector.count).to.eq(2); + expect((await nwaku.messages()).length).to.eq(2); + }); + + it("Unsubscribes all - node subscribed to 1 topic", async function () { + await subscription.subscribe([TestDecoder], messageCollector.callback); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + expect(messageCollector.count).to.eq(1); + + // Unsubscribe from all topics and send again + await subscription.unsubscribeAll(); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + expect(await messageCollector.waitForMessages(2)).to.eq(false); + + // Check that from 2 messages send only the 1st was received + expect(messageCollector.count).to.eq(1); + expect((await nwaku.messages()).length).to.eq(2); + }); + + it("Unsubscribes all - node subscribed to 10 topics", async function () { + // Subscribe to 10 topics and send message + const topicCount = 10; + const td = generateTestData(topicCount); + await subscription.subscribe(td.decoders, messageCollector.callback); + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`M${i + 1}`) + }); + } + expect(await messageCollector.waitForMessages(10)).to.eq(true); + + // Unsubscribe from all topics and send again + await subscription.unsubscribeAll(); + for (let i = 0; i < topicCount; i++) { + await waku.lightPush.send(td.encoders[i], { + payload: utf8ToBytes(`M${topicCount + i + 1}`) + }); + } + expect(await messageCollector.waitForMessages(11)).to.eq(false); + + // Check that from 20 messages send only 10 were received + expect(messageCollector.count).to.eq(10); + expect((await nwaku.messages()).length).to.eq(20); + }); +});