diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts index d37bb86e85..d8929dc2a4 100644 --- a/packages/tests/src/message_collector.ts +++ b/packages/tests/src/message_collector.ts @@ -18,11 +18,7 @@ export class MessageCollector { list: Array = []; callback: (msg: DecodedMessage) => void = () => {}; - constructor( - private contentTopic: string, - private nwaku?: NimGoNode, - private pubSubTopic = DefaultPubSubTopic - ) { + constructor(private nwaku?: NimGoNode) { if (!this.nwaku) { this.callback = (msg: DecodedMessage): void => { log("Got a message"); @@ -39,6 +35,12 @@ export class MessageCollector { return this.list[index]; } + hasMessage(topic: string, text: string): boolean { + return this.list.some( + (message) => message.contentTopic === topic && message.payload === text + ); + } + // Type guard to determine if a message is of type MessageRpcResponse isMessageRpcResponse( message: MessageRpcResponse | DecodedMessage @@ -51,14 +53,21 @@ export class MessageCollector { async waitForMessages( numMessages: number, - timeoutDuration: number = 400 + options?: { + pubSubTopic?: string; + timeoutDuration?: number; + exact?: boolean; + } ): Promise { const startTime = Date.now(); + const pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic; + const timeoutDuration = options?.timeoutDuration || 400; + const exact = options?.exact || false; while (this.count < numMessages) { if (this.nwaku) { try { - this.list = await this.nwaku.messages(this.pubSubTopic); + this.list = await this.nwaku.messages(pubSubTopic); } catch (error) { log(`Can't retrieve messages because of ${error}`); await delay(10); @@ -72,7 +81,16 @@ export class MessageCollector { await delay(10); } - return true; + if (exact) { + if (this.count == numMessages) { + return true; + } else { + log(`Was expecting exactly ${numMessages} messages`); + return false; + } + } else { + return true; + } } // Verifies a received message against expected values. @@ -96,10 +114,8 @@ export class MessageCollector { const message = this.getMessage(index); expect(message.contentTopic).to.eq( - options.expectedContentTopic || this.contentTopic, - `Message content topic mismatch. Expected: ${ - options.expectedContentTopic || this.contentTopic - }. Got: ${message.contentTopic}` + options.expectedContentTopic, + `Message content topic mismatch. Expected: ${options.expectedContentTopic}. Got: ${message.contentTopic}` ); expect(message.version).to.eq( diff --git a/packages/tests/src/node/interfaces.ts b/packages/tests/src/node/interfaces.ts index 66ba21c71e..253158b410 100644 --- a/packages/tests/src/node/interfaces.ts +++ b/packages/tests/src/node/interfaces.ts @@ -14,7 +14,7 @@ export interface Args { peerExchange?: boolean; discv5Discovery?: boolean; storeMessageDbUrl?: string; - topic?: string; + topic?: Array; rpcPrivate?: boolean; websocketSupport?: boolean; tcpPort?: number; diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts new file mode 100644 index 0000000000..03050db036 --- /dev/null +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -0,0 +1,148 @@ +import { + createDecoder, + createEncoder, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import type { IFilterSubscription, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + makeLogFileName, + MessageCollector, + NimGoNode, + tearDownNodes +} from "../../src/index.js"; + +import { + runNodes, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +describe("Waku Filter V2: Multiple PubSubtopics", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + const customContentTopic = "/test/2/waku-filter"; + const newEncoder = createEncoder({ + pubSubTopic: customPubSubTopic, + contentTopic: customContentTopic + }); + const newDecoder = createDecoder(customContentTopic, customPubSubTopic); + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes(this, [ + customPubSubTopic, + DefaultPubSubTopic + ]); + subscription = await waku.filter.createSubscription(customPubSubTopic); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku, nwaku2], [waku]); + }); + + it("Subscribe and receive messages on custom pubsubtopic", async function () { + await subscription.subscribe([newDecoder], messageCollector.callback); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic, + expectedPubSubTopic: customPubSubTopic, + expectedMessageText: "M1" + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + await subscription.subscribe([newDecoder], messageCollector.callback); + + // Subscribe from the same lightnode to the 2nd pubSubtopic + const subscription2 = + await waku.filter.createSubscription(DefaultPubSubTopic); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([TestDecoder], messageCollector2.callback); + + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + expect(await messageCollector2.waitForMessages(1)).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic, + expectedPubSubTopic: customPubSubTopic, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: TestContentTopic, + expectedPubSubTopic: DefaultPubSubTopic, + expectedMessageText: "M2" + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await subscription.subscribe([newDecoder], messageCollector.callback); + + // Set up and start a new nwaku node with Default PubSubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + topic: [DefaultPubSubTopic] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + // Subscribe from the same lightnode to the new nwaku on the new pubSubtopic + const subscription2 = await waku.filter.createSubscription( + DefaultPubSubTopic, + await nwaku2.getPeerId() + ); + await nwaku.ensureSubscriptions([DefaultPubSubTopic]); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([TestDecoder], messageCollector2.callback); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessages(1, { + pubSubTopic: customPubSubTopic + })) || + !(await messageCollector2.waitForMessages(1, { + pubSubTopic: DefaultPubSubTopic + })) + ) { + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic, + expectedPubSubTopic: customPubSubTopic, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: TestContentTopic, + expectedPubSubTopic: DefaultPubSubTopic, + expectedMessageText: "M2" + }); + }); +}); diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index b35da7281b..d2b17afe59 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -1,3 +1,4 @@ +import { DefaultPubSubTopic } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -22,9 +23,9 @@ describe("Waku Filter V2: Ping", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this); + [nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]); subscription = await waku.filter.createSubscription(); - messageCollector = new MessageCollector(TestContentTopic); + messageCollector = new MessageCollector(); }); this.afterEach(async function () { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index b695193d34..cd32af89a3 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -31,9 +31,9 @@ describe("Waku Filter V2: FilterPush", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this); + [nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]); subscription = await waku.filter.createSubscription(); - messageCollector = new MessageCollector(TestContentTopic); + messageCollector = new MessageCollector(); }); this.afterEach(async function () { @@ -49,7 +49,8 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: testItem.value + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic }); }); }); @@ -71,7 +72,8 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, - checkTimestamp: false + checkTimestamp: false, + expectedContentTopic: TestContentTopic }); // Check if the timestamp matches @@ -217,7 +219,8 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); }); @@ -245,10 +248,12 @@ describe("Waku Filter V2: FilterPush", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1" + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic }); messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2" + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic }); }); @@ -268,10 +273,12 @@ describe("Waku Filter V2: FilterPush", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1" + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic }); messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2" + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic }); }); }); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 0ded20d724..22d63b1e3f 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -39,9 +39,9 @@ describe("Waku Filter V2: Subscribe", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this); + [nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]); subscription = await waku.filter.createSubscription(); - messageCollector = new MessageCollector(TestContentTopic); + messageCollector = new MessageCollector(); // Nwaku subscribe to the default pubsub topic await nwaku.ensureSubscriptions(); @@ -58,7 +58,8 @@ describe("Waku Filter V2: Subscribe", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); expect((await nwaku.messages()).length).to.eq(1); }); @@ -78,7 +79,8 @@ describe("Waku Filter V2: Subscribe", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); expect((await nwaku.messages()).length).to.eq(1); }); @@ -90,7 +92,8 @@ describe("Waku Filter V2: Subscribe", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); // Send another message on the same topic. @@ -102,7 +105,8 @@ describe("Waku Filter V2: Subscribe", function () { // Verify that the second message was successfully received. expect(await messageCollector.waitForMessages(2)).to.eq(true); messageCollector.verifyReceivedMessage(1, { - expectedMessageText: newMessageText + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic }); expect((await nwaku.messages()).length).to.eq(2); }); @@ -113,7 +117,8 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(TestEncoder, messagePayload); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); // Modify subscription to include a new content topic and send a message. @@ -136,7 +141,8 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(TestEncoder, newMessagePayload); expect(await messageCollector.waitForMessages(3)).to.eq(true); messageCollector.verifyReceivedMessage(2, { - expectedMessageText: newMessageText + expectedMessageText: newMessageText, + expectedContentTopic: TestContentTopic }); expect((await nwaku.messages()).length).to.eq(3); }); @@ -258,10 +264,12 @@ describe("Waku Filter V2: Subscribe", function () { // Confirm both messages were received. expect(await messageCollector.waitForMessages(2)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1" + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic }); messageCollector.verifyReceivedMessage(1, { - expectedMessageText: "M2" + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic }); }); @@ -298,7 +306,8 @@ describe("Waku Filter V2: Subscribe", function () { // Check if both messages were received expect(await messageCollector.waitForMessages(2)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1" + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic }); messageCollector.verifyReceivedMessage(1, { expectedContentTopic: newContentTopic, @@ -306,38 +315,32 @@ describe("Waku Filter V2: Subscribe", function () { }); }); - // this test fail 50% of times with messageCount being 1. Seems like a message is lost somehow - it.skip("Subscribe and receive messages from multiple nwaku nodes", async function () { + it("Subscribe and receive messages from multiple nwaku nodes", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); - await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); - expect(await messageCollector.waitForMessages(1)).to.eq(true); // Set up and start a new nwaku node nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); await nwaku2.start({ filter: true, lightpush: true, relay: true }); - await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); const subscription2 = await waku.filter.createSubscription( DefaultPubSubTopic, await nwaku2.getPeerId() ); - // Send a message using the new subscription const newContentTopic = "/test/2/waku-filter"; const newEncoder = createEncoder({ contentTopic: newContentTopic }); const newDecoder = createDecoder(newContentTopic); await subscription2.subscribe([newDecoder], messageCollector.callback); - await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + + // Making sure that messages are send and reveiced for both subscriptions + while (!(await messageCollector.waitForMessages(2))) { + await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") }); + } // Check if both messages were received - expect(await messageCollector.waitForMessages(2)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: "M1" - }); - messageCollector.verifyReceivedMessage(1, { - expectedContentTopic: newContentTopic, - expectedMessageText: "M2" - }); + expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.be.true; + expect(messageCollector.hasMessage(newContentTopic, "M2")).to.be.true; }); }); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index b1591c2911..6ac79010c0 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -1,4 +1,4 @@ -import { createDecoder, createEncoder } from "@waku/core"; +import { createDecoder, createEncoder, DefaultPubSubTopic } from "@waku/core"; import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; @@ -25,9 +25,9 @@ describe("Waku Filter V2: Unsubscribe", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this); + [nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]); subscription = await waku.filter.createSubscription(); - messageCollector = new MessageCollector(TestContentTopic); + messageCollector = new MessageCollector(); // Nwaku subscribe to the default pubsub topic await nwaku.ensureSubscriptions(); @@ -49,7 +49,8 @@ describe("Waku Filter V2: Unsubscribe", function () { // Check that from 2 messages send only the 1st was received messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); expect(messageCollector.count).to.eq(1); expect((await nwaku.messages()).length).to.eq(2); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 988bf49dbb..b1a8c102e4 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -64,14 +64,17 @@ export async function validatePingError( } export async function runNodes( - currentTest: Context + context: Context, + pubSubTopics: string[] ): Promise<[NimGoNode, LightNode]> { - const nwaku = new NimGoNode(makeLogFileName(currentTest)); + const nwaku = new NimGoNode(makeLogFileName(context)); + await nwaku.startWithRetries( { filter: true, lightpush: true, - relay: true + relay: true, + topic: pubSubTopics }, { retries: 3 } ); @@ -79,6 +82,7 @@ export async function runNodes( let waku: LightNode | undefined; try { waku = await createLightNode({ + pubSubTopics: pubSubTopics, staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }); @@ -90,6 +94,7 @@ export async function runNodes( if (waku) { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + await nwaku.ensureSubscriptions(pubSubTopics); return [nwaku, waku]; } else { throw new Error("Failed to initialize waku"); diff --git a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts deleted file mode 100644 index fa8247a786..0000000000 --- a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts +++ /dev/null @@ -1,52 +0,0 @@ -import { createEncoder } from "@waku/core"; -import { LightNode } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; - -import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; - -import { messageText, runNodes, TestContentTopic } from "./utils.js"; - -describe("Waku Light Push [node only] - custom pubsub topic", function () { - this.timeout(15000); - let waku: LightNode; - let nwaku: NimGoNode; - let messageCollector: MessageCollector; - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - - beforeEach(async function () { - [nwaku, waku] = await runNodes(this, customPubSubTopic); - messageCollector = new MessageCollector( - TestContentTopic, - nwaku, - customPubSubTopic - ); - - await nwaku.ensureSubscriptions([customPubSubTopic]); - }); - - this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); - }); - - it("Push message", async function () { - const nimPeerId = await nwaku.getPeerId(); - - const testEncoder = createEncoder({ - contentTopic: TestContentTopic, - pubSubTopic: customPubSubTopic - }); - - const pushResponse = await waku.lightPush.send(testEncoder, { - payload: utf8ToBytes(messageText) - }); - - expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); - - expect(await messageCollector.waitForMessages(1)).to.eq(true); - messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText, - expectedContentTopic: TestContentTopic - }); - }); -}); diff --git a/packages/tests/tests/light-push/index.spec.ts b/packages/tests/tests/light-push/index.spec.ts index db2e29680b..d06c7edfc8 100644 --- a/packages/tests/tests/light-push/index.spec.ts +++ b/packages/tests/tests/light-push/index.spec.ts @@ -19,7 +19,7 @@ import { TestEncoder } from "./utils.js"; -describe("Waku Light Push [node only]", function () { +describe("Waku Light Push", function () { // Set the timeout for all tests in this suite. Can be overwritten at test level this.timeout(15000); let waku: LightNode; @@ -28,12 +28,8 @@ describe("Waku Light Push [node only]", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this); - messageCollector = new MessageCollector( - TestContentTopic, - nwaku, - DefaultPubSubTopic - ); + [nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]); + messageCollector = new MessageCollector(nwaku); await nwaku.ensureSubscriptions(); }); @@ -51,7 +47,8 @@ describe("Waku Light Push [node only]", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: testItem.value + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic }); }); }); @@ -70,7 +67,8 @@ describe("Waku Light Push [node only]", function () { for (let i = 0; i < 30; i++) { messageCollector.verifyReceivedMessage(i, { - expectedMessageText: generateMessageText(i) + expectedMessageText: generateMessageText(i), + expectedContentTopic: TestContentTopic }); } }); @@ -84,7 +82,8 @@ describe("Waku Light Push [node only]", function () { expect(pushResponse.recipients.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: undefined + expectedMessageText: undefined, + expectedContentTopic: TestContentTopic }); } else { expect(pushResponse.recipients.length).to.eq(0); @@ -138,7 +137,8 @@ describe("Waku Light Push [node only]", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); }); @@ -157,7 +157,8 @@ describe("Waku Light Push [node only]", function () { expect(pushResponse.recipients.length).to.eq(1); expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); } else { expect(pushResponse.recipients.length).to.eq(0); @@ -186,7 +187,8 @@ describe("Waku Light Push [node only]", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { - expectedMessageText: messageText + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic }); }); @@ -206,7 +208,8 @@ describe("Waku Light Push [node only]", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, - expectedTimestamp: customTimeNanos + expectedTimestamp: customTimeNanos, + expectedContentTopic: TestContentTopic }); }); }); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts new file mode 100644 index 0000000000..f942b654a6 --- /dev/null +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -0,0 +1,153 @@ +import type { PeerId } from "@libp2p/interface/peer-id"; +import { + createEncoder, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import { LightNode, Protocols, SendResult } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + makeLogFileName, + MessageCollector, + NimGoNode, + tearDownNodes +} from "../../src/index.js"; + +import { + messageText, + runNodes, + TestContentTopic, + TestEncoder +} from "./utils.js"; + +describe("Waku Light Push : Multiple PubSubtopics", function () { + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let messageCollector: MessageCollector; + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + const customContentTopic = "/test/2/waku-light-push/utf8"; + const customEncoder = createEncoder({ + contentTopic: customContentTopic, + pubSubTopic: customPubSubTopic + }); + let nimPeerId: PeerId; + + beforeEach(async function () { + [nwaku, waku] = await runNodes(this, [ + customPubSubTopic, + DefaultPubSubTopic + ]); + messageCollector = new MessageCollector(nwaku); + nimPeerId = await nwaku.getPeerId(); + }); + + this.afterEach(async function () { + tearDownNodes([nwaku, nwaku2], [waku]); + }); + + it("Push message on custom pubSubTopic", async function () { + const pushResponse = await waku.lightPush.send(customEncoder, { + payload: utf8ToBytes(messageText) + }); + + expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); + + expect( + await messageCollector.waitForMessages(1, { + pubSubTopic: customPubSubTopic + }) + ).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: customContentTopic + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + const pushResponse1 = await waku.lightPush.send(customEncoder, { + payload: utf8ToBytes("M1") + }); + const pushResponse2 = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M2") + }); + expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString()); + expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString()); + + const messageCollector2 = new MessageCollector(nwaku); + + expect( + await messageCollector.waitForMessages(1, { + pubSubTopic: customPubSubTopic + }) + ).to.eq(true); + + expect( + await messageCollector2.waitForMessages(1, { + pubSubTopic: DefaultPubSubTopic + }) + ).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic, + expectedPubSubTopic: customPubSubTopic + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic, + expectedPubSubTopic: DefaultPubSubTopic + }); + }); + + it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () { + // Set up and start a new nwaku node with Default PubSubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + topic: [DefaultPubSubTopic] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const messageCollector2 = new MessageCollector(nwaku2); + + let pushResponse1: SendResult; + let pushResponse2: SendResult; + // Making sure that we send messages to both nwaku nodes + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessages(1, { + pubSubTopic: customPubSubTopic + })) || + !(await messageCollector2.waitForMessages(1, { + pubSubTopic: DefaultPubSubTopic + })) || + pushResponse1!.recipients[0].toString() === + pushResponse2!.recipients[0].toString() + ) { + pushResponse1 = await waku.lightPush.send(customEncoder, { + payload: utf8ToBytes("M1") + }); + pushResponse2 = await waku.lightPush.send(TestEncoder, { + payload: utf8ToBytes("M2") + }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: customContentTopic, + expectedPubSubTopic: customPubSubTopic + }); + messageCollector2.verifyReceivedMessage(0, { + expectedMessageText: "M2", + expectedContentTopic: TestContentTopic, + expectedPubSubTopic: DefaultPubSubTopic + }); + }); +}); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 9e2d4b03b7..548dd34e4f 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -14,23 +14,18 @@ export const messagePayload = { payload: utf8ToBytes(messageText) }; export async function runNodes( context: Mocha.Context, - pubSubTopic?: string + pubSubTopics: string[] ): Promise<[NimGoNode, LightNode]> { - const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {}; const nwaku = new NimGoNode(makeLogFileName(context)); await nwaku.startWithRetries( - { - lightpush: true, - relay: true, - ...nwakuOptional - }, + { lightpush: true, relay: true, topic: pubSubTopics }, { retries: 3 } ); let waku: LightNode | undefined; try { waku = await createLightNode({ - pubSubTopics: pubSubTopic ? [pubSubTopic] : undefined, + pubSubTopics: pubSubTopics, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); @@ -41,6 +36,7 @@ export async function runNodes( if (waku) { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); + await nwaku.ensureSubscriptions(pubSubTopics); return [nwaku, waku]; } else { throw new Error("Failed to initialize waku"); diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts index ecbae84ce0..8d6fae2f50 100644 --- a/packages/tests/tests/relay.node.spec.ts +++ b/packages/tests/tests/relay.node.spec.ts @@ -27,6 +27,7 @@ import debug from "debug"; import { delay, makeLogFileName, + MessageCollector, NOISE_KEY_1, NOISE_KEY_2, NOISE_KEY_3 @@ -260,13 +261,14 @@ describe("Waku Relay [node only]", () => { let waku2: RelayNode; let waku3: RelayNode; - const pubSubTopic = "/some/pubsub/topic"; + const CustomContentTopic = "/test/2/waku-relay/utf8"; + const CustomPubSubTopic = "/some/pubsub/topic"; - const CustomTopicEncoder = createEncoder({ - contentTopic: TestContentTopic, - pubSubTopic: pubSubTopic + const CustomEncoder = createEncoder({ + contentTopic: CustomContentTopic, + pubSubTopic: CustomPubSubTopic }); - const CustomTopicDecoder = createDecoder(TestContentTopic, pubSubTopic); + const CustomDecoder = createDecoder(CustomContentTopic, CustomPubSubTopic); afterEach(async function () { !!waku1 && @@ -277,18 +279,196 @@ describe("Waku Relay [node only]", () => { waku3.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Publish", async function () { + [ + { + pubsub: CustomPubSubTopic, + encoder: CustomEncoder, + decoder: CustomDecoder + }, + { + pubsub: DefaultPubSubTopic, + encoder: TestEncoder, + decoder: TestDecoder + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + this.timeout(10000); + + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubSubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubSubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubSubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( + true + ); + + expect(msgCollector1.hasMessage(testItem.pubsub, "M2")).to.be.true; + expect(msgCollector1.hasMessage(testItem.pubsub, "M3")).to.be.true; + expect(msgCollector2.hasMessage(testItem.pubsub, "M1")).to.be.true; + expect(msgCollector2.hasMessage(testItem.pubsub, "M3")).to.be.true; + expect(msgCollector3.hasMessage(testItem.pubsub, "M1")).to.be.true; + expect(msgCollector3.hasMessage(testItem.pubsub, "M2")).to.be.true; + }); + }); + + it("Nodes with multiple pubsub topic", async function () { this.timeout(10000); - // 1 and 2 uses a custom pubsub - // 3 uses the default pubsub + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis [waku1, waku2, waku3] = await Promise.all([ createRelayNode({ - pubSubTopics: [pubSubTopic], + pubSubTopics: [DefaultPubSubTopic, CustomPubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopics: [pubSubTopic], + pubSubTopics: [DefaultPubSubTopic, CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubSubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [TestDecoder, CustomDecoder], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [TestDecoder, CustomDecoder], + msgCollector2.callback + ); + await waku3.relay.subscribe([TestDecoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") }); + + expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq( + true + ); + expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq( + true + ); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( + true + ); + + expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M3")).to.be.true; + expect(msgCollector1.hasMessage(CustomPubSubTopic, "M4")).to.be.true; + expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M5")).to.be.true; + expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M1")).to.be.true; + expect(msgCollector1.hasMessage(CustomPubSubTopic, "M2")).to.be.true; + expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M5")).to.be.true; + expect(msgCollector2.hasMessage(CustomPubSubTopic, "M1")).to.be.true; + expect(msgCollector2.hasMessage(DefaultPubSubTopic, "M3")).to.be.true; + expect(msgCollector3.hasMessage(DefaultPubSubTopic, "M1")).to.be.true; + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + this.timeout(10000); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubSubTopics: [CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubSubTopics: [CustomPubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)), @@ -317,7 +497,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([CustomTopicDecoder], resolve); + void waku2.relay.subscribe([CustomDecoder], resolve); } ); @@ -330,7 +510,7 @@ describe("Waku Relay [node only]", () => { } ); - await waku1.relay.send(CustomTopicEncoder, { + await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes(messageText) }); @@ -338,7 +518,7 @@ describe("Waku Relay [node only]", () => { await waku3NoMsgPromise; expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); - expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic); + expect(waku2ReceivedMsg.pubSubTopic).to.eq(CustomPubSubTopic); }); it("Publishes <= 1 MB and rejects others", async function () { @@ -348,11 +528,11 @@ describe("Waku Relay [node only]", () => { // 1 and 2 uses a custom pubsub [waku1, waku2] = await Promise.all([ createRelayNode({ - pubSubTopics: [pubSubTopic], + pubSubTopics: [CustomPubSubTopic], staticNoiseKey: NOISE_KEY_1 }).then((waku) => waku.start().then(() => waku)), createRelayNode({ - pubSubTopics: [pubSubTopic], + pubSubTopics: [CustomPubSubTopic], staticNoiseKey: NOISE_KEY_2, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } }).then((waku) => waku.start().then(() => waku)) @@ -370,7 +550,7 @@ describe("Waku Relay [node only]", () => { const waku2ReceivedMsgPromise: Promise = new Promise( (resolve) => { - void waku2.relay.subscribe([CustomTopicDecoder], () => + void waku2.relay.subscribe([CustomDecoder], () => resolve({ payload: new Uint8Array([]) } as DecodedMessage) @@ -378,18 +558,18 @@ describe("Waku Relay [node only]", () => { } ); - let sendResult = await waku1.relay.send(CustomTopicEncoder, { + let sendResult = await waku1.relay.send(CustomEncoder, { payload: generateRandomUint8Array(1 * MB) }); expect(sendResult.recipients.length).to.eq(1); - sendResult = await waku1.relay.send(CustomTopicEncoder, { + sendResult = await waku1.relay.send(CustomEncoder, { payload: generateRandomUint8Array(1 * MB + 65536) }); expect(sendResult.recipients.length).to.eq(0); expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); - sendResult = await waku1.relay.send(CustomTopicEncoder, { + sendResult = await waku1.relay.send(CustomEncoder, { payload: generateRandomUint8Array(2 * MB) }); expect(sendResult.recipients.length).to.eq(0); diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 09f0ddd324..41c4249153 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -3,6 +3,8 @@ import { createDecoder, createEncoder, DecodedMessage, + Decoder, + DefaultPubSubTopic, PageDirection, waitForRemotePeer } from "@waku/core"; @@ -565,9 +567,12 @@ describe("Waku Store, custom pubsub topic", () => { const customPubSubTopic = "/waku/2/custom-dapp/proto"; let waku: LightNode; let nwaku: NimGoNode; + let nwaku2: NimGoNode; - const CustomPubSubTestDecoder = createDecoder( - TestContentTopic, + const customContentTopic = "/test/2/waku-store/utf8"; + + const customTestDecoder = createDecoder( + customContentTopic, customPubSubTopic ); @@ -576,14 +581,17 @@ describe("Waku Store, custom pubsub topic", () => { nwaku = new NimGoNode(makeLogFileName(this)); await nwaku.start({ store: true, - topic: customPubSubTopic, + topic: [customPubSubTopic, DefaultPubSubTopic], relay: true }); + await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]); }); afterEach(async function () { !!nwaku && nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); + !!nwaku2 && + nwaku2.stop().catch((e) => console.log("Nwaku failed to stop", e)); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); @@ -596,7 +604,7 @@ describe("Waku Store, custom pubsub topic", () => { await nwaku.sendMessage( NimGoNode.toMessageRpcQuery({ payload: new Uint8Array([i]), - contentTopic: TestContentTopic + contentTopic: customContentTopic }), customPubSubTopic ) @@ -614,7 +622,7 @@ describe("Waku Store, custom pubsub topic", () => { const messages: IMessage[] = []; let promises: Promise[] = []; for await (const msgPromises of waku.store.queryGenerator([ - CustomPubSubTestDecoder + customTestDecoder ])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; @@ -634,4 +642,131 @@ describe("Waku Store, custom pubsub topic", () => { }); expect(result).to.not.eq(-1); }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopics: [customPubSubTopic, DefaultPubSubTopic] + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const customMessages = await processMessages( + waku, + [customTestDecoder], + customPubSubTopic + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default PubSubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + topic: [DefaultPubSubTopic], + relay: true + }); + + const totalMsgs = 10; + await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); + await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopics: [customPubSubTopic, DefaultPubSubTopic] + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processMessages( + waku, + [customTestDecoder], + customPubSubTopic + ); + testMessages = await processMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + } + }); + + // will move those 2 reusable functions to store/utils when refactoring store tests but with another PR + async function sendMessages( + instance: NimGoNode, + numMessages: number, + contentTopic: string, + pubSubTopic: string + ): Promise { + for (let i = 0; i < numMessages; i++) { + expect( + await instance.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic + }), + pubSubTopic + ) + ).to.be.true; + } + await delay(1); // to ensure each timestamp is unique. + } + + async function processMessages( + instance: LightNode, + decoders: Array, + expectedTopic: string + ): Promise { + const localMessages: IMessage[] = []; + let localPromises: Promise[] = []; + for await (const msgPromises of instance.store.queryGenerator(decoders)) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + localMessages.push(msg); + expect(msg.pubSubTopic).to.eq(expectedTopic); + } + }); + + localPromises = localPromises.concat(_promises); + } + await Promise.all(localPromises); + return localMessages; + } });