diff --git a/packages/tests/src/index.ts b/packages/tests/src/index.ts index 977389d4a9..746345a2b0 100644 --- a/packages/tests/src/index.ts +++ b/packages/tests/src/index.ts @@ -10,3 +10,5 @@ export * from "./constants.js"; export * from "./delay.js"; export * from "./log_file.js"; export * from "./node/node.js"; +export * from "./teardown.js"; +export * from "./message_collector.js"; diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts new file mode 100644 index 0000000000..d37bb86e85 --- /dev/null +++ b/packages/tests/src/message_collector.ts @@ -0,0 +1,208 @@ +import { DecodedMessage, DefaultPubSubTopic } from "@waku/core"; +import { bytesToUtf8 } from "@waku/utils/bytes"; +import { AssertionError, expect } from "chai"; +import debug from "debug"; + +import { MessageRpcResponse } from "./node/interfaces.js"; + +import { base64ToUtf8, delay, NimGoNode } from "./index.js"; + +const log = debug("waku:test"); + +/** + * Class responsible for collecting messages. + * It provides utility methods to interact with the collected messages, + * and offers a way to wait for incoming messages. + */ +export class MessageCollector { + list: Array = []; + callback: (msg: DecodedMessage) => void = () => {}; + + constructor( + private contentTopic: string, + private nwaku?: NimGoNode, + private pubSubTopic = DefaultPubSubTopic + ) { + if (!this.nwaku) { + this.callback = (msg: DecodedMessage): void => { + log("Got a message"); + this.list.push(msg); + }; + } + } + + get count(): number { + return this.list.length; + } + + getMessage(index: number): MessageRpcResponse | DecodedMessage { + return this.list[index]; + } + + // Type guard to determine if a message is of type MessageRpcResponse + isMessageRpcResponse( + message: MessageRpcResponse | DecodedMessage + ): message is MessageRpcResponse { + return ( + ("payload" in message && typeof message.payload === "string") || + !!this.nwaku + ); + } + + async waitForMessages( + numMessages: number, + timeoutDuration: number = 400 + ): Promise { + const startTime = Date.now(); + + while (this.count < numMessages) { + if (this.nwaku) { + try { + this.list = await this.nwaku.messages(this.pubSubTopic); + } catch (error) { + log(`Can't retrieve messages because of ${error}`); + await delay(10); + } + } + + if (Date.now() - startTime > timeoutDuration * numMessages) { + return false; + } + + await delay(10); + } + + return true; + } + + // Verifies a received message against expected values. + verifyReceivedMessage( + index: number, + options: { + expectedMessageText: string | Uint8Array | undefined; + expectedContentTopic?: string; + expectedPubSubTopic?: string; + expectedVersion?: number; + expectedMeta?: Uint8Array; + expectedEphemeral?: boolean; + expectedTimestamp?: bigint; + checkTimestamp?: boolean; // Used to determine if we need to check the timestamp + } + ): void { + expect(this.list.length).to.be.greaterThan( + index, + `The message list does not have a message at index ${index}` + ); + + const message = this.getMessage(index); + expect(message.contentTopic).to.eq( + options.expectedContentTopic || this.contentTopic, + `Message content topic mismatch. Expected: ${ + options.expectedContentTopic || this.contentTopic + }. Got: ${message.contentTopic}` + ); + + expect(message.version).to.eq( + options.expectedVersion || 0, + `Message version mismatch. Expected: ${ + options.expectedVersion || 0 + }. Got: ${message.version}` + ); + + if (message.ephemeral !== undefined) { + expect(message.ephemeral).to.eq( + options.expectedEphemeral !== undefined + ? options.expectedEphemeral + : false, + `Message ephemeral value mismatch. Expected: ${ + options.expectedEphemeral !== undefined + ? options.expectedEphemeral + : false + }. Got: ${message.ephemeral}` + ); + } + + if (this.isMessageRpcResponse(message)) { + // nwaku message specific assertions + const receivedMessageText = message.payload + ? base64ToUtf8(message.payload) + : undefined; + + expect(receivedMessageText).to.eq( + options.expectedMessageText, + `Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}` + ); + + if (message.timestamp) { + // In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec + // We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions + if (options.expectedTimestamp !== undefined) { + const lowerBound = + BigInt(options.expectedTimestamp) - BigInt(1000000000); + const upperBound = + BigInt(options.expectedTimestamp) + BigInt(1000000000); + + if ( + message.timestamp < lowerBound || + message.timestamp > upperBound + ) { + throw new AssertionError( + `Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${message.timestamp}` + ); + } + } + // In we don't send timestamp in the request we assert that the timestamp in the response is between now and (now-10s) + else { + const now = BigInt(Date.now()) * BigInt(1_000_000); + const tenSecondsAgo = now - BigInt(10_000_000_000); + + if (message.timestamp < tenSecondsAgo || message.timestamp > now) { + throw new AssertionError( + `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp}` + ); + } + } + } + } else { + // js-waku message specific assertions + expect(message.pubSubTopic).to.eq( + options.expectedPubSubTopic || DefaultPubSubTopic, + `Message pub/sub topic mismatch. Expected: ${ + options.expectedPubSubTopic || DefaultPubSubTopic + }. Got: ${message.pubSubTopic}` + ); + + expect(bytesToUtf8(message.payload)).to.eq( + options.expectedMessageText, + `Message text mismatch. Expected: ${ + options.expectedMessageText + }. Got: ${bytesToUtf8(message.payload)}` + ); + + const shouldCheckTimestamp = + options.checkTimestamp !== undefined ? options.checkTimestamp : true; + if (shouldCheckTimestamp && message.timestamp) { + const now = Date.now(); + const tenSecondsAgo = now - 10000; + expect(message.timestamp.getTime()).to.be.within( + tenSecondsAgo, + now, + `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}` + ); + } + + expect([ + options.expectedMeta, + undefined, + new Uint8Array(0) + ]).to.deep.include( + message.meta, + `Message meta mismatch. Expected: ${ + options.expectedMeta + ? JSON.stringify(options.expectedMeta) + : "undefined or " + JSON.stringify(new Uint8Array(0)) + }. Got: ${JSON.stringify(message.meta)}` + ); + } + } +} diff --git a/packages/tests/src/node/interfaces.ts b/packages/tests/src/node/interfaces.ts index 742a8e85c0..66ba21c71e 100644 --- a/packages/tests/src/node/interfaces.ts +++ b/packages/tests/src/node/interfaces.ts @@ -52,4 +52,5 @@ export interface MessageRpcResponse { contentTopic?: string; version?: number; timestamp?: bigint; // Unix epoch time in nanoseconds as a 64-bits integer value. + ephemeral?: boolean; } diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index d03f7b2c1d..27cc44b6ea 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -5,6 +5,7 @@ import { DefaultPubSubTopic } from "@waku/core"; import { isDefined } from "@waku/utils"; import { bytesToHex, hexToBytes } from "@waku/utils/bytes"; import debug from "debug"; +import pRetry from "p-retry"; import portfinder from "portfinder"; import { existsAsync, mkdirAsync, openAsync } from "../async_fs.js"; @@ -164,6 +165,25 @@ export class NimGoNode { } } + async startWithRetries( + args: Args, + options: { + retries: number; + } + ): Promise { + await pRetry( + async () => { + try { + await this.start(args); + } catch (error) { + log("Nwaku node failed to start:", error); + throw error; + } + }, + { retries: options.retries } + ); + } + public async stop(): Promise { await this.docker?.container?.stop(); delete this.docker; diff --git a/packages/tests/src/teardown.ts b/packages/tests/src/teardown.ts new file mode 100644 index 0000000000..caecec6e44 --- /dev/null +++ b/packages/tests/src/teardown.ts @@ -0,0 +1,23 @@ +import { LightNode } from "@waku/interfaces"; +import debug from "debug"; + +import { NimGoNode } from "./index.js"; + +const log = debug("waku:test"); + +export function tearDownNodes( + nwakuNodes: NimGoNode[], + wakuNodes: LightNode[] +): void { + nwakuNodes.forEach((nwaku) => { + if (nwaku) { + nwaku.stop().catch((e) => log("Nwaku failed to stop", e)); + } + }); + + wakuNodes.forEach((waku) => { + if (waku) { + waku.stop().catch((e) => log("Waku failed to stop", e)); + } + }); +} diff --git a/packages/tests/tests/filter/filter_test_utils.ts b/packages/tests/tests/filter/filter_test_utils.ts deleted file mode 100644 index 7037063da9..0000000000 --- a/packages/tests/tests/filter/filter_test_utils.ts +++ /dev/null @@ -1,254 +0,0 @@ -import { - createDecoder, - createEncoder, - DecodedMessage, - Decoder, - DefaultPubSubTopic, - Encoder, - waitForRemotePeer -} from "@waku/core"; -import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; -// import { createLightNode } from "@waku/sdk"; -import { createLightNode } from "@waku/sdk"; -import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import debug from "debug"; -import { Context } from "mocha"; -import pRetry from "p-retry"; - -import { - delay, - makeLogFileName, - NimGoNode, - NOISE_KEY_1 -} from "../../src/index.js"; - -// Constants for test configuration. -export const log = debug("waku:test:filter"); -export const TestContentTopic = "/test/1/waku-filter"; -export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); -export const TestDecoder = createDecoder(TestContentTopic); -export const messageText = "Filtering works!"; -export const messagePayload = { payload: utf8ToBytes(messageText) }; - -/** - * Class responsible for collecting messages. - * It provides utility methods to interact with the collected messages, - * and offers a way to wait for incoming messages. - */ -export class MessageCollector { - list: Array = []; - - // Callback to handle incoming messages. - callback = (msg: DecodedMessage): void => { - log("Got a message"); - this.list.push(msg); - }; - - get count(): number { - return this.list.length; - } - - getMessage(index: number): DecodedMessage { - return this.list[index]; - } - - async waitForMessages( - numMessages: number, - timeoutDuration: number = 400 - ): Promise { - const startTime = Date.now(); - - while (this.count < numMessages) { - if (Date.now() - startTime > timeoutDuration * numMessages) { - return false; - } - await delay(10); - } - - return true; - } - - // Verifies a received message against expected values. - verifyReceivedMessage(options: { - index: number; - expectedContentTopic?: string; - expectedPubSubTopic?: string; - expectedMessageText?: string | Uint8Array; - expectedVersion?: number; - expectedMeta?: Uint8Array; - expectedEphemeral?: boolean; - checkTimestamp?: boolean; // Used to determine if we need to check the timestamp - }): void { - expect(this.list.length).to.be.greaterThan( - options.index, - `The message list does not have a message at index ${options.index}` - ); - - const message = this.getMessage(options.index); - expect(message.contentTopic).to.eq( - options.expectedContentTopic || TestContentTopic, - `Message content topic mismatch. Expected: ${ - options.expectedContentTopic || TestContentTopic - }. Got: ${message.contentTopic}` - ); - - expect(message.pubSubTopic).to.eq( - options.expectedPubSubTopic || DefaultPubSubTopic, - `Message pub/sub topic mismatch. Expected: ${ - options.expectedPubSubTopic || DefaultPubSubTopic - }. Got: ${message.pubSubTopic}` - ); - - expect(bytesToUtf8(message.payload)).to.eq( - options.expectedMessageText || messageText, - `Message text mismatch. Expected: ${ - options.expectedMessageText || messageText - }. Got: ${bytesToUtf8(message.payload)}` - ); - - expect(message.version).to.eq( - options.expectedVersion || 0, - `Message version mismatch. Expected: ${ - options.expectedVersion || 0 - }. Got: ${message.version}` - ); - - const shouldCheckTimestamp = - options.checkTimestamp !== undefined ? options.checkTimestamp : true; - if (shouldCheckTimestamp && message.timestamp) { - const now = Date.now(); - const tenSecondsAgo = now - 10000; - expect(message.timestamp.getTime()).to.be.within( - tenSecondsAgo, - now, - `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}` - ); - } - - expect([ - options.expectedMeta, - undefined, - new Uint8Array(0) - ]).to.deep.include( - message.meta, - `Message meta mismatch. Expected: ${ - options.expectedMeta - ? JSON.stringify(options.expectedMeta) - : "undefined or " + JSON.stringify(new Uint8Array(0)) - }. Got: ${JSON.stringify(message.meta)}` - ); - - expect(message.ephemeral).to.eq( - options.expectedEphemeral !== undefined - ? options.expectedEphemeral - : false, - `Message ephemeral value mismatch. Expected: ${ - options.expectedEphemeral !== undefined - ? options.expectedEphemeral - : false - }. Got: ${message.ephemeral}` - ); - } -} - -// Utility to generate test data for multiple topics tests. -export function generateTestData(topicCount: number): { - contentTopics: string[]; - encoders: Encoder[]; - decoders: Decoder[]; -} { - const contentTopics = Array.from( - { length: topicCount }, - (_, i) => `/test/${i + 1}/waku-multi` - ); - const encoders = contentTopics.map((topic) => - createEncoder({ contentTopic: topic }) - ); - const decoders = contentTopics.map((topic) => createDecoder(topic)); - return { - contentTopics, - encoders, - decoders - }; -} - -// Utility to validate errors related to pings in the subscription. -export async function validatePingError( - subscription: IFilterSubscription -): Promise { - try { - await subscription.ping(); - throw new Error( - "Ping was successful but was expected to fail with a specific error." - ); - } catch (err) { - if ( - err instanceof Error && - err.message.includes("peer has no subscriptions") - ) { - return; - } else { - throw err; - } - } -} - -interface SetupReturn { - nwaku: NimGoNode; - waku: LightNode; - subscription: IFilterSubscription; - messageCollector: MessageCollector; -} - -// Setup before each test to initialize nodes and message collector. -export async function setupNodes(currentTest: Context): Promise { - const nwaku = new NimGoNode(makeLogFileName(currentTest)); - // Sometimes the node setup fails, when that happens we retry it max 3 times. - await pRetry( - async () => { - try { - await nwaku.start({ - filter: true, - lightpush: true, - relay: true - }); - } catch (error) { - log("nwaku node failed to start:", error); - throw error; - } - }, - { retries: 3 } - ); - - let waku: LightNode | undefined; - try { - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }); - await waku.start(); - } catch (error) { - log("jswaku node failed to start:", error); - } - if (waku) { - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - const subscription = await waku.filter.createSubscription(); - const messageCollector = new MessageCollector(); - - return { nwaku, waku, subscription, messageCollector }; - } else { - throw new Error("Failed to initialize waku"); - } -} - -export function tearDownNodes( - nwaku: NimGoNode, - waku: LightNode, - nwaku2?: NimGoNode -): void { - !!nwaku && nwaku.stop().catch((e) => log("Nwaku failed to stop", e)); - !!nwaku2 && nwaku2.stop().catch((e) => log("Nwaku2 failed to stop", e)); - !!waku && waku.stop().catch((e) => log("Waku failed to stop", e)); -} diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 7097800f05..b35da7281b 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -2,17 +2,15 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { NimGoNode } from "../../src/index.js"; +import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; import { - MessageCollector, - setupNodes, - tearDownNodes, + runNodes, TestContentTopic, TestDecoder, TestEncoder, validatePingError -} from "./filter_test_utils.js"; +} from "./utils.js"; describe("Waku Filter V2: Ping", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level @@ -22,17 +20,15 @@ describe("Waku Filter V2: Ping", function () { let subscription: IFilterSubscription; let messageCollector: MessageCollector; - this.afterEach(async function () { - tearDownNodes(nwaku, waku); - }); - this.beforeEach(async function () { this.timeout(15000); - const setup = await setupNodes(this); - nwaku = setup.nwaku; - waku = setup.waku; - subscription = setup.subscription; - messageCollector = setup.messageCollector; + [nwaku, waku] = await runNodes(this); + subscription = await waku.filter.createSubscription(); + messageCollector = new MessageCollector(TestContentTopic); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku], [waku]); }); it("Ping on subscribed peer", async function () { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 43609314eb..b695193d34 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -6,20 +6,20 @@ import { expect } from "chai"; import { delay, + MessageCollector, NimGoNode, + tearDownNodes, TEST_STRING, TEST_TIMESTAMPS } from "../../src/index.js"; import { - MessageCollector, messageText, - setupNodes, - tearDownNodes, + runNodes, TestContentTopic, TestDecoder, TestEncoder -} from "./filter_test_utils.js"; +} from "./utils.js"; describe("Waku Filter V2: FilterPush", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level @@ -29,17 +29,15 @@ describe("Waku Filter V2: FilterPush", function () { let subscription: IFilterSubscription; let messageCollector: MessageCollector; - this.afterEach(async function () { - tearDownNodes(nwaku, waku); - }); - this.beforeEach(async function () { this.timeout(15000); - const setup = await setupNodes(this); - nwaku = setup.nwaku; - waku = setup.waku; - subscription = setup.subscription; - messageCollector = setup.messageCollector; + [nwaku, waku] = await runNodes(this); + subscription = await waku.filter.createSubscription(); + messageCollector = new MessageCollector(TestContentTopic); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku], [waku]); }); TEST_STRING.forEach((testItem) => { @@ -50,8 +48,7 @@ describe("Waku Filter V2: FilterPush", function () { }); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: testItem.value }); }); @@ -72,8 +69,8 @@ describe("Waku Filter V2: FilterPush", function () { ]); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, checkTimestamp: false }); @@ -82,7 +79,7 @@ describe("Waku Filter V2: FilterPush", function () { if (testItem == undefined) { expect(timestamp).to.eq(undefined); } - if (timestamp !== undefined) { + if (timestamp !== undefined && timestamp instanceof Date) { expect(testItem?.toString()).to.contain(timestamp.getTime().toString()); } }); @@ -219,7 +216,9 @@ describe("Waku Filter V2: FilterPush", function () { ]); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); }); // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done @@ -245,12 +244,10 @@ describe("Waku Filter V2: FilterPush", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1" }); - messageCollector.verifyReceivedMessage({ - index: 1, + messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2" }); }); @@ -270,12 +267,10 @@ describe("Waku Filter V2: FilterPush", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1" }); - messageCollector.verifyReceivedMessage({ - index: 1, + messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2" }); }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index d8d57595b3..8dfdda677e 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -12,21 +12,21 @@ import { expect } from "chai"; import { delay, makeLogFileName, + MessageCollector, NimGoNode, + tearDownNodes, TEST_STRING } from "../../src/index.js"; import { generateTestData, - MessageCollector, messagePayload, messageText, - setupNodes, - tearDownNodes, + runNodes, TestContentTopic, TestDecoder, TestEncoder -} from "./filter_test_utils.js"; +} from "./utils.js"; describe("Waku Filter V2: Subscribe", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level @@ -37,17 +37,15 @@ describe("Waku Filter V2: Subscribe", function () { let subscription: IFilterSubscription; let messageCollector: MessageCollector; - this.afterEach(async function () { - tearDownNodes(nwaku, waku, nwaku2); - }); - this.beforeEach(async function () { this.timeout(15000); - const setup = await setupNodes(this); - nwaku = setup.nwaku; - waku = setup.waku; - subscription = setup.subscription; - messageCollector = setup.messageCollector; + [nwaku, waku] = await runNodes(this); + subscription = await waku.filter.createSubscription(); + messageCollector = new MessageCollector(TestContentTopic); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku, nwaku2], [waku]); }); it("Subscribe and receive messages via lightPush", async function () { @@ -56,7 +54,9 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); expect((await nwaku.messages()).length).to.eq(1); }); @@ -74,7 +74,9 @@ describe("Waku Filter V2: Subscribe", function () { ); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); expect((await nwaku.messages()).length).to.eq(1); }); @@ -84,7 +86,9 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); // Send another message on the same topic. const newMessageText = "Filtering still works!"; @@ -94,9 +98,8 @@ describe("Waku Filter V2: Subscribe", function () { // Verify that the second message was successfully received. expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - expectedMessageText: newMessageText, - index: 1 + messageCollector.verifyReceivedMessage(1, { + expectedMessageText: newMessageText }); expect((await nwaku.messages()).length).to.eq(2); }); @@ -106,7 +109,9 @@ describe("Waku Filter V2: Subscribe", function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); // Modify subscription to include a new content topic and send a message. const newMessageText = "Filtering still works!"; @@ -119,18 +124,16 @@ describe("Waku Filter V2: Subscribe", function () { payload: utf8ToBytes(newMessageText) }); expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ + messageCollector.verifyReceivedMessage(1, { expectedContentTopic: newContentTopic, - expectedMessageText: newMessageText, - index: 1 + expectedMessageText: newMessageText }); // Send another message on the initial content topic to verify it still works. await waku.lightPush.send(TestEncoder, newMessagePayload); expect(await messageCollector.waitForMessages(3)).to.eq(true); - messageCollector.verifyReceivedMessage({ - expectedMessageText: newMessageText, - index: 2 + messageCollector.verifyReceivedMessage(2, { + expectedMessageText: newMessageText }); expect((await nwaku.messages()).length).to.eq(3); }); @@ -154,10 +157,9 @@ describe("Waku Filter V2: Subscribe", function () { // Verify that each message was received on the corresponding topic. expect(await messageCollector.waitForMessages(20)).to.eq(true); td.contentTopics.forEach((topic, index) => { - messageCollector.verifyReceivedMessage({ + messageCollector.verifyReceivedMessage(index, { expectedContentTopic: topic, - expectedMessageText: `Message for Topic ${index + 1}`, - index: index + expectedMessageText: `Message for Topic ${index + 1}` }); }); }); @@ -179,10 +181,9 @@ describe("Waku Filter V2: Subscribe", function () { // Verify that each message was received on the corresponding topic. expect(await messageCollector.waitForMessages(30)).to.eq(true); td.contentTopics.forEach((topic, index) => { - messageCollector.verifyReceivedMessage({ + messageCollector.verifyReceivedMessage(index, { expectedContentTopic: topic, - expectedMessageText: `Message for Topic ${index + 1}`, - index: index + expectedMessageText: `Message for Topic ${index + 1}` }); }); }); @@ -253,12 +254,10 @@ describe("Waku Filter V2: Subscribe", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1" }); - messageCollector.verifyReceivedMessage({ - index: 1, + messageCollector.verifyReceivedMessage(1, { expectedMessageText: "M2" }); }); @@ -273,8 +272,8 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(newEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, expectedContentTopic: newContentTopic }); }); @@ -295,12 +294,10 @@ describe("Waku Filter V2: Subscribe", function () { // Check if both messages were received expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1" }); - messageCollector.verifyReceivedMessage({ - index: 1, + messageCollector.verifyReceivedMessage(1, { expectedContentTopic: newContentTopic, expectedMessageText: "M2" }); @@ -332,12 +329,10 @@ describe("Waku Filter V2: Subscribe", function () { // Check if both messages were received expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage({ - index: 0, + messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1" }); - messageCollector.verifyReceivedMessage({ - index: 1, + messageCollector.verifyReceivedMessage(1, { expectedContentTopic: newContentTopic, expectedMessageText: "M2" }); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index 6e2b7355b5..df73ec59c7 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -3,18 +3,17 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { NimGoNode } from "../../src/index.js"; +import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; import { generateTestData, - MessageCollector, messagePayload, - setupNodes, - tearDownNodes, + messageText, + runNodes, TestContentTopic, TestDecoder, TestEncoder -} from "./filter_test_utils.js"; +} from "./utils.js"; describe("Waku Filter V2: Unsubscribe", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level @@ -24,17 +23,15 @@ describe("Waku Filter V2: Unsubscribe", function () { let subscription: IFilterSubscription; let messageCollector: MessageCollector; - this.afterEach(async function () { - tearDownNodes(nwaku, waku); - }); - this.beforeEach(async function () { this.timeout(15000); - const setup = await setupNodes(this); - nwaku = setup.nwaku; - waku = setup.waku; - subscription = setup.subscription; - messageCollector = setup.messageCollector; + [nwaku, waku] = await runNodes(this); + subscription = await waku.filter.createSubscription(); + messageCollector = new MessageCollector(TestContentTopic); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku], [waku]); }); it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { @@ -48,7 +45,9 @@ describe("Waku Filter V2: Unsubscribe", function () { expect(await messageCollector.waitForMessages(2)).to.eq(false); // Check that from 2 messages send only the 1st was received - messageCollector.verifyReceivedMessage({ index: 0 }); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); expect(messageCollector.count).to.eq(1); expect((await nwaku.messages()).length).to.eq(2); }); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts new file mode 100644 index 0000000000..988bf49dbb --- /dev/null +++ b/packages/tests/tests/filter/utils.ts @@ -0,0 +1,97 @@ +import { + createDecoder, + createEncoder, + Decoder, + Encoder, + waitForRemotePeer +} from "@waku/core"; +import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import debug from "debug"; +import { Context } from "mocha"; + +import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js"; + +// Constants for test configuration. +export const log = debug("waku:test:filter"); +export const TestContentTopic = "/test/1/waku-filter"; +export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +export const TestDecoder = createDecoder(TestContentTopic); +export const messageText = "Filtering works!"; +export const messagePayload = { payload: utf8ToBytes(messageText) }; + +// Utility to generate test data for multiple topics tests. +export function generateTestData(topicCount: number): { + contentTopics: string[]; + encoders: Encoder[]; + decoders: Decoder[]; +} { + const contentTopics = Array.from( + { length: topicCount }, + (_, i) => `/test/${i + 1}/waku-multi` + ); + const encoders = contentTopics.map((topic) => + createEncoder({ contentTopic: topic }) + ); + const decoders = contentTopics.map((topic) => createDecoder(topic)); + return { + contentTopics, + encoders, + decoders + }; +} + +// Utility to validate errors related to pings in the subscription. +export async function validatePingError( + subscription: IFilterSubscription +): Promise { + try { + await subscription.ping(); + throw new Error( + "Ping was successful but was expected to fail with a specific error." + ); + } catch (err) { + if ( + err instanceof Error && + err.message.includes("peer has no subscriptions") + ) { + return; + } else { + throw err; + } + } +} + +export async function runNodes( + currentTest: Context +): Promise<[NimGoNode, LightNode]> { + const nwaku = new NimGoNode(makeLogFileName(currentTest)); + await nwaku.startWithRetries( + { + filter: true, + lightpush: true, + relay: true + }, + { retries: 3 } + ); + + let waku: LightNode | undefined; + try { + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }); + await waku.start(); + } catch (error) { + log("jswaku node failed to start:", error); + } + + if (waku) { + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + return [nwaku, waku]; + } else { + throw new Error("Failed to initialize waku"); + } +} diff --git a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts new file mode 100644 index 0000000000..fa36bad14d --- /dev/null +++ b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts @@ -0,0 +1,49 @@ +import { LightNode } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + messageText, + runNodes, + TestContentTopic, + TestEncoder +} from "./utils.js"; + +describe("Waku Light Push [node only] - custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let messageCollector: MessageCollector; + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + + beforeEach(async function () { + [nwaku, waku] = await runNodes(this, customPubSubTopic); + messageCollector = new MessageCollector( + TestContentTopic, + nwaku, + customPubSubTopic + ); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku], [waku]); + }); + + it("Push message", async function () { + const nimPeerId = await nwaku.getPeerId(); + + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText) + }); + + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + }); +}); diff --git a/packages/tests/tests/light-push/index.spec.ts b/packages/tests/tests/light-push/index.spec.ts new file mode 100644 index 0000000000..3334cd76fb --- /dev/null +++ b/packages/tests/tests/light-push/index.spec.ts @@ -0,0 +1,234 @@ +import { createEncoder, DefaultPubSubTopic } from "@waku/core"; +import { IRateLimitProof, LightNode, SendError } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + MessageCollector, + NimGoNode, + tearDownNodes, + TEST_STRING +} from "../../src/index.js"; +import { generateRandomUint8Array } from "../../src/random_array.js"; + +import { + messagePayload, + messageText, + runNodes, + TestContentTopic, + TestEncoder +} from "./utils.js"; + +describe("Waku Light Push [node only]", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let messageCollector: MessageCollector; + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes(this); + messageCollector = new MessageCollector( + TestContentTopic, + nwaku, + DefaultPubSubTopic + ); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku], [waku]); + }); + + TEST_STRING.forEach((testItem) => { + it(`Push message with ${testItem.description} payload`, async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value + }); + }); + }); + + it("Push 30 different messages", async function () { + const generateMessageText = (index: number): string => `M${index}`; + + for (let i = 0; i < 30; i++) { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(generateMessageText(i)) + }); + expect(pushResponse.recipients.length).to.eq(1); + } + + expect(await messageCollector.waitForMessages(30)).to.eq(true); + + for (let i = 0; i < 30; i++) { + messageCollector.verifyReceivedMessage(i, { + expectedMessageText: generateMessageText(i) + }); + } + }); + + it("Fails to push message with empty payload", async function () { + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("") + }); + + if (nwaku.type() == "go-waku") { + expect(pushResponse.recipients.length).to.eq(1); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: undefined + }); + } else { + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + } + }); + + TEST_STRING.forEach((testItem) => { + it(`Push message with content topic containing ${testItem.description}`, async function () { + const customEncoder = createEncoder({ + contentTopic: testItem.value + }); + const pushResponse = await waku.lightPush.send( + customEncoder, + messagePayload + ); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: testItem.value + }); + }); + }); + + it("Fails to push message with empty content topic", async function () { + try { + createEncoder({ contentTopic: "" }); + expect.fail("Expected an error but didn't get one"); + } catch (error) { + expect((error as Error).message).to.equal( + "Content topic must be specified" + ); + } + }); + + it("Push message with meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + metaSetter: () => new Uint8Array(10) + }); + + const pushResponse = await waku.lightPush.send( + customTestEncoder, + messagePayload + ); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); + }); + + it("Fails to push message with large meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + metaSetter: () => new Uint8Array(10 ** 6) + }); + + const pushResponse = await waku.lightPush.send( + customTestEncoder, + messagePayload + ); + + if (nwaku.type() == "go-waku") { + expect(pushResponse.recipients.length).to.eq(1); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); + } else { + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.NO_RPC_RESPONSE); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + } + }); + + it("Push message with rate limit", async function () { + const rateLimitProof: IRateLimitProof = { + proof: utf8ToBytes("proofData"), + merkleRoot: utf8ToBytes("merkleRootData"), + epoch: utf8ToBytes("epochData"), + shareX: utf8ToBytes("shareXData"), + shareY: utf8ToBytes("shareYData"), + nullifier: utf8ToBytes("nullifierData"), + rlnIdentifier: utf8ToBytes("rlnIdentifierData") + }; + + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText), + rateLimitProof: rateLimitProof + }); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText + }); + }); + + [ + Date.now() - 3600000 * 24 * 356, + Date.now() - 3600000, + Date.now() + 3600000 + ].forEach((testItem) => { + it(`Push message with custom timestamp: ${testItem}`, async function () { + const customTimeNanos = BigInt(testItem) * BigInt(1000000); + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes(messageText), + timestamp: new Date(testItem) + }); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedTimestamp: customTimeNanos + }); + }); + }); + + it("Push message equal or less that 1MB", async function () { + const oneMbPayload = generateRandomUint8Array(1024 ** 2); + let pushResponse = await waku.lightPush.send(TestEncoder, { + payload: oneMbPayload + }); + expect(pushResponse.recipients.length).to.greaterThan(0); + + const bigPayload = generateRandomUint8Array(65536); + pushResponse = await waku.lightPush.send(TestEncoder, { + payload: bigPayload + }); + expect(pushResponse.recipients.length).to.greaterThan(0); + }); + + it("Fails to push message bigger that 1MB", async function () { + const MB = 1024 ** 2; + + const pushResponse = await waku.lightPush.send(TestEncoder, { + payload: generateRandomUint8Array(MB + 65536) + }); + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); +}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts new file mode 100644 index 0000000000..cbe4cc32cb --- /dev/null +++ b/packages/tests/tests/light-push/utils.ts @@ -0,0 +1,48 @@ +import { createEncoder, waitForRemotePeer } from "@waku/core"; +import { LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode, utf8ToBytes } from "@waku/sdk"; +import debug from "debug"; + +import { makeLogFileName, NimGoNode, NOISE_KEY_1 } from "../../src/index.js"; + +// Constants for test configuration. +export const log = debug("waku:test:lightpush"); +export const TestContentTopic = "/test/1/waku-light-push/utf8"; +export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +export const messageText = "Light Push works!"; +export const messagePayload = { payload: utf8ToBytes(messageText) }; + +export async function runNodes( + context: Mocha.Context, + pubSubTopic?: string +): Promise<[NimGoNode, LightNode]> { + const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {}; + const nwaku = new NimGoNode(makeLogFileName(context)); + await nwaku.startWithRetries( + { + lightpush: true, + relay: true, + ...nwakuOptional + }, + { retries: 3 } + ); + + let waku: LightNode | undefined; + try { + waku = await createLightNode({ + pubSubTopic, + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + } catch (error) { + log("jswaku node failed to start:", error); + } + + if (waku) { + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + return [nwaku, waku]; + } else { + throw new Error("Failed to initialize waku"); + } +} diff --git a/packages/tests/tests/light_push.node.spec.ts b/packages/tests/tests/light_push.node.spec.ts deleted file mode 100644 index 5b14f78676..0000000000 --- a/packages/tests/tests/light_push.node.spec.ts +++ /dev/null @@ -1,158 +0,0 @@ -import { createEncoder, waitForRemotePeer } from "@waku/core"; -import { LightNode, SendError } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import debug from "debug"; - -import { - base64ToUtf8, - delay, - makeLogFileName, - NimGoNode, - NOISE_KEY_1 -} from "../src/index.js"; -import { MessageRpcResponse } from "../src/node/interfaces.js"; -import { generateRandomUint8Array } from "../src/random_array.js"; - -const log = debug("waku:test:lightpush"); - -const TestContentTopic = "/test/1/waku-light-push/utf8"; -const TestEncoder = createEncoder({ - contentTopic: TestContentTopic -}); - -async function runNodes( - context: Mocha.Context, - pubSubTopic?: string -): Promise<[NimGoNode, LightNode]> { - const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {}; - const nwaku = new NimGoNode(makeLogFileName(context)); - await nwaku.start({ - lightpush: true, - relay: true, - ...nwakuOptional - }); - - const waku = await createLightNode({ - pubSubTopic, - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.LightPush]); - - return [nwaku, waku]; -} - -describe("Waku Light Push [node only]", () => { - let waku: LightNode; - let nwaku: NimGoNode; - - beforeEach(async function () { - this.timeout(15_000); - [nwaku, waku] = await runNodes(this); - }); - - afterEach(async function () { - try { - await nwaku?.stop(); - await waku?.stop(); - } catch (e) { - console.error("Failed to stop nodes: ", e); - } - }); - - it("Push successfully", async function () { - this.timeout(15_000); - - const messageText = "Light Push works!"; - - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(messageText) - }); - expect(pushResponse.recipients.length).to.eq(1); - - let msgs: MessageRpcResponse[] = []; - - while (msgs.length === 0) { - await delay(200); - msgs = await nwaku.messages(); - } - - expect(msgs[0].contentTopic).to.equal(TestContentTopic); - expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText); - }); - - it("Pushes messages equal or less that 1MB", async function () { - this.timeout(15_000); - const MB = 1024 ** 2; - - let pushResponse = await waku.lightPush.send(TestEncoder, { - payload: generateRandomUint8Array(MB) - }); - expect(pushResponse.recipients.length).to.greaterThan(0); - - pushResponse = await waku.lightPush.send(TestEncoder, { - payload: generateRandomUint8Array(65536) - }); - expect(pushResponse.recipients.length).to.greaterThan(0); - }); - - it("Fails to push message bigger that 1MB", async function () { - this.timeout(15_000); - const MB = 1024 ** 2; - - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: generateRandomUint8Array(MB + 65536) - }); - expect(pushResponse.recipients.length).to.eq(0); - expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); - }); -}); - -describe("Waku Light Push [node only] - custom pubsub topic", () => { - let waku: LightNode; - let nwaku: NimGoNode; - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - - beforeEach(async function () { - this.timeout(15_000); - [nwaku, waku] = await runNodes(this, customPubSubTopic); - }); - - afterEach(async function () { - try { - await nwaku?.stop(); - await waku?.stop(); - } catch (e) { - console.error("Failed to stop nodes: ", e); - } - }); - - it("Push message", async function () { - this.timeout(15_000); - - const nimPeerId = await nwaku.getPeerId(); - const messageText = "Light Push works!"; - - log("Send message via lightpush"); - const pushResponse = await waku.lightPush.send(TestEncoder, { - payload: utf8ToBytes(messageText) - }); - log("Ack received", pushResponse); - expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); - - let msgs: MessageRpcResponse[] = []; - - log("Waiting for message to show in nwaku"); - while (msgs.length === 0) { - await delay(200); - msgs = await nwaku.messages(customPubSubTopic); - } - - expect(msgs[0].contentTopic).to.equal(TestContentTopic); - expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText); - }); -});