mirror of https://github.com/waku-org/js-waku.git
chore: new relay tests (#1649)
* make relay folder * make relay folder * adjust message collector for relay * small fix * small fix * small fix * split tests more * small fixes * small fix * new test * fix pubsubtopic name * new subscribe tests * new subscribe tests * new tests * small fix after ci run * small fix after ci run2 * fix skipped test * added issue for skipped test
This commit is contained in:
parent
80a33b9c2b
commit
1ec0c200ca
|
@ -50,7 +50,8 @@ export const TEST_STRING = [
|
||||||
{ description: "JSON", value: '{"user":"admin","password":"123456"}' },
|
{ description: "JSON", value: '{"user":"admin","password":"123456"}' },
|
||||||
{ description: "shell command", value: "`rm -rf /`" },
|
{ description: "shell command", value: "`rm -rf /`" },
|
||||||
{ description: "escaped characters", value: "\\n\\t\\0" },
|
{ description: "escaped characters", value: "\\n\\t\\0" },
|
||||||
{ description: "unicode special characters", value: "\u202Ereverse" }
|
{ description: "unicode special characters", value: "\u202Ereverse" },
|
||||||
|
{ description: "emoji", value: "🤫 🤥 😶 😶🌫️ 😐 😑 😬 🫨 🫠 🙄 😯 😦 😧 😮" }
|
||||||
];
|
];
|
||||||
|
|
||||||
export const TEST_TIMESTAMPS = [
|
export const TEST_TIMESTAMPS = [
|
||||||
|
|
|
@ -12,3 +12,4 @@ export * from "./log_file.js";
|
||||||
export * from "./node/node.js";
|
export * from "./node/node.js";
|
||||||
export * from "./teardown.js";
|
export * from "./teardown.js";
|
||||||
export * from "./message_collector.js";
|
export * from "./message_collector.js";
|
||||||
|
export * from "./utils.js";
|
||||||
|
|
|
@ -113,7 +113,7 @@ export class MessageCollector {
|
||||||
expectedVersion?: number;
|
expectedVersion?: number;
|
||||||
expectedMeta?: Uint8Array;
|
expectedMeta?: Uint8Array;
|
||||||
expectedEphemeral?: boolean;
|
expectedEphemeral?: boolean;
|
||||||
expectedTimestamp?: bigint;
|
expectedTimestamp?: bigint | number;
|
||||||
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
|
checkTimestamp?: boolean; // Used to determine if we need to check the timestamp
|
||||||
}
|
}
|
||||||
): void {
|
): void {
|
||||||
|
@ -148,6 +148,38 @@ export class MessageCollector {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const shouldCheckTimestamp =
|
||||||
|
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
|
||||||
|
if (shouldCheckTimestamp && message.timestamp) {
|
||||||
|
// In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec
|
||||||
|
// We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions
|
||||||
|
let timestampAsNumber: number;
|
||||||
|
|
||||||
|
if (message.timestamp instanceof Date) {
|
||||||
|
timestampAsNumber = message.timestamp.getTime();
|
||||||
|
} else {
|
||||||
|
timestampAsNumber = Number(message.timestamp) / 1_000_000;
|
||||||
|
}
|
||||||
|
|
||||||
|
let lowerBound: number;
|
||||||
|
let upperBound: number;
|
||||||
|
|
||||||
|
// Define the bounds based on the expectedTimestamp
|
||||||
|
if (options.expectedTimestamp !== undefined) {
|
||||||
|
lowerBound = Number(options.expectedTimestamp) - 1000;
|
||||||
|
upperBound = Number(options.expectedTimestamp) + 1000;
|
||||||
|
} else {
|
||||||
|
upperBound = Date.now();
|
||||||
|
lowerBound = upperBound - 10000;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (timestampAsNumber < lowerBound || timestampAsNumber > upperBound) {
|
||||||
|
throw new AssertionError(
|
||||||
|
`Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${timestampAsNumber}`
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (this.isMessageRpcResponse(message)) {
|
if (this.isMessageRpcResponse(message)) {
|
||||||
// nwaku message specific assertions
|
// nwaku message specific assertions
|
||||||
const receivedMessageText = message.payload
|
const receivedMessageText = message.payload
|
||||||
|
@ -158,37 +190,6 @@ export class MessageCollector {
|
||||||
options.expectedMessageText,
|
options.expectedMessageText,
|
||||||
`Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}`
|
`Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}`
|
||||||
);
|
);
|
||||||
|
|
||||||
if (message.timestamp) {
|
|
||||||
// In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec
|
|
||||||
// We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions
|
|
||||||
if (options.expectedTimestamp !== undefined) {
|
|
||||||
const lowerBound =
|
|
||||||
BigInt(options.expectedTimestamp) - BigInt(1000000000);
|
|
||||||
const upperBound =
|
|
||||||
BigInt(options.expectedTimestamp) + BigInt(1000000000);
|
|
||||||
|
|
||||||
if (
|
|
||||||
message.timestamp < lowerBound ||
|
|
||||||
message.timestamp > upperBound
|
|
||||||
) {
|
|
||||||
throw new AssertionError(
|
|
||||||
`Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${message.timestamp}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// In we don't send timestamp in the request we assert that the timestamp in the response is between now and (now-10s)
|
|
||||||
else {
|
|
||||||
const now = BigInt(Date.now()) * BigInt(1_000_000);
|
|
||||||
const tenSecondsAgo = now - BigInt(10_000_000_000);
|
|
||||||
|
|
||||||
if (message.timestamp < tenSecondsAgo || message.timestamp > now) {
|
|
||||||
throw new AssertionError(
|
|
||||||
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
// js-waku message specific assertions
|
// js-waku message specific assertions
|
||||||
expect(message.pubsubTopic).to.eq(
|
expect(message.pubsubTopic).to.eq(
|
||||||
|
@ -205,18 +206,6 @@ export class MessageCollector {
|
||||||
}. Got: ${bytesToUtf8(message.payload)}`
|
}. Got: ${bytesToUtf8(message.payload)}`
|
||||||
);
|
);
|
||||||
|
|
||||||
const shouldCheckTimestamp =
|
|
||||||
options.checkTimestamp !== undefined ? options.checkTimestamp : true;
|
|
||||||
if (shouldCheckTimestamp && message.timestamp) {
|
|
||||||
const now = Date.now();
|
|
||||||
const tenSecondsAgo = now - 10000;
|
|
||||||
expect(message.timestamp.getTime()).to.be.within(
|
|
||||||
tenSecondsAgo,
|
|
||||||
now,
|
|
||||||
`Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
expect([
|
expect([
|
||||||
options.expectedMeta,
|
options.expectedMeta,
|
||||||
undefined,
|
undefined,
|
||||||
|
|
|
@ -0,0 +1,22 @@
|
||||||
|
import { createDecoder, createEncoder, Decoder, Encoder } from "@waku/core";
|
||||||
|
|
||||||
|
// Utility to generate test data for multiple topics tests.
|
||||||
|
export function generateTestData(topicCount: number): {
|
||||||
|
contentTopics: string[];
|
||||||
|
encoders: Encoder[];
|
||||||
|
decoders: Decoder[];
|
||||||
|
} {
|
||||||
|
const contentTopics = Array.from(
|
||||||
|
{ length: topicCount },
|
||||||
|
(_, i) => `/test/${i + 1}/waku-multi`
|
||||||
|
);
|
||||||
|
const encoders = contentTopics.map((topic) =>
|
||||||
|
createEncoder({ contentTopic: topic })
|
||||||
|
);
|
||||||
|
const decoders = contentTopics.map((topic) => createDecoder(topic));
|
||||||
|
return {
|
||||||
|
contentTopics,
|
||||||
|
encoders,
|
||||||
|
decoders
|
||||||
|
};
|
||||||
|
}
|
|
@ -88,7 +88,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with invalid timestamp is not received", async function () {
|
it("Check message with invalid timestamp is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -105,7 +105,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message on other pubsub topic is not received", async function () {
|
it("Check message on other pubsub topic is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -121,7 +121,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with no pubsub topic is not received", async function () {
|
it("Check message with no pubsub topic is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -136,7 +136,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with no content topic is not received", async function () {
|
it("Check message with no content topic is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -151,7 +151,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with no payload is not received", async function () {
|
it("Check message with no payload is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -171,7 +171,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with non string payload is not received", async function () {
|
it("Check message with non string payload is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -187,7 +187,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Check received message with extra parameter is not received", async function () {
|
it("Check message with extra parameter is not received", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await delay(400);
|
await delay(400);
|
||||||
|
|
||||||
|
@ -226,7 +226,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
||||||
it.skip("Check received message received after jswaku node is restarted", async function () {
|
it.skip("Check message received after jswaku node is restarted", async function () {
|
||||||
// Subscribe and send message
|
// Subscribe and send message
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||||
|
@ -259,7 +259,7 @@ describe("Waku Filter V2: FilterPush", function () {
|
||||||
});
|
});
|
||||||
|
|
||||||
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
||||||
it.skip("Check received message received after nwaku node is restarted", async function () {
|
it.skip("Check message received after nwaku node is restarted", async function () {
|
||||||
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
await subscription.subscribe([TestDecoder], messageCollector.callback);
|
||||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
|
|
@ -12,6 +12,7 @@ import { expect } from "chai";
|
||||||
|
|
||||||
import {
|
import {
|
||||||
delay,
|
delay,
|
||||||
|
generateTestData,
|
||||||
makeLogFileName,
|
makeLogFileName,
|
||||||
MessageCollector,
|
MessageCollector,
|
||||||
NimGoNode,
|
NimGoNode,
|
||||||
|
@ -20,7 +21,6 @@ import {
|
||||||
} from "../../src/index.js";
|
} from "../../src/index.js";
|
||||||
|
|
||||||
import {
|
import {
|
||||||
generateTestData,
|
|
||||||
messagePayload,
|
messagePayload,
|
||||||
messageText,
|
messageText,
|
||||||
runNodes,
|
runNodes,
|
||||||
|
@ -295,7 +295,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||||
|
|
||||||
// Check if all messages were received.
|
// Check if all messages were received.
|
||||||
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
|
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
|
||||||
expect(await messageCollector.waitForMessages(6)).to.eq(true);
|
expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
it("Refresh subscription", async function () {
|
it("Refresh subscription", async function () {
|
||||||
|
@ -307,7 +309,9 @@ describe("Waku Filter V2: Subscribe", function () {
|
||||||
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
|
await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") });
|
||||||
|
|
||||||
// Confirm both messages were received.
|
// Confirm both messages were received.
|
||||||
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
messageCollector.verifyReceivedMessage(0, {
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
expectedMessageText: "M1",
|
expectedMessageText: "M1",
|
||||||
expectedContentTopic: TestContentTopic
|
expectedContentTopic: TestContentTopic
|
||||||
|
|
|
@ -3,10 +3,14 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces";
|
||||||
import { utf8ToBytes } from "@waku/utils/bytes";
|
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||||
import { expect } from "chai";
|
import { expect } from "chai";
|
||||||
|
|
||||||
import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js";
|
|
||||||
|
|
||||||
import {
|
import {
|
||||||
generateTestData,
|
generateTestData,
|
||||||
|
MessageCollector,
|
||||||
|
NimGoNode,
|
||||||
|
tearDownNodes
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
messagePayload,
|
messagePayload,
|
||||||
messageText,
|
messageText,
|
||||||
runNodes,
|
runNodes,
|
||||||
|
|
|
@ -1,10 +1,4 @@
|
||||||
import {
|
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
|
||||||
createDecoder,
|
|
||||||
createEncoder,
|
|
||||||
Decoder,
|
|
||||||
Encoder,
|
|
||||||
waitForRemotePeer
|
|
||||||
} from "@waku/core";
|
|
||||||
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
|
import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces";
|
||||||
import { createLightNode } from "@waku/sdk";
|
import { createLightNode } from "@waku/sdk";
|
||||||
import { Logger } from "@waku/utils";
|
import { Logger } from "@waku/utils";
|
||||||
|
@ -21,27 +15,6 @@ export const TestDecoder = createDecoder(TestContentTopic);
|
||||||
export const messageText = "Filtering works!";
|
export const messageText = "Filtering works!";
|
||||||
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
||||||
|
|
||||||
// Utility to generate test data for multiple topics tests.
|
|
||||||
export function generateTestData(topicCount: number): {
|
|
||||||
contentTopics: string[];
|
|
||||||
encoders: Encoder[];
|
|
||||||
decoders: Decoder[];
|
|
||||||
} {
|
|
||||||
const contentTopics = Array.from(
|
|
||||||
{ length: topicCount },
|
|
||||||
(_, i) => `/test/${i + 1}/waku-multi`
|
|
||||||
);
|
|
||||||
const encoders = contentTopics.map((topic) =>
|
|
||||||
createEncoder({ contentTopic: topic })
|
|
||||||
);
|
|
||||||
const decoders = contentTopics.map((topic) => createDecoder(topic));
|
|
||||||
return {
|
|
||||||
contentTopics,
|
|
||||||
encoders,
|
|
||||||
decoders
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
// Utility to validate errors related to pings in the subscription.
|
// Utility to validate errors related to pings in the subscription.
|
||||||
export async function validatePingError(
|
export async function validatePingError(
|
||||||
subscription: IFilterSubscription
|
subscription: IFilterSubscription
|
||||||
|
|
|
@ -188,7 +188,6 @@ describe("Waku Light Push", function () {
|
||||||
Date.now() + 3600000
|
Date.now() + 3600000
|
||||||
].forEach((testItem) => {
|
].forEach((testItem) => {
|
||||||
it(`Push message with custom timestamp: ${testItem}`, async function () {
|
it(`Push message with custom timestamp: ${testItem}`, async function () {
|
||||||
const customTimeNanos = BigInt(testItem) * BigInt(1000000);
|
|
||||||
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
const pushResponse = await waku.lightPush.send(TestEncoder, {
|
||||||
payload: utf8ToBytes(messageText),
|
payload: utf8ToBytes(messageText),
|
||||||
timestamp: new Date(testItem)
|
timestamp: new Date(testItem)
|
||||||
|
@ -198,7 +197,7 @@ describe("Waku Light Push", function () {
|
||||||
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
messageCollector.verifyReceivedMessage(0, {
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
expectedMessageText: messageText,
|
expectedMessageText: messageText,
|
||||||
expectedTimestamp: customTimeNanos,
|
expectedTimestamp: testItem,
|
||||||
expectedContentTopic: TestContentTopic
|
expectedContentTopic: TestContentTopic
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
|
@ -1,740 +0,0 @@
|
||||||
import type { PeerId } from "@libp2p/interface/peer-id";
|
|
||||||
import {
|
|
||||||
createDecoder,
|
|
||||||
createEncoder,
|
|
||||||
DecodedMessage,
|
|
||||||
DefaultPubSubTopic,
|
|
||||||
waitForRemotePeer
|
|
||||||
} from "@waku/core";
|
|
||||||
import { RelayNode, SendError } from "@waku/interfaces";
|
|
||||||
import { Protocols } from "@waku/interfaces";
|
|
||||||
import {
|
|
||||||
createDecoder as createEciesDecoder,
|
|
||||||
createEncoder as createEciesEncoder,
|
|
||||||
generatePrivateKey,
|
|
||||||
getPublicKey
|
|
||||||
} from "@waku/message-encryption/ecies";
|
|
||||||
import {
|
|
||||||
createDecoder as createSymDecoder,
|
|
||||||
createEncoder as createSymEncoder,
|
|
||||||
generateSymmetricKey
|
|
||||||
} from "@waku/message-encryption/symmetric";
|
|
||||||
import { createRelayNode } from "@waku/sdk";
|
|
||||||
import { Logger } from "@waku/utils";
|
|
||||||
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
|
||||||
import { expect } from "chai";
|
|
||||||
|
|
||||||
import {
|
|
||||||
delay,
|
|
||||||
makeLogFileName,
|
|
||||||
MessageCollector,
|
|
||||||
NOISE_KEY_1,
|
|
||||||
NOISE_KEY_2,
|
|
||||||
NOISE_KEY_3,
|
|
||||||
tearDownNodes
|
|
||||||
} from "../src/index.js";
|
|
||||||
import { MessageRpcResponse } from "../src/node/interfaces.js";
|
|
||||||
import { base64ToUtf8, NimGoNode } from "../src/node/node.js";
|
|
||||||
import { generateRandomUint8Array } from "../src/random_array.js";
|
|
||||||
|
|
||||||
const log = new Logger("test:relay");
|
|
||||||
|
|
||||||
const TestContentTopic = "/test/1/waku-relay/utf8";
|
|
||||||
const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
|
||||||
const TestDecoder = createDecoder(TestContentTopic);
|
|
||||||
|
|
||||||
describe("Waku Relay [node only]", () => {
|
|
||||||
// Node needed as we don't have a way to connect 2 js waku
|
|
||||||
// nodes in the browser yet
|
|
||||||
describe("2 js nodes", () => {
|
|
||||||
afterEach(function () {
|
|
||||||
if (this.currentTest?.state === "failed") {
|
|
||||||
console.log(`Test failed, log file name is ${makeLogFileName(this)}`);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let waku1: RelayNode;
|
|
||||||
let waku2: RelayNode;
|
|
||||||
beforeEach(async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
log.info("Starting JS Waku instances");
|
|
||||||
[waku1, waku2] = await Promise.all([
|
|
||||||
createRelayNode({ staticNoiseKey: NOISE_KEY_1 }).then((waku) =>
|
|
||||||
waku.start().then(() => waku)
|
|
||||||
),
|
|
||||||
createRelayNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
log.info("Instances started, adding waku2 to waku1's address book");
|
|
||||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await waku1.dial(waku2.libp2p.peerId);
|
|
||||||
|
|
||||||
log.info("Wait for mutual pubsub subscription");
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
log.info("before each hook done");
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
!!waku1 &&
|
|
||||||
waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
!!waku2 &&
|
|
||||||
waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Subscribe", async function () {
|
|
||||||
log.info("Getting subscribers");
|
|
||||||
const subscribers1 = waku1.libp2p.services
|
|
||||||
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
|
||||||
.map((p) => p.toString());
|
|
||||||
const subscribers2 = waku2.libp2p.services
|
|
||||||
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
|
||||||
.map((p) => p.toString());
|
|
||||||
|
|
||||||
log.info("Asserting mutual subscription");
|
|
||||||
expect(subscribers1).to.contain(waku2.libp2p.peerId.toString());
|
|
||||||
expect(subscribers2).to.contain(waku1.libp2p.peerId.toString());
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Register correct protocols", async function () {
|
|
||||||
const protocols = waku1.libp2p.getProtocols();
|
|
||||||
|
|
||||||
expect(protocols).to.contain("/vac/waku/relay/2.0.0");
|
|
||||||
expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Publish", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const messageText = "JS to JS communication works";
|
|
||||||
const messageTimestamp = new Date("1995-12-17T03:24:00");
|
|
||||||
const message = {
|
|
||||||
payload: utf8ToBytes(messageText),
|
|
||||||
timestamp: messageTimestamp
|
|
||||||
};
|
|
||||||
|
|
||||||
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve) => {
|
|
||||||
void waku2.relay.subscribe([TestDecoder], resolve);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
await waku1.relay.send(TestEncoder, message);
|
|
||||||
|
|
||||||
const receivedMsg = await receivedMsgPromise;
|
|
||||||
|
|
||||||
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
|
|
||||||
expect(bytesToUtf8(receivedMsg.payload)).to.eq(messageText);
|
|
||||||
expect(receivedMsg.timestamp?.valueOf()).to.eq(
|
|
||||||
messageTimestamp.valueOf()
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Filter on content topics", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const fooMessageText = "Published on content topic foo";
|
|
||||||
const barMessageText = "Published on content topic bar";
|
|
||||||
|
|
||||||
const fooContentTopic = "foo";
|
|
||||||
const barContentTopic = "bar";
|
|
||||||
|
|
||||||
const fooEncoder = createEncoder({ contentTopic: fooContentTopic });
|
|
||||||
const barEncoder = createEncoder({ contentTopic: barContentTopic });
|
|
||||||
|
|
||||||
const fooDecoder = createDecoder(fooContentTopic);
|
|
||||||
const barDecoder = createDecoder(barContentTopic);
|
|
||||||
|
|
||||||
const fooMessages: DecodedMessage[] = [];
|
|
||||||
void waku2.relay.subscribe([fooDecoder], (msg) => {
|
|
||||||
fooMessages.push(msg);
|
|
||||||
});
|
|
||||||
|
|
||||||
const barMessages: DecodedMessage[] = [];
|
|
||||||
void waku2.relay.subscribe([barDecoder], (msg) => {
|
|
||||||
barMessages.push(msg);
|
|
||||||
});
|
|
||||||
|
|
||||||
await waku1.relay.send(barEncoder, {
|
|
||||||
payload: utf8ToBytes(barMessageText)
|
|
||||||
});
|
|
||||||
await waku1.relay.send(fooEncoder, {
|
|
||||||
payload: utf8ToBytes(fooMessageText)
|
|
||||||
});
|
|
||||||
|
|
||||||
while (!fooMessages.length && !barMessages.length) {
|
|
||||||
await delay(100);
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(fooMessages[0].contentTopic).to.eq(fooContentTopic);
|
|
||||||
expect(bytesToUtf8(fooMessages[0].payload)).to.eq(fooMessageText);
|
|
||||||
|
|
||||||
expect(barMessages[0].contentTopic).to.eq(barContentTopic);
|
|
||||||
expect(bytesToUtf8(barMessages[0].payload)).to.eq(barMessageText);
|
|
||||||
|
|
||||||
expect(fooMessages.length).to.eq(1);
|
|
||||||
expect(barMessages.length).to.eq(1);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Decrypt messages", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const asymText = "This message is encrypted using asymmetric";
|
|
||||||
const asymTopic = "/test/1/asymmetric/proto";
|
|
||||||
const symText = "This message is encrypted using symmetric encryption";
|
|
||||||
const symTopic = "/test/1/symmetric/proto";
|
|
||||||
|
|
||||||
const privateKey = generatePrivateKey();
|
|
||||||
const symKey = generateSymmetricKey();
|
|
||||||
const publicKey = getPublicKey(privateKey);
|
|
||||||
|
|
||||||
const eciesEncoder = createEciesEncoder({
|
|
||||||
contentTopic: asymTopic,
|
|
||||||
publicKey
|
|
||||||
});
|
|
||||||
const symEncoder = createSymEncoder({
|
|
||||||
contentTopic: symTopic,
|
|
||||||
symKey
|
|
||||||
});
|
|
||||||
|
|
||||||
const eciesDecoder = createEciesDecoder(asymTopic, privateKey);
|
|
||||||
const symDecoder = createSymDecoder(symTopic, symKey);
|
|
||||||
|
|
||||||
const msgs: DecodedMessage[] = [];
|
|
||||||
void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
|
|
||||||
msgs.push(wakuMsg);
|
|
||||||
});
|
|
||||||
void waku2.relay.subscribe([symDecoder], (wakuMsg) => {
|
|
||||||
msgs.push(wakuMsg);
|
|
||||||
});
|
|
||||||
|
|
||||||
await waku1.relay.send(eciesEncoder, { payload: utf8ToBytes(asymText) });
|
|
||||||
await delay(200);
|
|
||||||
await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) });
|
|
||||||
|
|
||||||
while (msgs.length < 2) {
|
|
||||||
await delay(200);
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(msgs[0].contentTopic).to.eq(asymTopic);
|
|
||||||
expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText);
|
|
||||||
expect(msgs[1].contentTopic).to.eq(symTopic);
|
|
||||||
expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Delete observer", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const messageText =
|
|
||||||
"Published on content topic with added then deleted observer";
|
|
||||||
|
|
||||||
const contentTopic = "added-then-deleted-observer";
|
|
||||||
|
|
||||||
// The promise **fails** if we receive a message on this observer.
|
|
||||||
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve, reject) => {
|
|
||||||
const deleteObserver = waku2.relay.subscribe(
|
|
||||||
[createDecoder(contentTopic)],
|
|
||||||
reject
|
|
||||||
) as () => void;
|
|
||||||
deleteObserver();
|
|
||||||
setTimeout(resolve, 500);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
await waku1.relay.send(createEncoder({ contentTopic }), {
|
|
||||||
payload: utf8ToBytes(messageText)
|
|
||||||
});
|
|
||||||
|
|
||||||
await receivedMsgPromise;
|
|
||||||
// If it does not throw then we are good.
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("Custom pubsub topic", () => {
|
|
||||||
let waku1: RelayNode;
|
|
||||||
let waku2: RelayNode;
|
|
||||||
let waku3: RelayNode;
|
|
||||||
|
|
||||||
const CustomContentTopic = "/test/2/waku-relay/utf8";
|
|
||||||
const CustomPubSubTopic = "/some/pubsub/topic";
|
|
||||||
|
|
||||||
const CustomEncoder = createEncoder({
|
|
||||||
contentTopic: CustomContentTopic,
|
|
||||||
pubsubTopic: CustomPubSubTopic
|
|
||||||
});
|
|
||||||
const CustomDecoder = createDecoder(CustomContentTopic, CustomPubSubTopic);
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
!!waku1 &&
|
|
||||||
waku1.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
!!waku2 &&
|
|
||||||
waku2.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
!!waku3 &&
|
|
||||||
waku3.stop().catch((e) => console.log("Waku failed to stop", e));
|
|
||||||
});
|
|
||||||
|
|
||||||
[
|
|
||||||
{
|
|
||||||
pubsub: CustomPubSubTopic,
|
|
||||||
encoder: CustomEncoder,
|
|
||||||
decoder: CustomDecoder
|
|
||||||
},
|
|
||||||
{
|
|
||||||
pubsub: DefaultPubSubTopic,
|
|
||||||
encoder: TestEncoder,
|
|
||||||
decoder: TestDecoder
|
|
||||||
}
|
|
||||||
].forEach((testItem) => {
|
|
||||||
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
|
||||||
.fill(null)
|
|
||||||
.map(() => new MessageCollector());
|
|
||||||
|
|
||||||
[waku1, waku2, waku3] = await Promise.all([
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [testItem.pubsub],
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [testItem.pubsub],
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [testItem.pubsub],
|
|
||||||
staticNoiseKey: NOISE_KEY_3
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(waku2.libp2p.peerId),
|
|
||||||
waku3.dial(waku2.libp2p.peerId)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku3, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
|
|
||||||
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
|
|
||||||
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
|
|
||||||
|
|
||||||
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
|
||||||
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
|
|
||||||
payload: utf8ToBytes("M1")
|
|
||||||
});
|
|
||||||
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
|
|
||||||
payload: utf8ToBytes("M2")
|
|
||||||
});
|
|
||||||
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
|
|
||||||
payload: utf8ToBytes("M3")
|
|
||||||
});
|
|
||||||
|
|
||||||
expect(relayResponse1.recipients[0].toString()).to.eq(
|
|
||||||
waku2.libp2p.peerId.toString()
|
|
||||||
);
|
|
||||||
expect(relayResponse3.recipients[0].toString()).to.eq(
|
|
||||||
waku2.libp2p.peerId.toString()
|
|
||||||
);
|
|
||||||
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
|
||||||
waku1.libp2p.peerId.toString()
|
|
||||||
);
|
|
||||||
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
|
||||||
waku3.libp2p.peerId.toString()
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
|
|
||||||
expect(
|
|
||||||
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2")
|
|
||||||
).to.eq(true);
|
|
||||||
expect(
|
|
||||||
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3")
|
|
||||||
).to.eq(true);
|
|
||||||
expect(
|
|
||||||
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1")
|
|
||||||
).to.eq(true);
|
|
||||||
expect(
|
|
||||||
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3")
|
|
||||||
).to.eq(true);
|
|
||||||
expect(
|
|
||||||
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1")
|
|
||||||
).to.eq(true);
|
|
||||||
expect(
|
|
||||||
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2")
|
|
||||||
).to.eq(true);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Nodes with multiple pubsub topic", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
|
||||||
.fill(null)
|
|
||||||
.map(() => new MessageCollector());
|
|
||||||
|
|
||||||
// Waku1 and waku2 are using multiple pubsub topis
|
|
||||||
[waku1, waku2, waku3] = await Promise.all([
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [DefaultPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_3
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(waku2.libp2p.peerId),
|
|
||||||
waku3.dial(waku2.libp2p.peerId)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku3, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.relay.subscribe(
|
|
||||||
[TestDecoder, CustomDecoder],
|
|
||||||
msgCollector1.callback
|
|
||||||
);
|
|
||||||
await waku2.relay.subscribe(
|
|
||||||
[TestDecoder, CustomDecoder],
|
|
||||||
msgCollector2.callback
|
|
||||||
);
|
|
||||||
await waku3.relay.subscribe([TestDecoder], msgCollector3.callback);
|
|
||||||
|
|
||||||
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
|
||||||
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
|
|
||||||
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
|
||||||
await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") });
|
|
||||||
await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") });
|
|
||||||
await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") });
|
|
||||||
await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") });
|
|
||||||
await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") });
|
|
||||||
|
|
||||||
expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
|
|
||||||
true
|
|
||||||
);
|
|
||||||
expect(msgCollector1.hasMessage(TestContentTopic, "M3")).to.eq(true);
|
|
||||||
expect(msgCollector1.hasMessage(CustomContentTopic, "M4")).to.eq(true);
|
|
||||||
expect(msgCollector1.hasMessage(TestContentTopic, "M5")).to.eq(true);
|
|
||||||
expect(msgCollector2.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
|
||||||
expect(msgCollector2.hasMessage(CustomContentTopic, "M2")).to.eq(true);
|
|
||||||
expect(msgCollector2.hasMessage(TestContentTopic, "M5")).to.eq(true);
|
|
||||||
expect(msgCollector3.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
|
||||||
expect(msgCollector3.hasMessage(TestContentTopic, "M3")).to.eq(true);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
|
|
||||||
[waku1, waku2, waku3] = await Promise.all([
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_3
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(waku2.libp2p.peerId),
|
|
||||||
waku3.dial(waku2.libp2p.peerId)
|
|
||||||
]);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
|
|
||||||
const messageText = "Communicating using a custom pubsub topic";
|
|
||||||
|
|
||||||
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve) => {
|
|
||||||
void waku2.relay.subscribe([CustomDecoder], resolve);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
// The promise **fails** if we receive a message on the default
|
|
||||||
// pubsub topic.
|
|
||||||
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve, reject) => {
|
|
||||||
void waku3.relay.subscribe([TestDecoder], reject);
|
|
||||||
setTimeout(resolve, 1000);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
await waku1.relay.send(CustomEncoder, {
|
|
||||||
payload: utf8ToBytes(messageText)
|
|
||||||
});
|
|
||||||
|
|
||||||
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
|
||||||
await waku3NoMsgPromise;
|
|
||||||
|
|
||||||
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
|
|
||||||
expect(waku2ReceivedMsg.pubsubTopic).to.eq(CustomPubSubTopic);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Publishes <= 1 MB and rejects others", async function () {
|
|
||||||
this.timeout(10000);
|
|
||||||
const MB = 1024 ** 2;
|
|
||||||
|
|
||||||
// 1 and 2 uses a custom pubsub
|
|
||||||
[waku1, waku2] = await Promise.all([
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
pubsubTopics: [CustomPubSubTopic],
|
|
||||||
staticNoiseKey: NOISE_KEY_2,
|
|
||||||
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
|
|
||||||
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
|
||||||
multiaddrs: waku2.libp2p.getMultiaddrs()
|
|
||||||
});
|
|
||||||
await Promise.all([waku1.dial(waku2.libp2p.peerId)]);
|
|
||||||
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
|
|
||||||
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve) => {
|
|
||||||
void waku2.relay.subscribe([CustomDecoder], () =>
|
|
||||||
resolve({
|
|
||||||
payload: new Uint8Array([])
|
|
||||||
} as DecodedMessage)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
let sendResult = await waku1.relay.send(CustomEncoder, {
|
|
||||||
payload: generateRandomUint8Array(1 * MB)
|
|
||||||
});
|
|
||||||
expect(sendResult.recipients.length).to.eq(1);
|
|
||||||
|
|
||||||
sendResult = await waku1.relay.send(CustomEncoder, {
|
|
||||||
payload: generateRandomUint8Array(1 * MB + 65536)
|
|
||||||
});
|
|
||||||
expect(sendResult.recipients.length).to.eq(0);
|
|
||||||
expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
|
|
||||||
|
|
||||||
sendResult = await waku1.relay.send(CustomEncoder, {
|
|
||||||
payload: generateRandomUint8Array(2 * MB)
|
|
||||||
});
|
|
||||||
expect(sendResult.recipients.length).to.eq(0);
|
|
||||||
expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG);
|
|
||||||
|
|
||||||
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
|
||||||
expect(waku2ReceivedMsg?.payload?.length).to.eq(0);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
describe("Interop: NimGoNode", function () {
|
|
||||||
let waku: RelayNode;
|
|
||||||
let nwaku: NimGoNode;
|
|
||||||
|
|
||||||
beforeEach(async function () {
|
|
||||||
this.timeout(30_000);
|
|
||||||
waku = await createRelayNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1
|
|
||||||
});
|
|
||||||
await waku.start();
|
|
||||||
|
|
||||||
nwaku = new NimGoNode(this.test?.ctx?.currentTest?.title + "");
|
|
||||||
await nwaku.start({ relay: true });
|
|
||||||
|
|
||||||
await waku.dial(await nwaku.getMultiaddrWithId());
|
|
||||||
await waitForRemotePeer(waku, [Protocols.Relay]);
|
|
||||||
|
|
||||||
// Nwaku subscribe to the default pubsub topic
|
|
||||||
await nwaku.ensureSubscriptions();
|
|
||||||
});
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
this.timeout(15000);
|
|
||||||
await tearDownNodes(nwaku, waku);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("nwaku subscribes", async function () {
|
|
||||||
let subscribers: PeerId[] = [];
|
|
||||||
|
|
||||||
while (subscribers.length === 0) {
|
|
||||||
await delay(200);
|
|
||||||
subscribers =
|
|
||||||
waku.libp2p.services.pubsub!.getSubscribers(DefaultPubSubTopic);
|
|
||||||
}
|
|
||||||
|
|
||||||
const nimPeerId = await nwaku.getPeerId();
|
|
||||||
expect(subscribers.map((p) => p.toString())).to.contain(
|
|
||||||
nimPeerId.toString()
|
|
||||||
);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Publishes to nwaku", async function () {
|
|
||||||
this.timeout(30000);
|
|
||||||
|
|
||||||
const messageText = "This is a message";
|
|
||||||
await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
|
|
||||||
|
|
||||||
let msgs: MessageRpcResponse[] = [];
|
|
||||||
|
|
||||||
while (msgs.length === 0) {
|
|
||||||
console.log("Waiting for messages");
|
|
||||||
await delay(200);
|
|
||||||
msgs = await nwaku.messages();
|
|
||||||
}
|
|
||||||
|
|
||||||
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
|
|
||||||
expect(msgs[0].version).to.equal(0);
|
|
||||||
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Nwaku publishes", async function () {
|
|
||||||
await delay(200);
|
|
||||||
|
|
||||||
const messageText = "Here is another message.";
|
|
||||||
|
|
||||||
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve) => {
|
|
||||||
void waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
|
|
||||||
resolve(msg)
|
|
||||||
);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
await nwaku.sendMessage(
|
|
||||||
NimGoNode.toMessageRpcQuery({
|
|
||||||
contentTopic: TestContentTopic,
|
|
||||||
payload: utf8ToBytes(messageText)
|
|
||||||
})
|
|
||||||
);
|
|
||||||
|
|
||||||
const receivedMsg = await receivedMsgPromise;
|
|
||||||
|
|
||||||
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
|
|
||||||
expect(receivedMsg.version!).to.eq(0);
|
|
||||||
expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText);
|
|
||||||
});
|
|
||||||
|
|
||||||
describe.skip("Two nodes connected to nwaku", function () {
|
|
||||||
let waku1: RelayNode;
|
|
||||||
let waku2: RelayNode;
|
|
||||||
let nwaku: NimGoNode;
|
|
||||||
|
|
||||||
afterEach(async function () {
|
|
||||||
await tearDownNodes(nwaku, [waku1, waku2]);
|
|
||||||
});
|
|
||||||
|
|
||||||
it("Js publishes, other Js receives", async function () {
|
|
||||||
this.timeout(60_000);
|
|
||||||
[waku1, waku2] = await Promise.all([
|
|
||||||
createRelayNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_1,
|
|
||||||
emitSelf: true
|
|
||||||
}).then((waku) => waku.start().then(() => waku)),
|
|
||||||
createRelayNode({
|
|
||||||
staticNoiseKey: NOISE_KEY_2
|
|
||||||
}).then((waku) => waku.start().then(() => waku))
|
|
||||||
]);
|
|
||||||
|
|
||||||
nwaku = new NimGoNode(makeLogFileName(this));
|
|
||||||
await nwaku.start();
|
|
||||||
|
|
||||||
const nwakuMultiaddr = await nwaku.getMultiaddrWithId();
|
|
||||||
await Promise.all([
|
|
||||||
waku1.dial(nwakuMultiaddr),
|
|
||||||
waku2.dial(nwakuMultiaddr)
|
|
||||||
]);
|
|
||||||
|
|
||||||
// Wait for identify protocol to finish
|
|
||||||
await Promise.all([
|
|
||||||
waitForRemotePeer(waku1, [Protocols.Relay]),
|
|
||||||
waitForRemotePeer(waku2, [Protocols.Relay])
|
|
||||||
]);
|
|
||||||
|
|
||||||
await delay(2000);
|
|
||||||
// Check that the two JS peers are NOT directly connected
|
|
||||||
expect(await waku1.libp2p.peerStore.has(waku2.libp2p.peerId)).to.be
|
|
||||||
.false;
|
|
||||||
expect(waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.be.false;
|
|
||||||
|
|
||||||
const msgStr = "Hello there!";
|
|
||||||
const message = { payload: utf8ToBytes(msgStr) };
|
|
||||||
|
|
||||||
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
|
||||||
(resolve) => {
|
|
||||||
void waku2.relay.subscribe(TestDecoder, resolve);
|
|
||||||
}
|
|
||||||
);
|
|
||||||
|
|
||||||
await waku1.relay.send(TestEncoder, message);
|
|
||||||
console.log("Waiting for message");
|
|
||||||
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
|
||||||
|
|
||||||
expect(waku2ReceivedMsg.payload).to.eq(msgStr);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
||||||
});
|
|
|
@ -0,0 +1,127 @@
|
||||||
|
import { createDecoder, createEncoder, DecodedMessage } from "@waku/core";
|
||||||
|
import { RelayNode } from "@waku/interfaces";
|
||||||
|
import {
|
||||||
|
createDecoder as createEciesDecoder,
|
||||||
|
createEncoder as createEciesEncoder,
|
||||||
|
generatePrivateKey,
|
||||||
|
getPublicKey
|
||||||
|
} from "@waku/message-encryption/ecies";
|
||||||
|
import {
|
||||||
|
createDecoder as createSymDecoder,
|
||||||
|
createEncoder as createSymEncoder,
|
||||||
|
generateSymmetricKey
|
||||||
|
} from "@waku/message-encryption/symmetric";
|
||||||
|
import { createRelayNode } from "@waku/sdk";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
delay,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
tearDownNodes
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import { log, waitForAllRemotePeers } from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Relay", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku1: RelayNode;
|
||||||
|
let waku2: RelayNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(10000);
|
||||||
|
log.info("Starting JS Waku instances");
|
||||||
|
[waku1, waku2] = await Promise.all([
|
||||||
|
createRelayNode({ staticNoiseKey: NOISE_KEY_1 }).then((waku) =>
|
||||||
|
waku.start().then(() => waku)
|
||||||
|
),
|
||||||
|
createRelayNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
log.info("Instances started, adding waku2 to waku1's address book");
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku1.dial(waku2.libp2p.peerId);
|
||||||
|
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
log.info("before each hook done");
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([], [waku1, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Decrypt messages", async function () {
|
||||||
|
const asymText = "This message is encrypted using asymmetric";
|
||||||
|
const asymTopic = "/test/1/asymmetric/proto";
|
||||||
|
const symText = "This message is encrypted using symmetric encryption";
|
||||||
|
const symTopic = "/test/1/symmetric/proto";
|
||||||
|
|
||||||
|
const privateKey = generatePrivateKey();
|
||||||
|
const symKey = generateSymmetricKey();
|
||||||
|
const publicKey = getPublicKey(privateKey);
|
||||||
|
|
||||||
|
const eciesEncoder = createEciesEncoder({
|
||||||
|
contentTopic: asymTopic,
|
||||||
|
publicKey
|
||||||
|
});
|
||||||
|
const symEncoder = createSymEncoder({
|
||||||
|
contentTopic: symTopic,
|
||||||
|
symKey
|
||||||
|
});
|
||||||
|
|
||||||
|
const eciesDecoder = createEciesDecoder(asymTopic, privateKey);
|
||||||
|
const symDecoder = createSymDecoder(symTopic, symKey);
|
||||||
|
|
||||||
|
const msgs: DecodedMessage[] = [];
|
||||||
|
void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => {
|
||||||
|
msgs.push(wakuMsg);
|
||||||
|
});
|
||||||
|
void waku2.relay.subscribe([symDecoder], (wakuMsg) => {
|
||||||
|
msgs.push(wakuMsg);
|
||||||
|
});
|
||||||
|
|
||||||
|
await waku1.relay.send(eciesEncoder, { payload: utf8ToBytes(asymText) });
|
||||||
|
await delay(200);
|
||||||
|
await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) });
|
||||||
|
|
||||||
|
while (msgs.length < 2) {
|
||||||
|
await delay(200);
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(msgs[0].contentTopic).to.eq(asymTopic);
|
||||||
|
expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText);
|
||||||
|
expect(msgs[1].contentTopic).to.eq(symTopic);
|
||||||
|
expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Delete observer", async function () {
|
||||||
|
const messageText =
|
||||||
|
"Published on content topic with added then deleted observer";
|
||||||
|
|
||||||
|
const contentTopic = "added-then-deleted-observer";
|
||||||
|
|
||||||
|
// The promise **fails** if we receive a message on this observer.
|
||||||
|
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
|
(resolve, reject) => {
|
||||||
|
const deleteObserver = waku2.relay.subscribe(
|
||||||
|
[createDecoder(contentTopic)],
|
||||||
|
reject
|
||||||
|
) as () => void;
|
||||||
|
deleteObserver();
|
||||||
|
setTimeout(resolve, 500);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
await waku1.relay.send(createEncoder({ contentTopic }), {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
|
||||||
|
await receivedMsgPromise;
|
||||||
|
// If it does not throw then we are good.
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,169 @@
|
||||||
|
import type { PeerId } from "@libp2p/interface/peer-id";
|
||||||
|
import {
|
||||||
|
DecodedMessage,
|
||||||
|
DefaultPubSubTopic,
|
||||||
|
waitForRemotePeer
|
||||||
|
} from "@waku/core";
|
||||||
|
import { RelayNode } from "@waku/interfaces";
|
||||||
|
import { Protocols } from "@waku/interfaces";
|
||||||
|
import { createRelayNode } from "@waku/sdk";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
delay,
|
||||||
|
makeLogFileName,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
tearDownNodes
|
||||||
|
} from "../../src/index.js";
|
||||||
|
import { MessageRpcResponse } from "../../src/node/interfaces.js";
|
||||||
|
import { base64ToUtf8, NimGoNode } from "../../src/node/node.js";
|
||||||
|
|
||||||
|
import { TestContentTopic, TestDecoder, TestEncoder } from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Relay, Interop", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku: RelayNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(30000);
|
||||||
|
waku = await createRelayNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
|
||||||
|
nwaku = new NimGoNode(this.test?.ctx?.currentTest?.title + "");
|
||||||
|
await nwaku.start({ relay: true });
|
||||||
|
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Relay]);
|
||||||
|
|
||||||
|
// Nwaku subscribe to the default pubsub topic
|
||||||
|
await nwaku.ensureSubscriptions();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes(nwaku, waku);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("nwaku subscribes", async function () {
|
||||||
|
let subscribers: PeerId[] = [];
|
||||||
|
|
||||||
|
while (subscribers.length === 0) {
|
||||||
|
await delay(200);
|
||||||
|
subscribers =
|
||||||
|
waku.libp2p.services.pubsub!.getSubscribers(DefaultPubSubTopic);
|
||||||
|
}
|
||||||
|
|
||||||
|
const nimPeerId = await nwaku.getPeerId();
|
||||||
|
expect(subscribers.map((p) => p.toString())).to.contain(
|
||||||
|
nimPeerId.toString()
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Publishes to nwaku", async function () {
|
||||||
|
const messageText = "This is a message";
|
||||||
|
await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
|
||||||
|
|
||||||
|
let msgs: MessageRpcResponse[] = [];
|
||||||
|
|
||||||
|
while (msgs.length === 0) {
|
||||||
|
await delay(200);
|
||||||
|
msgs = await nwaku.messages();
|
||||||
|
}
|
||||||
|
|
||||||
|
expect(msgs[0].contentTopic).to.equal(TestContentTopic);
|
||||||
|
expect(msgs[0].version).to.equal(0);
|
||||||
|
expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Nwaku publishes", async function () {
|
||||||
|
await delay(200);
|
||||||
|
|
||||||
|
const messageText = "Here is another message.";
|
||||||
|
|
||||||
|
const receivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
|
(resolve) => {
|
||||||
|
void waku.relay.subscribe<DecodedMessage>(TestDecoder, (msg) =>
|
||||||
|
resolve(msg)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
NimGoNode.toMessageRpcQuery({
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
})
|
||||||
|
);
|
||||||
|
|
||||||
|
const receivedMsg = await receivedMsgPromise;
|
||||||
|
|
||||||
|
expect(receivedMsg.contentTopic).to.eq(TestContentTopic);
|
||||||
|
expect(receivedMsg.version!).to.eq(0);
|
||||||
|
expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText);
|
||||||
|
});
|
||||||
|
|
||||||
|
describe("Two nodes connected to nwaku", function () {
|
||||||
|
let waku1: RelayNode;
|
||||||
|
let waku2: RelayNode;
|
||||||
|
let nwaku: NimGoNode;
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
await tearDownNodes(nwaku, [waku1, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Js publishes, other Js receives", async function () {
|
||||||
|
[waku1, waku2] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
emitSelf: true
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_2
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
|
||||||
|
nwaku = new NimGoNode(makeLogFileName(this));
|
||||||
|
await nwaku.start({ relay: true });
|
||||||
|
|
||||||
|
const nwakuMultiaddr = await nwaku.getMultiaddrWithId();
|
||||||
|
await Promise.all([
|
||||||
|
waku1.dial(nwakuMultiaddr),
|
||||||
|
waku2.dial(nwakuMultiaddr)
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Wait for identify protocol to finish
|
||||||
|
await Promise.all([
|
||||||
|
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku2, [Protocols.Relay])
|
||||||
|
]);
|
||||||
|
|
||||||
|
await delay(2000);
|
||||||
|
// Check that the two JS peers are NOT directly connected
|
||||||
|
expect(await waku1.libp2p.peerStore.has(waku2.libp2p.peerId)).to.eq(
|
||||||
|
false
|
||||||
|
);
|
||||||
|
expect(await waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.eq(
|
||||||
|
false
|
||||||
|
);
|
||||||
|
|
||||||
|
const msgStr = "Hello there!";
|
||||||
|
const message = { payload: utf8ToBytes(msgStr) };
|
||||||
|
|
||||||
|
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
|
(resolve) => {
|
||||||
|
void waku2.relay.subscribe(TestDecoder, resolve);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await waku1.relay.send(TestEncoder, message);
|
||||||
|
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
||||||
|
|
||||||
|
expect(bytesToUtf8(waku2ReceivedMsg.payload)).to.eq(msgStr);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,280 @@
|
||||||
|
import {
|
||||||
|
DecodedMessage,
|
||||||
|
DefaultPubSubTopic,
|
||||||
|
waitForRemotePeer
|
||||||
|
} from "@waku/core";
|
||||||
|
import { RelayNode } from "@waku/interfaces";
|
||||||
|
import { Protocols } from "@waku/interfaces";
|
||||||
|
import { createRelayNode } from "@waku/sdk";
|
||||||
|
import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
MessageCollector,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
NOISE_KEY_3,
|
||||||
|
tearDownNodes
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
CustomContentTopic,
|
||||||
|
CustomDecoder,
|
||||||
|
CustomEncoder,
|
||||||
|
CustomPubSubTopic,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
TestEncoder
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Relay, multiple pubsub topics", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku1: RelayNode;
|
||||||
|
let waku2: RelayNode;
|
||||||
|
let waku3: RelayNode;
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([], [waku1, waku2, waku3]);
|
||||||
|
});
|
||||||
|
|
||||||
|
[
|
||||||
|
{
|
||||||
|
pubsub: CustomPubSubTopic,
|
||||||
|
encoder: CustomEncoder,
|
||||||
|
decoder: CustomDecoder
|
||||||
|
},
|
||||||
|
{
|
||||||
|
pubsub: DefaultPubSubTopic,
|
||||||
|
encoder: TestEncoder,
|
||||||
|
decoder: TestDecoder
|
||||||
|
}
|
||||||
|
].forEach((testItem) => {
|
||||||
|
it(`3 nodes on ${testItem.pubsub} topic`, async function () {
|
||||||
|
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
||||||
|
.fill(null)
|
||||||
|
.map(() => new MessageCollector());
|
||||||
|
|
||||||
|
[waku1, waku2, waku3] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [testItem.pubsub],
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [testItem.pubsub],
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [testItem.pubsub],
|
||||||
|
staticNoiseKey: NOISE_KEY_3
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await Promise.all([
|
||||||
|
waku1.dial(waku2.libp2p.peerId),
|
||||||
|
waku3.dial(waku2.libp2p.peerId)
|
||||||
|
]);
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku3, [Protocols.Relay])
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback);
|
||||||
|
await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback);
|
||||||
|
await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback);
|
||||||
|
|
||||||
|
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
||||||
|
const relayResponse1 = await waku1.relay.send(testItem.encoder, {
|
||||||
|
payload: utf8ToBytes("M1")
|
||||||
|
});
|
||||||
|
const relayResponse2 = await waku2.relay.send(testItem.encoder, {
|
||||||
|
payload: utf8ToBytes("M2")
|
||||||
|
});
|
||||||
|
const relayResponse3 = await waku3.relay.send(testItem.encoder, {
|
||||||
|
payload: utf8ToBytes("M3")
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(relayResponse1.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(relayResponse3.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
||||||
|
waku1.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(relayResponse2.recipients.map((r) => r.toString())).to.include(
|
||||||
|
waku3.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(
|
||||||
|
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2")
|
||||||
|
).to.eq(true);
|
||||||
|
expect(
|
||||||
|
msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3")
|
||||||
|
).to.eq(true);
|
||||||
|
expect(
|
||||||
|
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1")
|
||||||
|
).to.eq(true);
|
||||||
|
expect(
|
||||||
|
msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3")
|
||||||
|
).to.eq(true);
|
||||||
|
expect(
|
||||||
|
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1")
|
||||||
|
).to.eq(true);
|
||||||
|
expect(
|
||||||
|
msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2")
|
||||||
|
).to.eq(true);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Nodes with multiple pubsub topic", async function () {
|
||||||
|
const [msgCollector1, msgCollector2, msgCollector3] = Array(3)
|
||||||
|
.fill(null)
|
||||||
|
.map(() => new MessageCollector());
|
||||||
|
|
||||||
|
// Waku1 and waku2 are using multiple pubsub topis
|
||||||
|
[waku1, waku2, waku3] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_3
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await Promise.all([
|
||||||
|
waku1.dial(waku2.libp2p.peerId),
|
||||||
|
waku3.dial(waku2.libp2p.peerId)
|
||||||
|
]);
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku2, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku3, [Protocols.Relay])
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waku1.relay.subscribe(
|
||||||
|
[TestDecoder, CustomDecoder],
|
||||||
|
msgCollector1.callback
|
||||||
|
);
|
||||||
|
await waku2.relay.subscribe(
|
||||||
|
[TestDecoder, CustomDecoder],
|
||||||
|
msgCollector2.callback
|
||||||
|
);
|
||||||
|
await waku3.relay.subscribe([TestDecoder], msgCollector3.callback);
|
||||||
|
|
||||||
|
// The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network
|
||||||
|
// However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||||
|
await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") });
|
||||||
|
await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") });
|
||||||
|
await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") });
|
||||||
|
await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") });
|
||||||
|
await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") });
|
||||||
|
|
||||||
|
expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true);
|
||||||
|
expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true);
|
||||||
|
expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true);
|
||||||
|
expect(msgCollector1.hasMessage(TestContentTopic, "M3")).to.eq(true);
|
||||||
|
expect(msgCollector1.hasMessage(CustomContentTopic, "M4")).to.eq(true);
|
||||||
|
expect(msgCollector1.hasMessage(TestContentTopic, "M5")).to.eq(true);
|
||||||
|
expect(msgCollector2.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
||||||
|
expect(msgCollector2.hasMessage(CustomContentTopic, "M2")).to.eq(true);
|
||||||
|
expect(msgCollector2.hasMessage(TestContentTopic, "M5")).to.eq(true);
|
||||||
|
expect(msgCollector3.hasMessage(TestContentTopic, "M1")).to.eq(true);
|
||||||
|
expect(msgCollector3.hasMessage(TestContentTopic, "M3")).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () {
|
||||||
|
[waku1, waku2, waku3] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [CustomPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [CustomPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_3
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await Promise.all([
|
||||||
|
waku1.dial(waku2.libp2p.peerId),
|
||||||
|
waku3.dial(waku2.libp2p.peerId)
|
||||||
|
]);
|
||||||
|
|
||||||
|
await Promise.all([
|
||||||
|
waitForRemotePeer(waku1, [Protocols.Relay]),
|
||||||
|
waitForRemotePeer(waku2, [Protocols.Relay])
|
||||||
|
]);
|
||||||
|
|
||||||
|
const messageText = "Communicating using a custom pubsub topic";
|
||||||
|
|
||||||
|
const waku2ReceivedMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
|
(resolve) => {
|
||||||
|
void waku2.relay.subscribe([CustomDecoder], resolve);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
// The promise **fails** if we receive a message on the default
|
||||||
|
// pubsub topic.
|
||||||
|
const waku3NoMsgPromise: Promise<DecodedMessage> = new Promise(
|
||||||
|
(resolve, reject) => {
|
||||||
|
void waku3.relay.subscribe([TestDecoder], reject);
|
||||||
|
setTimeout(resolve, 1000);
|
||||||
|
}
|
||||||
|
);
|
||||||
|
|
||||||
|
await waku1.relay.send(CustomEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
|
||||||
|
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
||||||
|
await waku3NoMsgPromise;
|
||||||
|
|
||||||
|
expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText);
|
||||||
|
expect(waku2ReceivedMsg.pubsubTopic).to.eq(CustomPubSubTopic);
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,256 @@
|
||||||
|
import { createEncoder, DefaultPubSubTopic } from "@waku/core";
|
||||||
|
import { IRateLimitProof, RelayNode, SendError } from "@waku/interfaces";
|
||||||
|
import { createRelayNode } from "@waku/sdk";
|
||||||
|
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
delay,
|
||||||
|
MessageCollector,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
tearDownNodes,
|
||||||
|
TEST_STRING
|
||||||
|
} from "../../src/index.js";
|
||||||
|
import { generateRandomUint8Array } from "../../src/random_array.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
log,
|
||||||
|
messageText,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
TestEncoder,
|
||||||
|
waitForAllRemotePeers
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Relay, Publish", function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
let waku1: RelayNode;
|
||||||
|
let waku2: RelayNode;
|
||||||
|
let messageCollector: MessageCollector;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(10000);
|
||||||
|
log.info("Starting JS Waku instances");
|
||||||
|
[waku1, waku2] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
log.info("Instances started, adding waku2 to waku1's address book");
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku1.dial(waku2.libp2p.peerId);
|
||||||
|
log.info("before each hook done");
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
messageCollector = new MessageCollector();
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([], [waku1, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
TEST_STRING.forEach((testItem) => {
|
||||||
|
it(`Check publish message containing ${testItem.description}`, async function () {
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes(testItem.value)
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: testItem.value,
|
||||||
|
expectedContentTopic: TestContentTopic
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
[
|
||||||
|
new Date("1995-12-17T03:24:00"),
|
||||||
|
new Date(Date.now() - 3600000 * 24 * 356),
|
||||||
|
new Date(Date.now() - 3600000),
|
||||||
|
new Date(Date.now() + 3600000)
|
||||||
|
].forEach((testItem) => {
|
||||||
|
it(`Publish message with custom timestamp: ${testItem}`, async function () {
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText),
|
||||||
|
timestamp: testItem
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: messageText,
|
||||||
|
expectedContentTopic: TestContentTopic,
|
||||||
|
expectedTimestamp: testItem.valueOf()
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Fails to publish duplicate message", async function () {
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") });
|
||||||
|
try {
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") });
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") });
|
||||||
|
expect.fail("Expected an error but didn't get one");
|
||||||
|
} catch (error) {
|
||||||
|
expect((error as Error).message).to.include("PublishError.Duplicate");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Fails to publish message with empty text", async function () {
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("") });
|
||||||
|
await delay(400);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Fails to publish message with wrong content topic", async function () {
|
||||||
|
const wrong_encoder = createEncoder({ contentTopic: "wrong" });
|
||||||
|
await waku1.relay.send(wrong_encoder, {
|
||||||
|
payload: utf8ToBytes("")
|
||||||
|
});
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Fails to publish message with wrong pubsubtopic", async function () {
|
||||||
|
const wrong_encoder = createEncoder({
|
||||||
|
pubsubTopic: "wrong",
|
||||||
|
contentTopic: TestContentTopic
|
||||||
|
});
|
||||||
|
const pushResponse = await waku1.relay.send(wrong_encoder, {
|
||||||
|
payload: utf8ToBytes("")
|
||||||
|
});
|
||||||
|
expect(pushResponse.errors?.[0]).to.eq("Topic not configured");
|
||||||
|
await delay(400);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Publish message with size of 1 MB", async function () {
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: generateRandomUint8Array(1024 ** 2)
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
[1024 ** 2 + 65536, 2 * 1024 ** 2].forEach((testItem) => {
|
||||||
|
it("Fails to publish message with size larger than 1 MB", async function () {
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: generateRandomUint8Array(testItem)
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(0);
|
||||||
|
expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG);
|
||||||
|
await delay(400);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(false);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
||||||
|
it.skip("Check publish message after service node is restarted", async function () {
|
||||||
|
await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes("m1")
|
||||||
|
});
|
||||||
|
|
||||||
|
// Restart js-waku node
|
||||||
|
await waku1.stop();
|
||||||
|
expect(waku1.isStarted()).to.eq(false);
|
||||||
|
await waku1.start();
|
||||||
|
expect(waku1.isStarted()).to.eq(true);
|
||||||
|
await waku1.dial(waku2.libp2p.peerId);
|
||||||
|
|
||||||
|
// Redo the connection and create a new subscription
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes("m2")
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done
|
||||||
|
it.skip("Check publish message after client node is restarted", async function () {
|
||||||
|
await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes("m1")
|
||||||
|
});
|
||||||
|
|
||||||
|
// Restart js-waku node
|
||||||
|
await waku2.stop();
|
||||||
|
expect(waku2.isStarted()).to.eq(false);
|
||||||
|
await waku2.start();
|
||||||
|
expect(waku2.isStarted()).to.eq(true);
|
||||||
|
await waku1.dial(waku2.libp2p.peerId);
|
||||||
|
|
||||||
|
// Redo the connection and create a new subscription
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes("m2")
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(await messageCollector.waitForMessages(2)).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Publish message with large meta", async function () {
|
||||||
|
const customTestEncoder = createEncoder({
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
metaSetter: () => new Uint8Array(10 ** 6)
|
||||||
|
});
|
||||||
|
|
||||||
|
const pushResponse = await waku1.relay.send(customTestEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
expect(pushResponse.recipients[0].toString()).to.eq(
|
||||||
|
waku2.libp2p.peerId.toString()
|
||||||
|
);
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Publish message with rate limit", async function () {
|
||||||
|
const rateLimitProof: IRateLimitProof = {
|
||||||
|
proof: utf8ToBytes("proofData"),
|
||||||
|
merkleRoot: utf8ToBytes("merkleRootData"),
|
||||||
|
epoch: utf8ToBytes("epochData"),
|
||||||
|
shareX: utf8ToBytes("shareXData"),
|
||||||
|
shareY: utf8ToBytes("shareYData"),
|
||||||
|
nullifier: utf8ToBytes("nullifierData"),
|
||||||
|
rlnIdentifier: utf8ToBytes("rlnIdentifierData")
|
||||||
|
};
|
||||||
|
|
||||||
|
const pushResponse = await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText),
|
||||||
|
rateLimitProof: rateLimitProof
|
||||||
|
});
|
||||||
|
expect(pushResponse.recipients.length).to.eq(1);
|
||||||
|
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: messageText,
|
||||||
|
expectedContentTopic: TestContentTopic
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,279 @@
|
||||||
|
import { createDecoder, createEncoder, DefaultPubSubTopic } from "@waku/core";
|
||||||
|
import { RelayNode } from "@waku/interfaces";
|
||||||
|
import { createRelayNode } from "@waku/sdk";
|
||||||
|
import { utf8ToBytes } from "@waku/utils/bytes";
|
||||||
|
import { expect } from "chai";
|
||||||
|
|
||||||
|
import {
|
||||||
|
generateTestData,
|
||||||
|
MessageCollector,
|
||||||
|
NOISE_KEY_1,
|
||||||
|
NOISE_KEY_2,
|
||||||
|
tearDownNodes,
|
||||||
|
TEST_STRING
|
||||||
|
} from "../../src/index.js";
|
||||||
|
|
||||||
|
import {
|
||||||
|
log,
|
||||||
|
messageText,
|
||||||
|
TestContentTopic,
|
||||||
|
TestDecoder,
|
||||||
|
TestEncoder,
|
||||||
|
waitForAllRemotePeers
|
||||||
|
} from "./utils.js";
|
||||||
|
|
||||||
|
describe("Waku Relay, Subscribe", function () {
|
||||||
|
this.timeout(40000);
|
||||||
|
let waku1: RelayNode;
|
||||||
|
let waku2: RelayNode;
|
||||||
|
let messageCollector: MessageCollector;
|
||||||
|
|
||||||
|
beforeEach(async function () {
|
||||||
|
this.timeout(10000);
|
||||||
|
log.info("Starting JS Waku instances");
|
||||||
|
[waku1, waku2] = await Promise.all([
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_1
|
||||||
|
}).then((waku) => waku.start().then(() => waku)),
|
||||||
|
createRelayNode({
|
||||||
|
pubsubTopics: [DefaultPubSubTopic],
|
||||||
|
staticNoiseKey: NOISE_KEY_2,
|
||||||
|
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }
|
||||||
|
}).then((waku) => waku.start().then(() => waku))
|
||||||
|
]);
|
||||||
|
log.info("Instances started, adding waku2 to waku1's address book");
|
||||||
|
await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, {
|
||||||
|
multiaddrs: waku2.libp2p.getMultiaddrs()
|
||||||
|
});
|
||||||
|
await waku1.dial(waku2.libp2p.peerId);
|
||||||
|
log.info("before each hook done");
|
||||||
|
messageCollector = new MessageCollector();
|
||||||
|
});
|
||||||
|
|
||||||
|
afterEach(async function () {
|
||||||
|
this.timeout(15000);
|
||||||
|
await tearDownNodes([], [waku1, waku2]);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Mutual subscription", async function () {
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
const subscribers1 = waku1.libp2p.services
|
||||||
|
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
||||||
|
.map((p) => p.toString());
|
||||||
|
const subscribers2 = waku2.libp2p.services
|
||||||
|
.pubsub!.getSubscribers(DefaultPubSubTopic)
|
||||||
|
.map((p) => p.toString());
|
||||||
|
|
||||||
|
expect(subscribers1).to.contain(waku2.libp2p.peerId.toString());
|
||||||
|
expect(subscribers2).to.contain(waku1.libp2p.peerId.toString());
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Register correct protocols", async function () {
|
||||||
|
const protocols = waku1.libp2p.getProtocols();
|
||||||
|
|
||||||
|
expect(protocols).to.contain("/vac/waku/relay/2.0.0");
|
||||||
|
expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1);
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Publish without waiting for remote peer", async function () {
|
||||||
|
try {
|
||||||
|
await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
throw new Error("Publish was successful but was expected to fail");
|
||||||
|
} catch (err) {
|
||||||
|
if (
|
||||||
|
!(err instanceof Error) ||
|
||||||
|
!err.message.includes("PublishError.InsufficientPeers")
|
||||||
|
) {
|
||||||
|
throw err;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Subscribe and publish message", async function () {
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) });
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: messageText,
|
||||||
|
expectedContentTopic: TestContentTopic
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Subscribe and publish 10000 messages on the same topic", async function () {
|
||||||
|
const messageCount = 10000;
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
// Send a unique message on each topic.
|
||||||
|
for (let i = 0; i < messageCount; i++) {
|
||||||
|
await waku1.relay.send(TestEncoder, {
|
||||||
|
payload: utf8ToBytes(`M${i + 1}`)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that each message was received on the corresponding topic.
|
||||||
|
expect(
|
||||||
|
await messageCollector.waitForMessages(messageCount, { exact: true })
|
||||||
|
).to.eq(true);
|
||||||
|
|
||||||
|
for (let i = 0; i < messageCount; i++) {
|
||||||
|
messageCollector.verifyReceivedMessage(i, {
|
||||||
|
expectedMessageText: `M${i + 1}`,
|
||||||
|
expectedContentTopic: TestContentTopic,
|
||||||
|
checkTimestamp: false
|
||||||
|
});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Subscribe and publish messages on 2 different content topics", async function () {
|
||||||
|
const secondContentTopic = "/test/2/waku-relay/utf8";
|
||||||
|
const secondEncoder = createEncoder({ contentTopic: secondContentTopic });
|
||||||
|
const secondDecoder = createDecoder(secondContentTopic);
|
||||||
|
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
await waku2.relay.subscribe([secondDecoder], messageCollector.callback);
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||||
|
await waku1.relay.send(secondEncoder, { payload: utf8ToBytes("M2") });
|
||||||
|
expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: "M1",
|
||||||
|
expectedContentTopic: TestContentTopic
|
||||||
|
});
|
||||||
|
messageCollector.verifyReceivedMessage(1, {
|
||||||
|
expectedMessageText: "M2",
|
||||||
|
expectedContentTopic: secondContentTopic
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Subscribe one by one to 100 topics and publish messages", async function () {
|
||||||
|
const topicCount = 100;
|
||||||
|
const td = generateTestData(topicCount);
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
|
||||||
|
// Subscribe to topics one by one
|
||||||
|
for (let i = 0; i < topicCount; i++) {
|
||||||
|
await waku2.relay.subscribe([td.decoders[i]], messageCollector.callback);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send a unique message on each topic.
|
||||||
|
for (let i = 0; i < topicCount; i++) {
|
||||||
|
await waku1.relay.send(td.encoders[i], {
|
||||||
|
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that each message was received on the corresponding topic.
|
||||||
|
expect(
|
||||||
|
await messageCollector.waitForMessages(topicCount, { exact: true })
|
||||||
|
).to.eq(true);
|
||||||
|
td.contentTopics.forEach((topic, index) => {
|
||||||
|
messageCollector.verifyReceivedMessage(index, {
|
||||||
|
expectedContentTopic: topic,
|
||||||
|
expectedMessageText: `Message for Topic ${index + 1}`
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
it("Subscribe at once to 10000 topics and publish messages", async function () {
|
||||||
|
const topicCount = 10000;
|
||||||
|
const td = generateTestData(topicCount);
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
|
||||||
|
// Subscribe to all topics at once
|
||||||
|
await waku2.relay.subscribe(td.decoders, messageCollector.callback);
|
||||||
|
|
||||||
|
// Send a unique message on each topic.
|
||||||
|
for (let i = 0; i < topicCount; i++) {
|
||||||
|
await waku1.relay.send(td.encoders[i], {
|
||||||
|
payload: utf8ToBytes(`Message for Topic ${i + 1}`)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify that each message was received on the corresponding topic.
|
||||||
|
expect(
|
||||||
|
await messageCollector.waitForMessages(topicCount, { exact: true })
|
||||||
|
).to.eq(true);
|
||||||
|
td.contentTopics.forEach((topic, index) => {
|
||||||
|
messageCollector.verifyReceivedMessage(index, {
|
||||||
|
expectedContentTopic: topic,
|
||||||
|
expectedMessageText: `Message for Topic ${index + 1}`,
|
||||||
|
checkTimestamp: false
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed
|
||||||
|
it.skip("Refresh subscription", async function () {
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
await waku2.relay.subscribe([TestDecoder], messageCollector.callback);
|
||||||
|
|
||||||
|
await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") });
|
||||||
|
|
||||||
|
expect(await messageCollector.waitForMessages(1, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed
|
||||||
|
it.skip("Overlapping topic subscription", async function () {
|
||||||
|
// Define two sets of test data with overlapping topics.
|
||||||
|
const topicCount1 = 2;
|
||||||
|
const td1 = generateTestData(topicCount1);
|
||||||
|
const topicCount2 = 4;
|
||||||
|
const td2 = generateTestData(topicCount2);
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
|
||||||
|
// Subscribe to the first set of topics.
|
||||||
|
await waku2.relay.subscribe(td1.decoders, messageCollector.callback);
|
||||||
|
// Subscribe to the second set of topics which has overlapping topics with the first set.
|
||||||
|
await waku2.relay.subscribe(td2.decoders, messageCollector.callback);
|
||||||
|
|
||||||
|
// Send messages to the first set of topics.
|
||||||
|
for (let i = 0; i < topicCount1; i++) {
|
||||||
|
const messageText = `Message for Topic ${i + 1}`;
|
||||||
|
await waku1.relay.send(td1.encoders[i], {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send messages to the second set of topics.
|
||||||
|
for (let i = 0; i < topicCount2; i++) {
|
||||||
|
const messageText = `Message for Topic ${i + 3}`;
|
||||||
|
await waku1.relay.send(td2.encoders[i], {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if all messages were received.
|
||||||
|
// Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set).
|
||||||
|
expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq(
|
||||||
|
true
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
|
TEST_STRING.forEach((testItem) => {
|
||||||
|
it(`Subscribe to topic containing ${testItem.description} and publish message`, async function () {
|
||||||
|
const newContentTopic = testItem.value;
|
||||||
|
const newEncoder = createEncoder({ contentTopic: newContentTopic });
|
||||||
|
const newDecoder = createDecoder(newContentTopic);
|
||||||
|
await waitForAllRemotePeers(waku1, waku2);
|
||||||
|
await waku2.relay.subscribe([newDecoder], messageCollector.callback);
|
||||||
|
await waku1.relay.send(newEncoder, {
|
||||||
|
payload: utf8ToBytes(messageText)
|
||||||
|
});
|
||||||
|
expect(await messageCollector.waitForMessages(1)).to.eq(true);
|
||||||
|
messageCollector.verifyReceivedMessage(0, {
|
||||||
|
expectedMessageText: messageText,
|
||||||
|
expectedContentTopic: newContentTopic
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
||||||
|
});
|
|
@ -0,0 +1,29 @@
|
||||||
|
import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core";
|
||||||
|
import { Protocols, RelayNode } from "@waku/interfaces";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
|
||||||
|
export const messageText = "Relay works!";
|
||||||
|
export const TestContentTopic = "/test/1/waku-relay/utf8";
|
||||||
|
export const TestEncoder = createEncoder({ contentTopic: TestContentTopic });
|
||||||
|
export const TestDecoder = createDecoder(TestContentTopic);
|
||||||
|
export const CustomContentTopic = "/test/2/waku-relay/utf8";
|
||||||
|
export const CustomPubSubTopic = "/some/pubsub/topic";
|
||||||
|
export const CustomEncoder = createEncoder({
|
||||||
|
contentTopic: CustomContentTopic,
|
||||||
|
pubsubTopic: CustomPubSubTopic
|
||||||
|
});
|
||||||
|
export const CustomDecoder = createDecoder(
|
||||||
|
CustomContentTopic,
|
||||||
|
CustomPubSubTopic
|
||||||
|
);
|
||||||
|
|
||||||
|
export const log = new Logger("test:relay");
|
||||||
|
|
||||||
|
export async function waitForAllRemotePeers(
|
||||||
|
...nodes: RelayNode[]
|
||||||
|
): Promise<void> {
|
||||||
|
log.info("Wait for mutual pubsub subscription");
|
||||||
|
await Promise.all(
|
||||||
|
nodes.map((node) => waitForRemotePeer(node, [Protocols.Relay]))
|
||||||
|
);
|
||||||
|
}
|
Loading…
Reference in New Issue