mirror of
https://github.com/waku-org/js-waku.git
synced 2025-01-11 21:15:01 +00:00
chore(static sharding): add tests for multiple pubsub topics (#1624)
* new tests for static sharding * store tests * small fixes after ci run * small fixes after ci run * small fixes after ci run * multiple pubsubtopics on nwaku node
This commit is contained in:
parent
94b8fd34ca
commit
f1d8d6097f
@ -18,11 +18,7 @@ export class MessageCollector {
|
||||
list: Array<MessageRpcResponse | DecodedMessage> = [];
|
||||
callback: (msg: DecodedMessage) => void = () => {};
|
||||
|
||||
constructor(
|
||||
private contentTopic: string,
|
||||
private nwaku?: NimGoNode,
|
||||
private pubSubTopic = DefaultPubSubTopic
|
||||
) {
|
||||
constructor(private nwaku?: NimGoNode) {
|
||||
if (!this.nwaku) {
|
||||
this.callback = (msg: DecodedMessage): void => {
|
||||
log("Got a message");
|
||||
@ -39,6 +35,12 @@ export class MessageCollector {
|
||||
return this.list[index];
|
||||
}
|
||||
|
||||
hasMessage(topic: string, text: string): boolean {
|
||||
return this.list.some(
|
||||
(message) => message.contentTopic === topic && message.payload === text
|
||||
);
|
||||
}
|
||||
|
||||
// Type guard to determine if a message is of type MessageRpcResponse
|
||||
isMessageRpcResponse(
|
||||
message: MessageRpcResponse | DecodedMessage
|
||||
@ -51,14 +53,21 @@ export class MessageCollector {
|
||||
|
||||
async waitForMessages(
|
||||
numMessages: number,
|
||||
timeoutDuration: number = 400
|
||||
options?: {
|
||||
pubSubTopic?: string;
|
||||
timeoutDuration?: number;
|
||||
exact?: boolean;
|
||||
}
|
||||
): Promise<boolean> {
|
||||
const startTime = Date.now();
|
||||
const pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic;
|
||||
const timeoutDuration = options?.timeoutDuration || 400;
|
||||
const exact = options?.exact || false;
|
||||
|
||||
while (this.count < numMessages) {
|
||||
if (this.nwaku) {
|
||||
try {
|
||||
this.list = await this.nwaku.messages(this.pubSubTopic);
|
||||
this.list = await this.nwaku.messages(pubSubTopic);
|
||||
} catch (error) {
|
||||
log(`Can't retrieve messages because of ${error}`);
|
||||
await delay(10);
|
||||
@ -72,7 +81,16 @@ export class MessageCollector {
|
||||
await delay(10);
|
||||
}
|
||||
|
||||
if (exact) {
|
||||
if (this.count == numMessages) {
|
||||
return true;
|
||||
} else {
|
||||
log(`Was expecting exactly ${numMessages} messages`);
|
||||
return false;
|
||||
}
|
||||
} else {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
// Verifies a received message against expected values.
|
||||
@ -96,10 +114,8 @@ export class MessageCollector {
|
||||
|
||||
const message = this.getMessage(index);
|
||||
expect(message.contentTopic).to.eq(
|
||||
options.expectedContentTopic || this.contentTopic,
|
||||
`Message content topic mismatch. Expected: ${
|
||||
options.expectedContentTopic || this.contentTopic
|
||||
}. Got: ${message.contentTopic}`
|
||||
options.expectedContentTopic,
|
||||
`Message content topic mismatch. Expected: ${options.expectedContentTopic}. Got: ${message.contentTopic}`
|
||||
);
|
||||
|
||||
expect(message.version).to.eq(
|
||||
|
@ -14,7 +14,7 @@ export interface Args {
|
||||
peerExchange?: boolean;
|
||||
discv5Discovery?: boolean;
|
||||
storeMessageDbUrl?: string;
|
||||
topic?: string;
|
||||
topic?: Array<string>;
|
||||
rpcPrivate?: boolean;
|
||||
websocketSupport?: boolean;
|
||||
tcpPort?: number;
|
||||
|
148
packages/tests/tests/filter/multiple_pubsub.node.spec.ts
Normal file
148
packages/tests/tests/filter/multiple_pubsub.node.spec.ts
Normal file
@ -0,0 +1,148 @@
|
||||
import {
|
||||
createDecoder,
|
||||
createEncoder,
|
||||
DefaultPubSubTopic,
|
||||
waitForRemotePeer
|
||||
} from "@waku/core";
|
||||
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
||||
import { Protocols } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
NimGoNode,
|
||||
tearDownNodes
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestDecoder,
|
||||
TestEncoder
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Filter V2: Multiple PubSubtopics", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
this.timeout(30000);
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let nwaku2: NimGoNode;
|
||||
let subscription: IFilterSubscription;
|
||||
let messageCollector: MessageCollector;
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
const customContentTopic = "/test/2/waku-filter";
|
||||
const newEncoder = createEncoder({
|
||||
pubSubTopic: customPubSubTopic,
|
||||
contentTopic: customContentTopic
|
||||
});
|
||||
const newDecoder = createDecoder(customContentTopic, customPubSubTopic);
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this, [
|
||||
customPubSubTopic,
|
||||
DefaultPubSubTopic
|
||||
]);
|
||||
subscription = await waku.filter.createSubscription(customPubSubTopic);
|
||||
messageCollector = new MessageCollector();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
||||
});
|
||||
|
||||
it("Subscribe and receive messages on custom pubsubtopic", async function () {
|
||||
await subscription.subscribe([newDecoder], messageCollector.callback);
|
||||
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") });
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedContentTopic: customContentTopic,
|
||||
expectedPubSubTopic: customPubSubTopic,
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
});
|
||||
|
||||
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
|
||||
await subscription.subscribe([newDecoder], messageCollector.callback);
|
||||
|
||||
// Subscribe from the same lightnode to the 2nd pubSubtopic
|
||||
const subscription2 =
|
||||
await waku.filter.createSubscription(DefaultPubSubTopic);
|
||||
|
||||
const messageCollector2 = new MessageCollector();
|
||||
|
||||
await subscription2.subscribe([TestDecoder], messageCollector2.callback);
|
||||
|
||||
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") });
|
||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
expect(await messageCollector2.waitForMessages(1)).to.eq(true);
|
||||
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedContentTopic: customContentTopic,
|
||||
expectedPubSubTopic: customPubSubTopic,
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
|
||||
messageCollector2.verifyReceivedMessage(0, {
|
||||
expectedContentTopic: TestContentTopic,
|
||||
expectedPubSubTopic: DefaultPubSubTopic,
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
});
|
||||
|
||||
it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () {
|
||||
await subscription.subscribe([newDecoder], messageCollector.callback);
|
||||
|
||||
// Set up and start a new nwaku node with Default PubSubtopic
|
||||
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
||||
await nwaku2.start({
|
||||
filter: true,
|
||||
lightpush: true,
|
||||
relay: true,
|
||||
topic: [DefaultPubSubTopic]
|
||||
});
|
||||
await waku.dial(await nwaku2.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
|
||||
// Subscribe from the same lightnode to the new nwaku on the new pubSubtopic
|
||||
const subscription2 = await waku.filter.createSubscription(
|
||||
DefaultPubSubTopic,
|
||||
await nwaku2.getPeerId()
|
||||
);
|
||||
await nwaku.ensureSubscriptions([DefaultPubSubTopic]);
|
||||
|
||||
const messageCollector2 = new MessageCollector();
|
||||
|
||||
await subscription2.subscribe([TestDecoder], messageCollector2.callback);
|
||||
|
||||
// Making sure that messages are send and reveiced for both subscriptions
|
||||
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
|
||||
while (
|
||||
!(await messageCollector.waitForMessages(1, {
|
||||
pubSubTopic: customPubSubTopic
|
||||
})) ||
|
||||
!(await messageCollector2.waitForMessages(1, {
|
||||
pubSubTopic: DefaultPubSubTopic
|
||||
}))
|
||||
) {
|
||||
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M1") });
|
||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
|
||||
}
|
||||
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedContentTopic: customContentTopic,
|
||||
expectedPubSubTopic: customPubSubTopic,
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
|
||||
messageCollector2.verifyReceivedMessage(0, {
|
||||
expectedContentTopic: TestContentTopic,
|
||||
expectedPubSubTopic: DefaultPubSubTopic,
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
});
|
||||
});
|
@ -1,3 +1,4 @@
|
||||
import { DefaultPubSubTopic } from "@waku/core";
|
||||
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
@ -22,9 +23,9 @@ describe("Waku Filter V2: Ping", function () {
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
[nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
messageCollector = new MessageCollector();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
|
@ -31,9 +31,9 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
[nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
messageCollector = new MessageCollector();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
@ -49,7 +49,8 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: testItem.value
|
||||
expectedMessageText: testItem.value,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -71,7 +72,8 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
checkTimestamp: false
|
||||
checkTimestamp: false,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
|
||||
// Check if the timestamp matches
|
||||
@ -217,7 +219,8 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
@ -245,10 +248,12 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
expectedMessageText: "M2",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
@ -268,10 +273,12 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
expectedMessageText: "M2",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
||||
|
@ -39,9 +39,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
[nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
messageCollector = new MessageCollector();
|
||||
|
||||
// Nwaku subscribe to the default pubsub topic
|
||||
await nwaku.ensureSubscriptions();
|
||||
@ -58,7 +58,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(1);
|
||||
});
|
||||
@ -78,7 +79,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(1);
|
||||
});
|
||||
@ -90,7 +92,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
|
||||
// Send another message on the same topic.
|
||||
@ -102,7 +105,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
// Verify that the second message was successfully received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: newMessageText
|
||||
expectedMessageText: newMessageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(2);
|
||||
});
|
||||
@ -113,7 +117,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
|
||||
// Modify subscription to include a new content topic and send a message.
|
||||
@ -136,7 +141,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
await waku.lightPush.send(TestEncoder, newMessagePayload);
|
||||
expect(await messageCollector.waitForMessages(3)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(2, {
|
||||
expectedMessageText: newMessageText
|
||||
expectedMessageText: newMessageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
expect((await nwaku.messages()).length).to.eq(3);
|
||||
});
|
||||
@ -258,10 +264,12 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
// Confirm both messages were received.
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedMessageText: "M2"
|
||||
expectedMessageText: "M2",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
@ -298,7 +306,8 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
// Check if both messages were received
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedContentTopic: newContentTopic,
|
||||
@ -306,38 +315,32 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||
});
|
||||
});
|
||||
|
||||
// this test fail 50% of times with messageCount being 1. Seems like a message is lost somehow
|
||||
it.skip("Subscribe and receive messages from multiple nwaku nodes", async function () {
|
||||
it("Subscribe and receive messages from multiple nwaku nodes", async function () {
|
||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
|
||||
// Set up and start a new nwaku node
|
||||
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
||||
await nwaku2.start({ filter: true, lightpush: true, relay: true });
|
||||
|
||||
await waku.dial(await nwaku2.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
const subscription2 = await waku.filter.createSubscription(
|
||||
DefaultPubSubTopic,
|
||||
await nwaku2.getPeerId()
|
||||
);
|
||||
|
||||
// Send a message using the new subscription
|
||||
const newContentTopic = "/test/2/waku-filter";
|
||||
const newEncoder = createEncoder({ contentTopic: newContentTopic });
|
||||
const newDecoder = createDecoder(newContentTopic);
|
||||
await subscription2.subscribe([newDecoder], messageCollector.callback);
|
||||
|
||||
// Making sure that messages are send and reveiced for both subscriptions
|
||||
while (!(await messageCollector.waitForMessages(2))) {
|
||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||
await waku.lightPush.send(newEncoder, { payload: utf8ToBytes("M2") });
|
||||
}
|
||||
|
||||
// Check if both messages were received
|
||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1"
|
||||
});
|
||||
messageCollector.verifyReceivedMessage(1, {
|
||||
expectedContentTopic: newContentTopic,
|
||||
expectedMessageText: "M2"
|
||||
});
|
||||
expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.be.true;
|
||||
expect(messageCollector.hasMessage(newContentTopic, "M2")).to.be.true;
|
||||
});
|
||||
});
|
||||
|
@ -1,4 +1,4 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import { createDecoder, createEncoder, DefaultPubSubTopic } from "@waku/core";
|
||||
import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
@ -25,9 +25,9 @@ describe("Waku Filter V2: Unsubscribe", function () {
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
[nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]);
|
||||
subscription = await waku.filter.createSubscription();
|
||||
messageCollector = new MessageCollector(TestContentTopic);
|
||||
messageCollector = new MessageCollector();
|
||||
|
||||
// Nwaku subscribe to the default pubsub topic
|
||||
await nwaku.ensureSubscriptions();
|
||||
@ -49,7 +49,8 @@ describe("Waku Filter V2: Unsubscribe", function () {
|
||||
|
||||
// Check that from 2 messages send only the 1st was received
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
expect(messageCollector.count).to.eq(1);
|
||||
expect((await nwaku.messages()).length).to.eq(2);
|
||||
|
@ -64,14 +64,17 @@ export async function validatePingError(
|
||||
}
|
||||
|
||||
export async function runNodes(
|
||||
currentTest: Context
|
||||
context: Context,
|
||||
pubSubTopics: string[]
|
||||
): Promise<[NimGoNode, LightNode]> {
|
||||
const nwaku = new NimGoNode(makeLogFileName(currentTest));
|
||||
const nwaku = new NimGoNode(makeLogFileName(context));
|
||||
|
||||
await nwaku.startWithRetries(
|
||||
{
|
||||
filter: true,
|
||||
lightpush: true,
|
||||
relay: true
|
||||
relay: true,
|
||||
topic: pubSubTopics
|
||||
},
|
||||
{ retries: 3 }
|
||||
);
|
||||
@ -79,6 +82,7 @@ export async function runNodes(
|
||||
let waku: LightNode | undefined;
|
||||
try {
|
||||
waku = await createLightNode({
|
||||
pubSubTopics: pubSubTopics,
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
});
|
||||
@ -90,6 +94,7 @@ export async function runNodes(
|
||||
if (waku) {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
|
||||
await nwaku.ensureSubscriptions(pubSubTopics);
|
||||
return [nwaku, waku];
|
||||
} else {
|
||||
throw new Error("Failed to initialize waku");
|
||||
|
@ -1,52 +0,0 @@
|
||||
import { createEncoder } from "@waku/core";
|
||||
import { LightNode } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
|
||||
|
||||
import { messageText, runNodes, TestContentTopic } from "./utils.js";
|
||||
|
||||
describe("Waku Light Push [node only] - custom pubsub topic", function () {
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
|
||||
beforeEach(async function () {
|
||||
[nwaku, waku] = await runNodes(this, customPubSubTopic);
|
||||
messageCollector = new MessageCollector(
|
||||
TestContentTopic,
|
||||
nwaku,
|
||||
customPubSubTopic
|
||||
);
|
||||
|
||||
await nwaku.ensureSubscriptions([customPubSubTopic]);
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku], [waku]);
|
||||
});
|
||||
|
||||
it("Push message", async function () {
|
||||
const nimPeerId = await nwaku.getPeerId();
|
||||
|
||||
const testEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic,
|
||||
pubSubTopic: customPubSubTopic
|
||||
});
|
||||
|
||||
const pushResponse = await waku.lightPush.send(testEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
|
||||
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
@ -19,7 +19,7 @@ import {
|
||||
TestEncoder
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Light Push [node only]", function () {
|
||||
describe("Waku Light Push", function () {
|
||||
// Set the timeout for all tests in this suite. Can be overwritten at test level
|
||||
this.timeout(15000);
|
||||
let waku: LightNode;
|
||||
@ -28,12 +28,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
|
||||
this.beforeEach(async function () {
|
||||
this.timeout(15000);
|
||||
[nwaku, waku] = await runNodes(this);
|
||||
messageCollector = new MessageCollector(
|
||||
TestContentTopic,
|
||||
nwaku,
|
||||
DefaultPubSubTopic
|
||||
);
|
||||
[nwaku, waku] = await runNodes(this, [DefaultPubSubTopic]);
|
||||
messageCollector = new MessageCollector(nwaku);
|
||||
|
||||
await nwaku.ensureSubscriptions();
|
||||
});
|
||||
@ -51,7 +47,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: testItem.value
|
||||
expectedMessageText: testItem.value,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
||||
@ -70,7 +67,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
|
||||
for (let i = 0; i < 30; i++) {
|
||||
messageCollector.verifyReceivedMessage(i, {
|
||||
expectedMessageText: generateMessageText(i)
|
||||
expectedMessageText: generateMessageText(i),
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
}
|
||||
});
|
||||
@ -84,7 +82,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: undefined
|
||||
expectedMessageText: undefined,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
} else {
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
@ -138,7 +137,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
@ -157,7 +157,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
expect(pushResponse.recipients.length).to.eq(1);
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
} else {
|
||||
expect(pushResponse.recipients.length).to.eq(0);
|
||||
@ -186,7 +187,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
@ -206,7 +208,8 @@ describe("Waku Light Push [node only]", function () {
|
||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedTimestamp: customTimeNanos
|
||||
expectedTimestamp: customTimeNanos,
|
||||
expectedContentTopic: TestContentTopic
|
||||
});
|
||||
});
|
||||
});
|
||||
|
153
packages/tests/tests/light-push/multiple_pubsub.node.spec.ts
Normal file
153
packages/tests/tests/light-push/multiple_pubsub.node.spec.ts
Normal file
@ -0,0 +1,153 @@
|
||||
import type { PeerId } from "@libp2p/interface/peer-id";
|
||||
import {
|
||||
createEncoder,
|
||||
DefaultPubSubTopic,
|
||||
waitForRemotePeer
|
||||
} from "@waku/core";
|
||||
import { LightNode, Protocols, SendResult } from "@waku/interfaces";
|
||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
NimGoNode,
|
||||
tearDownNodes
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
messageText,
|
||||
runNodes,
|
||||
TestContentTopic,
|
||||
TestEncoder
|
||||
} from "./utils.js";
|
||||
|
||||
describe("Waku Light Push : Multiple PubSubtopics", function () {
|
||||
this.timeout(30000);
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let nwaku2: NimGoNode;
|
||||
let messageCollector: MessageCollector;
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
const customContentTopic = "/test/2/waku-light-push/utf8";
|
||||
const customEncoder = createEncoder({
|
||||
contentTopic: customContentTopic,
|
||||
pubSubTopic: customPubSubTopic
|
||||
});
|
||||
let nimPeerId: PeerId;
|
||||
|
||||
beforeEach(async function () {
|
||||
[nwaku, waku] = await runNodes(this, [
|
||||
customPubSubTopic,
|
||||
DefaultPubSubTopic
|
||||
]);
|
||||
messageCollector = new MessageCollector(nwaku);
|
||||
nimPeerId = await nwaku.getPeerId();
|
||||
});
|
||||
|
||||
this.afterEach(async function () {
|
||||
tearDownNodes([nwaku, nwaku2], [waku]);
|
||||
});
|
||||
|
||||
it("Push message on custom pubSubTopic", async function () {
|
||||
const pushResponse = await waku.lightPush.send(customEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
|
||||
expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
|
||||
expect(
|
||||
await messageCollector.waitForMessages(1, {
|
||||
pubSubTopic: customPubSubTopic
|
||||
})
|
||||
).to.eq(true);
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: messageText,
|
||||
expectedContentTopic: customContentTopic
|
||||
});
|
||||
});
|
||||
|
||||
it("Subscribe and receive messages on 2 different pubsubtopics", async function () {
|
||||
const pushResponse1 = await waku.lightPush.send(customEncoder, {
|
||||
payload: utf8ToBytes("M1")
|
||||
});
|
||||
const pushResponse2 = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes("M2")
|
||||
});
|
||||
expect(pushResponse1.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
expect(pushResponse2.recipients[0].toString()).to.eq(nimPeerId.toString());
|
||||
|
||||
const messageCollector2 = new MessageCollector(nwaku);
|
||||
|
||||
expect(
|
||||
await messageCollector.waitForMessages(1, {
|
||||
pubSubTopic: customPubSubTopic
|
||||
})
|
||||
).to.eq(true);
|
||||
|
||||
expect(
|
||||
await messageCollector2.waitForMessages(1, {
|
||||
pubSubTopic: DefaultPubSubTopic
|
||||
})
|
||||
).to.eq(true);
|
||||
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: customContentTopic,
|
||||
expectedPubSubTopic: customPubSubTopic
|
||||
});
|
||||
messageCollector2.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M2",
|
||||
expectedContentTopic: TestContentTopic,
|
||||
expectedPubSubTopic: DefaultPubSubTopic
|
||||
});
|
||||
});
|
||||
|
||||
it("Light push messages to 2 nwaku nodes each with different pubsubtopics", async function () {
|
||||
// Set up and start a new nwaku node with Default PubSubtopic
|
||||
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
||||
await nwaku2.start({
|
||||
filter: true,
|
||||
lightpush: true,
|
||||
relay: true,
|
||||
topic: [DefaultPubSubTopic]
|
||||
});
|
||||
await waku.dial(await nwaku2.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
|
||||
const messageCollector2 = new MessageCollector(nwaku2);
|
||||
|
||||
let pushResponse1: SendResult;
|
||||
let pushResponse2: SendResult;
|
||||
// Making sure that we send messages to both nwaku nodes
|
||||
// While loop is done because of https://github.com/waku-org/js-waku/issues/1606
|
||||
while (
|
||||
!(await messageCollector.waitForMessages(1, {
|
||||
pubSubTopic: customPubSubTopic
|
||||
})) ||
|
||||
!(await messageCollector2.waitForMessages(1, {
|
||||
pubSubTopic: DefaultPubSubTopic
|
||||
})) ||
|
||||
pushResponse1!.recipients[0].toString() ===
|
||||
pushResponse2!.recipients[0].toString()
|
||||
) {
|
||||
pushResponse1 = await waku.lightPush.send(customEncoder, {
|
||||
payload: utf8ToBytes("M1")
|
||||
});
|
||||
pushResponse2 = await waku.lightPush.send(TestEncoder, {
|
||||
payload: utf8ToBytes("M2")
|
||||
});
|
||||
}
|
||||
|
||||
messageCollector.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M1",
|
||||
expectedContentTopic: customContentTopic,
|
||||
expectedPubSubTopic: customPubSubTopic
|
||||
});
|
||||
messageCollector2.verifyReceivedMessage(0, {
|
||||
expectedMessageText: "M2",
|
||||
expectedContentTopic: TestContentTopic,
|
||||
expectedPubSubTopic: DefaultPubSubTopic
|
||||
});
|
||||
});
|
||||
});
|
@ -14,23 +14,18 @@ export const messagePayload = { payload: utf8ToBytes(messageText) };
|
||||
|
||||
export async function runNodes(
|
||||
context: Mocha.Context,
|
||||
pubSubTopic?: string
|
||||
pubSubTopics: string[]
|
||||
): Promise<[NimGoNode, LightNode]> {
|
||||
const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {};
|
||||
const nwaku = new NimGoNode(makeLogFileName(context));
|
||||
await nwaku.startWithRetries(
|
||||
{
|
||||
lightpush: true,
|
||||
relay: true,
|
||||
...nwakuOptional
|
||||
},
|
||||
{ lightpush: true, relay: true, topic: pubSubTopics },
|
||||
{ retries: 3 }
|
||||
);
|
||||
|
||||
let waku: LightNode | undefined;
|
||||
try {
|
||||
waku = await createLightNode({
|
||||
pubSubTopics: pubSubTopic ? [pubSubTopic] : undefined,
|
||||
pubSubTopics: pubSubTopics,
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
});
|
||||
await waku.start();
|
||||
@ -41,6 +36,7 @@ export async function runNodes(
|
||||
if (waku) {
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.LightPush]);
|
||||
await nwaku.ensureSubscriptions(pubSubTopics);
|
||||
return [nwaku, waku];
|
||||
} else {
|
||||
throw new Error("Failed to initialize waku");
|
||||
|
@ -27,6 +27,7 @@ import debug from "debug";
|
||||
import {
|
||||
delay,
|
||||
makeLogFileName,
|
||||
MessageCollector,
|
||||
NOISE_KEY_1,
|
||||
NOISE_KEY_2,
|
||||
NOISE_KEY_3
|
||||
@ -260,13 +261,14 @@ describe("Waku Relay [node only]", () => {
|
||||
let waku2: RelayNode;
|
||||
let waku3: RelayNode;
|
||||
|
||||
const pubSubTopic = "/some/pubsub/topic";
|
||||
const CustomContentTopic = "/test/2/waku-relay/utf8";
|
||||
const CustomPubSubTopic = "/some/pubsub/topic";
|
||||
|
||||
const CustomTopicEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic,
|
||||
pubSubTopic: pubSubTopic
|
||||
const CustomEncoder = createEncoder({
|
||||
contentTopic: CustomContentTopic,
|
||||
pubSubTopic: CustomPubSubTopic
|
||||
});
|
||||
const CustomTopicDecoder = createDecoder(TestContentTopic, pubSubTopic);
|
||||
const CustomDecoder = createDecoder(CustomContentTopic, CustomPubSubTopic);
|
||||
|
||||
afterEach(async function () {
|
||||
!!waku1 &&
|
||||
@ -277,18 +279,196 @@ describe("Waku Relay [node only]", () => {
|
||||
waku3.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
it("Publish", async function () {
|
||||
[
|
||||
{
|
||||
pubsub: CustomPubSubTopic,
|
||||
encoder: CustomEncoder,
|
||||
decoder: CustomDecoder
|
||||
},
|
||||
{
|
||||
pubsub: DefaultPubSubTopic,
|
||||
encoder: TestEncoder,
|
||||
decoder: TestDecoder
|
||||
}
|
||||
].forEach((testItem) => {
|
||||
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
// 1 and 2 uses a custom pubsub
|
||||
// 3 uses the default pubsub
|
||||
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
||||
.fill(null)
|
||||
.map(() => new MessageCollector());
|
||||
|
||||
[waku1, waku2, waku3] = await Promise.all([
|
||||
createRelayNode({
|
||||
pubSubTopics: [pubSubTopic],
|
||||
pubSubTopics: [testItem.pubsub],
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [pubSubTopic],
|
||||
pubSubTopics: [testItem.pubsub],
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [testItem.pubsub],
|
||||
staticNoiseKey: NOISE_KEY_3
|
||||
}).then((waku) => waku.start().then(() => waku))
|
||||
]);
|
||||
|
||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||
});
|
||||
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||
});
|
||||
await Promise.all([
|
||||
waku1.dial(waku2.libp2p.peerId),
|
||||
waku3.dial(waku2.libp2p.peerId)
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku3, [Protocols.Relay])
|
||||
]);
|
||||
|
||||
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
|
||||
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
|
||||
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
|
||||
|
||||
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
||||
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
|
||||
payload: utf8ToBytes("M1")
|
||||
});
|
||||
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
|
||||
payload: utf8ToBytes("M2")
|
||||
});
|
||||
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
|
||||
payload: utf8ToBytes("M3")
|
||||
});
|
||||
|
||||
expect(relayResponse1.recipients[0].toString()).to.eq(
|
||||
waku2.libp2p.peerId.toString()
|
||||
);
|
||||
expect(relayResponse3.recipients[0].toString()).to.eq(
|
||||
waku2.libp2p.peerId.toString()
|
||||
);
|
||||
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
||||
waku1.libp2p.peerId.toString()
|
||||
);
|
||||
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
||||
waku3.libp2p.peerId.toString()
|
||||
);
|
||||
|
||||
expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
|
||||
expect(msgCollector1.hasMessage(testItem.pubsub, "M2")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(testItem.pubsub, "M3")).to.be.true;
|
||||
expect(msgCollector2.hasMessage(testItem.pubsub, "M1")).to.be.true;
|
||||
expect(msgCollector2.hasMessage(testItem.pubsub, "M3")).to.be.true;
|
||||
expect(msgCollector3.hasMessage(testItem.pubsub, "M1")).to.be.true;
|
||||
expect(msgCollector3.hasMessage(testItem.pubsub, "M2")).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
it("Nodes with multiple pubsub topic", async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
||||
.fill(null)
|
||||
.map(() => new MessageCollector());
|
||||
|
||||
// Waku1 and waku2 are using multiple pubsub topis
|
||||
[waku1, waku2, waku3] = await Promise.all([
|
||||
createRelayNode({
|
||||
pubSubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [DefaultPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_3
|
||||
}).then((waku) => waku.start().then(() => waku))
|
||||
]);
|
||||
|
||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||
});
|
||||
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||
});
|
||||
await Promise.all([
|
||||
waku1.dial(waku2.libp2p.peerId),
|
||||
waku3.dial(waku2.libp2p.peerId)
|
||||
]);
|
||||
|
||||
await Promise.all([
|
||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||
waitForRemotePeer(waku3, [Protocols.Relay])
|
||||
]);
|
||||
|
||||
await waku1.relay.subscribe(
|
||||
[TestDecoder, CustomDecoder],
|
||||
msgCollector1.callback
|
||||
);
|
||||
await waku2.relay.subscribe(
|
||||
[TestDecoder, CustomDecoder],
|
||||
msgCollector2.callback
|
||||
);
|
||||
await waku3.relay.subscribe([TestDecoder], msgCollector3.callback);
|
||||
|
||||
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
||||
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
|
||||
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||
await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") });
|
||||
await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") });
|
||||
await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") });
|
||||
await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") });
|
||||
await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") });
|
||||
|
||||
expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
|
||||
true
|
||||
);
|
||||
|
||||
expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M3")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(CustomPubSubTopic, "M4")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M5")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M1")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(CustomPubSubTopic, "M2")).to.be.true;
|
||||
expect(msgCollector1.hasMessage(DefaultPubSubTopic, "M5")).to.be.true;
|
||||
expect(msgCollector2.hasMessage(CustomPubSubTopic, "M1")).to.be.true;
|
||||
expect(msgCollector2.hasMessage(DefaultPubSubTopic, "M3")).to.be.true;
|
||||
expect(msgCollector3.hasMessage(DefaultPubSubTopic, "M1")).to.be.true;
|
||||
});
|
||||
|
||||
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
[waku1, waku2, waku3] = await Promise.all([
|
||||
createRelayNode({
|
||||
pubSubTopics: [CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
@ -317,7 +497,7 @@ describe("Waku Relay [node only]", () => {
|
||||
|
||||
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||
(resolve) => {
|
||||
void waku2.relay.subscribe([CustomTopicDecoder], resolve);
|
||||
void waku2.relay.subscribe([CustomDecoder], resolve);
|
||||
}
|
||||
);
|
||||
|
||||
@ -330,7 +510,7 @@ describe("Waku Relay [node only]", () => {
|
||||
}
|
||||
);
|
||||
|
||||
await waku1.relay.send(CustomTopicEncoder, {
|
||||
await waku1.relay.send(CustomEncoder, {
|
||||
payload: utf8ToBytes(messageText)
|
||||
});
|
||||
|
||||
@ -338,7 +518,7 @@ describe("Waku Relay [node only]", () => {
|
||||
await waku3NoMsgPromise;
|
||||
|
||||
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
|
||||
expect(waku2ReceivedMsg.pubSubTopic).to.eq(pubSubTopic);
|
||||
expect(waku2ReceivedMsg.pubSubTopic).to.eq(CustomPubSubTopic);
|
||||
});
|
||||
|
||||
it("Publishes <= 1 MB and rejects others", async function () {
|
||||
@ -348,11 +528,11 @@ describe("Waku Relay [node only]", () => {
|
||||
// 1 and 2 uses a custom pubsub
|
||||
[waku1, waku2] = await Promise.all([
|
||||
createRelayNode({
|
||||
pubSubTopics: [pubSubTopic],
|
||||
pubSubTopics: [CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_1
|
||||
}).then((waku) => waku.start().then(() => waku)),
|
||||
createRelayNode({
|
||||
pubSubTopics: [pubSubTopic],
|
||||
pubSubTopics: [CustomPubSubTopic],
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||
}).then((waku) => waku.start().then(() => waku))
|
||||
@ -370,7 +550,7 @@ describe("Waku Relay [node only]", () => {
|
||||
|
||||
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||
(resolve) => {
|
||||
void waku2.relay.subscribe([CustomTopicDecoder], () =>
|
||||
void waku2.relay.subscribe([CustomDecoder], () =>
|
||||
resolve({
|
||||
payload: new Uint8Array([])
|
||||
} as DecodedMessage)
|
||||
@ -378,18 +558,18 @@ describe("Waku Relay [node only]", () => {
|
||||
}
|
||||
);
|
||||
|
||||
let sendResult = await waku1.relay.send(CustomTopicEncoder, {
|
||||
let sendResult = await waku1.relay.send(CustomEncoder, {
|
||||
payload: generateRandomUint8Array(1 * MB)
|
||||
});
|
||||
expect(sendResult.recipients.length).to.eq(1);
|
||||
|
||||
sendResult = await waku1.relay.send(CustomTopicEncoder, {
|
||||
sendResult = await waku1.relay.send(CustomEncoder, {
|
||||
payload: generateRandomUint8Array(1 * MB + 65536)
|
||||
});
|
||||
expect(sendResult.recipients.length).to.eq(0);
|
||||
expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
|
||||
|
||||
sendResult = await waku1.relay.send(CustomTopicEncoder, {
|
||||
sendResult = await waku1.relay.send(CustomEncoder, {
|
||||
payload: generateRandomUint8Array(2 * MB)
|
||||
});
|
||||
expect(sendResult.recipients.length).to.eq(0);
|
||||
|
@ -3,6 +3,8 @@ import {
|
||||
createDecoder,
|
||||
createEncoder,
|
||||
DecodedMessage,
|
||||
Decoder,
|
||||
DefaultPubSubTopic,
|
||||
PageDirection,
|
||||
waitForRemotePeer
|
||||
} from "@waku/core";
|
||||
@ -565,9 +567,12 @@ describe("Waku Store, custom pubsub topic", () => {
|
||||
const customPubSubTopic = "/waku/2/custom-dapp/proto";
|
||||
let waku: LightNode;
|
||||
let nwaku: NimGoNode;
|
||||
let nwaku2: NimGoNode;
|
||||
|
||||
const CustomPubSubTestDecoder = createDecoder(
|
||||
TestContentTopic,
|
||||
const customContentTopic = "/test/2/waku-store/utf8";
|
||||
|
||||
const customTestDecoder = createDecoder(
|
||||
customContentTopic,
|
||||
customPubSubTopic
|
||||
);
|
||||
|
||||
@ -576,14 +581,17 @@ describe("Waku Store, custom pubsub topic", () => {
|
||||
nwaku = new NimGoNode(makeLogFileName(this));
|
||||
await nwaku.start({
|
||||
store: true,
|
||||
topic: customPubSubTopic,
|
||||
topic: [customPubSubTopic, DefaultPubSubTopic],
|
||||
relay: true
|
||||
});
|
||||
await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
!!nwaku &&
|
||||
nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e));
|
||||
!!nwaku2 &&
|
||||
nwaku2.stop().catch((e) => console.log("Nwaku failed to stop", e));
|
||||
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
|
||||
});
|
||||
|
||||
@ -596,7 +604,7 @@ describe("Waku Store, custom pubsub topic", () => {
|
||||
await nwaku.sendMessage(
|
||||
NimGoNode.toMessageRpcQuery({
|
||||
payload: new Uint8Array([i]),
|
||||
contentTopic: TestContentTopic
|
||||
contentTopic: customContentTopic
|
||||
}),
|
||||
customPubSubTopic
|
||||
)
|
||||
@ -614,7 +622,7 @@ describe("Waku Store, custom pubsub topic", () => {
|
||||
const messages: IMessage[] = [];
|
||||
let promises: Promise<void>[] = [];
|
||||
for await (const msgPromises of waku.store.queryGenerator([
|
||||
CustomPubSubTestDecoder
|
||||
customTestDecoder
|
||||
])) {
|
||||
const _promises = msgPromises.map(async (promise) => {
|
||||
const msg = await promise;
|
||||
@ -634,4 +642,131 @@ describe("Waku Store, custom pubsub topic", () => {
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Generator, 2 different pubsubtopics", async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
const totalMsgs = 10;
|
||||
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
||||
await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||
|
||||
waku = await createLightNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
pubSubTopics: [customPubSubTopic, DefaultPubSubTopic]
|
||||
});
|
||||
await waku.start();
|
||||
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
const customMessages = await processMessages(
|
||||
waku,
|
||||
[customTestDecoder],
|
||||
customPubSubTopic
|
||||
);
|
||||
expect(customMessages?.length).eq(totalMsgs);
|
||||
const result1 = customMessages?.findIndex((msg) => {
|
||||
return msg.payload![0]! === 0;
|
||||
});
|
||||
expect(result1).to.not.eq(-1);
|
||||
|
||||
const testMessages = await processMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
DefaultPubSubTopic
|
||||
);
|
||||
expect(testMessages?.length).eq(totalMsgs);
|
||||
const result2 = testMessages?.findIndex((msg) => {
|
||||
return msg.payload![0]! === 0;
|
||||
});
|
||||
expect(result2).to.not.eq(-1);
|
||||
});
|
||||
|
||||
it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
// Set up and start a new nwaku node with Default PubSubtopic
|
||||
nwaku2 = new NimGoNode(makeLogFileName(this) + "2");
|
||||
await nwaku2.start({
|
||||
store: true,
|
||||
topic: [DefaultPubSubTopic],
|
||||
relay: true
|
||||
});
|
||||
|
||||
const totalMsgs = 10;
|
||||
await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic);
|
||||
await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic);
|
||||
|
||||
waku = await createLightNode({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
pubSubTopics: [customPubSubTopic, DefaultPubSubTopic]
|
||||
});
|
||||
await waku.start();
|
||||
|
||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||
await waku.dial(await nwaku2.getMultiaddrWithId());
|
||||
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||
|
||||
let customMessages: IMessage[] = [];
|
||||
let testMessages: IMessage[] = [];
|
||||
|
||||
while (
|
||||
customMessages.length != totalMsgs ||
|
||||
testMessages.length != totalMsgs
|
||||
) {
|
||||
customMessages = await processMessages(
|
||||
waku,
|
||||
[customTestDecoder],
|
||||
customPubSubTopic
|
||||
);
|
||||
testMessages = await processMessages(
|
||||
waku,
|
||||
[TestDecoder],
|
||||
DefaultPubSubTopic
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
// will move those 2 reusable functions to store/utils when refactoring store tests but with another PR
|
||||
async function sendMessages(
|
||||
instance: NimGoNode,
|
||||
numMessages: number,
|
||||
contentTopic: string,
|
||||
pubSubTopic: string
|
||||
): Promise<void> {
|
||||
for (let i = 0; i < numMessages; i++) {
|
||||
expect(
|
||||
await instance.sendMessage(
|
||||
NimGoNode.toMessageRpcQuery({
|
||||
payload: new Uint8Array([i]),
|
||||
contentTopic: contentTopic
|
||||
}),
|
||||
pubSubTopic
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
await delay(1); // to ensure each timestamp is unique.
|
||||
}
|
||||
|
||||
async function processMessages(
|
||||
instance: LightNode,
|
||||
decoders: Array<Decoder>,
|
||||
expectedTopic: string
|
||||
): Promise<IMessage[]> {
|
||||
const localMessages: IMessage[] = [];
|
||||
let localPromises: Promise<void>[] = [];
|
||||
for await (const msgPromises of instance.store.queryGenerator(decoders)) {
|
||||
const _promises = msgPromises.map(async (promise) => {
|
||||
const msg = await promise;
|
||||
if (msg) {
|
||||
localMessages.push(msg);
|
||||
expect(msg.pubSubTopic).to.eq(expectedTopic);
|
||||
}
|
||||
});
|
||||
|
||||
localPromises = localPromises.concat(_promises);
|
||||
}
|
||||
await Promise.all(localPromises);
|
||||
return localMessages;
|
||||
}
|
||||
});
|
||||
|
Loading…
x
Reference in New Issue
Block a user