diff --git a/packages/tests/src/constants.ts b/packages/tests/src/constants.ts index 1efa573f8b..31b31fc056 100644 --- a/packages/tests/src/constants.ts +++ b/packages/tests/src/constants.ts @@ -50,7 +50,8 @@ export const TEST_STRING = [ { description: "JSON", value: '{"user":"admin","password":"123456"}' }, { description: "shell command", value: "`rm -rf /`" }, { description: "escaped characters", value: "\\n\\t\\0" }, - { description: "unicode special characters", value: "\u202Ereverse" } + { description: "unicode special characters", value: "\u202Ereverse" }, + { description: "emoji", value: "🤫 🤥 😶 😶‍🌫️ 😐 😑 😬 🫨 🫠 🙄 😯 😦 😧 😮" } ]; export const TEST_TIMESTAMPS = [ diff --git a/packages/tests/src/index.ts b/packages/tests/src/index.ts index 746345a2b0..d56386239b 100644 --- a/packages/tests/src/index.ts +++ b/packages/tests/src/index.ts @@ -12,3 +12,4 @@ export * from "./log_file.js"; export * from "./node/node.js"; export * from "./teardown.js"; export * from "./message_collector.js"; +export * from "./utils.js"; diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts index 476b97c6ed..5f7a473688 100644 --- a/packages/tests/src/message_collector.ts +++ b/packages/tests/src/message_collector.ts @@ -113,7 +113,7 @@ export class MessageCollector { expectedVersion?: number; expectedMeta?: Uint8Array; expectedEphemeral?: boolean; - expectedTimestamp?: bigint; + expectedTimestamp?: bigint | number; checkTimestamp?: boolean; // Used to determine if we need to check the timestamp } ): void { @@ -148,6 +148,38 @@ export class MessageCollector { ); } + const shouldCheckTimestamp = + options.checkTimestamp !== undefined ? options.checkTimestamp : true; + if (shouldCheckTimestamp && message.timestamp) { + // In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec + // We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions + let timestampAsNumber: number; + + if (message.timestamp instanceof Date) { + timestampAsNumber = message.timestamp.getTime(); + } else { + timestampAsNumber = Number(message.timestamp) / 1_000_000; + } + + let lowerBound: number; + let upperBound: number; + + // Define the bounds based on the expectedTimestamp + if (options.expectedTimestamp !== undefined) { + lowerBound = Number(options.expectedTimestamp) - 1000; + upperBound = Number(options.expectedTimestamp) + 1000; + } else { + upperBound = Date.now(); + lowerBound = upperBound - 10000; + } + + if (timestampAsNumber < lowerBound || timestampAsNumber > upperBound) { + throw new AssertionError( + `Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${timestampAsNumber}` + ); + } + } + if (this.isMessageRpcResponse(message)) { // nwaku message specific assertions const receivedMessageText = message.payload @@ -158,37 +190,6 @@ export class MessageCollector { options.expectedMessageText, `Message text mismatch. Expected: ${options.expectedMessageText}. Got: ${receivedMessageText}` ); - - if (message.timestamp) { - // In we send timestamp in the request we assert that it matches the timestamp in the response +- 1 sec - // We take the 1s deviation because there are some ms diffs in timestamps, probably because of conversions - if (options.expectedTimestamp !== undefined) { - const lowerBound = - BigInt(options.expectedTimestamp) - BigInt(1000000000); - const upperBound = - BigInt(options.expectedTimestamp) + BigInt(1000000000); - - if ( - message.timestamp < lowerBound || - message.timestamp > upperBound - ) { - throw new AssertionError( - `Message timestamp not within the expected range. Expected between: ${lowerBound} and ${upperBound}. Got: ${message.timestamp}` - ); - } - } - // In we don't send timestamp in the request we assert that the timestamp in the response is between now and (now-10s) - else { - const now = BigInt(Date.now()) * BigInt(1_000_000); - const tenSecondsAgo = now - BigInt(10_000_000_000); - - if (message.timestamp < tenSecondsAgo || message.timestamp > now) { - throw new AssertionError( - `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp}` - ); - } - } - } } else { // js-waku message specific assertions expect(message.pubsubTopic).to.eq( @@ -205,18 +206,6 @@ export class MessageCollector { }. Got: ${bytesToUtf8(message.payload)}` ); - const shouldCheckTimestamp = - options.checkTimestamp !== undefined ? options.checkTimestamp : true; - if (shouldCheckTimestamp && message.timestamp) { - const now = Date.now(); - const tenSecondsAgo = now - 10000; - expect(message.timestamp.getTime()).to.be.within( - tenSecondsAgo, - now, - `Message timestamp not within the expected range. Expected between: ${tenSecondsAgo} and ${now}. Got: ${message.timestamp.getTime()}` - ); - } - expect([ options.expectedMeta, undefined, diff --git a/packages/tests/src/utils.ts b/packages/tests/src/utils.ts new file mode 100644 index 0000000000..8644e26437 --- /dev/null +++ b/packages/tests/src/utils.ts @@ -0,0 +1,22 @@ +import { createDecoder, createEncoder, Decoder, Encoder } from "@waku/core"; + +// Utility to generate test data for multiple topics tests. +export function generateTestData(topicCount: number): { + contentTopics: string[]; + encoders: Encoder[]; + decoders: Decoder[]; +} { + const contentTopics = Array.from( + { length: topicCount }, + (_, i) => `/test/${i + 1}/waku-multi` + ); + const encoders = contentTopics.map((topic) => + createEncoder({ contentTopic: topic }) + ); + const decoders = contentTopics.map((topic) => createDecoder(topic)); + return { + contentTopics, + encoders, + decoders + }; +} diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 19f65bd5b2..11cb20909b 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -88,7 +88,7 @@ describe("Waku Filter V2: FilterPush", function () { }); }); - it("Check received message with invalid timestamp is not received", async function () { + it("Check message with invalid timestamp is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -105,7 +105,7 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(false); }); - it("Check received message on other pubsub topic is not received", async function () { + it("Check message on other pubsub topic is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -121,7 +121,7 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(false); }); - it("Check received message with no pubsub topic is not received", async function () { + it("Check message with no pubsub topic is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -136,7 +136,7 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(false); }); - it("Check received message with no content topic is not received", async function () { + it("Check message with no content topic is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -151,7 +151,7 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(false); }); - it("Check received message with no payload is not received", async function () { + it("Check message with no payload is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -171,7 +171,7 @@ describe("Waku Filter V2: FilterPush", function () { } }); - it("Check received message with non string payload is not received", async function () { + it("Check message with non string payload is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -187,7 +187,7 @@ describe("Waku Filter V2: FilterPush", function () { expect(await messageCollector.waitForMessages(1)).to.eq(false); }); - it("Check received message with extra parameter is not received", async function () { + it("Check message with extra parameter is not received", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await delay(400); @@ -226,7 +226,7 @@ describe("Waku Filter V2: FilterPush", function () { }); // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done - it.skip("Check received message received after jswaku node is restarted", async function () { + it.skip("Check message received after jswaku node is restarted", async function () { // Subscribe and send message await subscription.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); @@ -259,7 +259,7 @@ describe("Waku Filter V2: FilterPush", function () { }); // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done - it.skip("Check received message received after nwaku node is restarted", async function () { + it.skip("Check message received after nwaku node is restarted", async function () { await subscription.subscribe([TestDecoder], messageCollector.callback); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M1") }); expect(await messageCollector.waitForMessages(1)).to.eq(true); diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index ce10cc6ca0..68d9f1af0b 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -12,6 +12,7 @@ import { expect } from "chai"; import { delay, + generateTestData, makeLogFileName, MessageCollector, NimGoNode, @@ -20,7 +21,6 @@ import { } from "../../src/index.js"; import { - generateTestData, messagePayload, messageText, runNodes, @@ -295,7 +295,9 @@ describe("Waku Filter V2: Subscribe", function () { // Check if all messages were received. // Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set). - expect(await messageCollector.waitForMessages(6)).to.eq(true); + expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq( + true + ); }); it("Refresh subscription", async function () { @@ -307,7 +309,9 @@ describe("Waku Filter V2: Subscribe", function () { await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("M2") }); // Confirm both messages were received. - expect(await messageCollector.waitForMessages(2)).to.eq(true); + expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq( + true + ); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: TestContentTopic diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index b79d3e313e..4147fbbf48 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -3,10 +3,14 @@ import type { IFilterSubscription, LightNode } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; -import { MessageCollector, NimGoNode, tearDownNodes } from "../../src/index.js"; - import { generateTestData, + MessageCollector, + NimGoNode, + tearDownNodes +} from "../../src/index.js"; + +import { messagePayload, messageText, runNodes, diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index bdb2146106..20f56e699f 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,10 +1,4 @@ -import { - createDecoder, - createEncoder, - Decoder, - Encoder, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; import { IFilterSubscription, LightNode, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger } from "@waku/utils"; @@ -21,27 +15,6 @@ export const TestDecoder = createDecoder(TestContentTopic); export const messageText = "Filtering works!"; export const messagePayload = { payload: utf8ToBytes(messageText) }; -// Utility to generate test data for multiple topics tests. -export function generateTestData(topicCount: number): { - contentTopics: string[]; - encoders: Encoder[]; - decoders: Decoder[]; -} { - const contentTopics = Array.from( - { length: topicCount }, - (_, i) => `/test/${i + 1}/waku-multi` - ); - const encoders = contentTopics.map((topic) => - createEncoder({ contentTopic: topic }) - ); - const decoders = contentTopics.map((topic) => createDecoder(topic)); - return { - contentTopics, - encoders, - decoders - }; -} - // Utility to validate errors related to pings in the subscription. export async function validatePingError( subscription: IFilterSubscription diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index c7f04d5987..8c68462c62 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -188,7 +188,6 @@ describe("Waku Light Push", function () { Date.now() + 3600000 ].forEach((testItem) => { it(`Push message with custom timestamp: ${testItem}`, async function () { - const customTimeNanos = BigInt(testItem) * BigInt(1000000); const pushResponse = await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes(messageText), timestamp: new Date(testItem) @@ -198,7 +197,7 @@ describe("Waku Light Push", function () { expect(await messageCollector.waitForMessages(1)).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: messageText, - expectedTimestamp: customTimeNanos, + expectedTimestamp: testItem, expectedContentTopic: TestContentTopic }); }); diff --git a/packages/tests/tests/relay.node.spec.ts b/packages/tests/tests/relay.node.spec.ts deleted file mode 100644 index b94c07f150..0000000000 --- a/packages/tests/tests/relay.node.spec.ts +++ /dev/null @@ -1,740 +0,0 @@ -import type { PeerId } from "@libp2p/interface/peer-id"; -import { - createDecoder, - createEncoder, - DecodedMessage, - DefaultPubSubTopic, - waitForRemotePeer -} from "@waku/core"; -import { RelayNode, SendError } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { - createDecoder as createEciesDecoder, - createEncoder as createEciesEncoder, - generatePrivateKey, - getPublicKey -} from "@waku/message-encryption/ecies"; -import { - createDecoder as createSymDecoder, - createEncoder as createSymEncoder, - generateSymmetricKey -} from "@waku/message-encryption/symmetric"; -import { createRelayNode } from "@waku/sdk"; -import { Logger } from "@waku/utils"; -import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; - -import { - delay, - makeLogFileName, - MessageCollector, - NOISE_KEY_1, - NOISE_KEY_2, - NOISE_KEY_3, - tearDownNodes -} from "../src/index.js"; -import { MessageRpcResponse } from "../src/node/interfaces.js"; -import { base64ToUtf8, NimGoNode } from "../src/node/node.js"; -import { generateRandomUint8Array } from "../src/random_array.js"; - -const log = new Logger("test:relay"); - -const TestContentTopic = "/test/1/waku-relay/utf8"; -const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); -const TestDecoder = createDecoder(TestContentTopic); - -describe("Waku Relay [node only]", () => { - // Node needed as we don't have a way to connect 2 js waku - // nodes in the browser yet - describe("2 js nodes", () => { - afterEach(function () { - if (this.currentTest?.state === "failed") { - console.log(`Test failed, log file name is ${makeLogFileName(this)}`); - } - }); - - let waku1: RelayNode; - let waku2: RelayNode; - beforeEach(async function () { - this.timeout(10000); - - log.info("Starting JS Waku instances"); - [waku1, waku2] = await Promise.all([ - createRelayNode({ staticNoiseKey: NOISE_KEY_1 }).then((waku) => - waku.start().then(() => waku) - ), - createRelayNode({ - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)) - ]); - log.info("Instances started, adding waku2 to waku1's address book"); - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku1.dial(waku2.libp2p.peerId); - - log.info("Wait for mutual pubsub subscription"); - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]) - ]); - log.info("before each hook done"); - }); - - afterEach(async function () { - !!waku1 && - waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && - waku2.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Subscribe", async function () { - log.info("Getting subscribers"); - const subscribers1 = waku1.libp2p.services - .pubsub!.getSubscribers(DefaultPubSubTopic) - .map((p) => p.toString()); - const subscribers2 = waku2.libp2p.services - .pubsub!.getSubscribers(DefaultPubSubTopic) - .map((p) => p.toString()); - - log.info("Asserting mutual subscription"); - expect(subscribers1).to.contain(waku2.libp2p.peerId.toString()); - expect(subscribers2).to.contain(waku1.libp2p.peerId.toString()); - }); - - it("Register correct protocols", async function () { - const protocols = waku1.libp2p.getProtocols(); - - expect(protocols).to.contain("/vac/waku/relay/2.0.0"); - expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1); - }); - - it("Publish", async function () { - this.timeout(10000); - - const messageText = "JS to JS communication works"; - const messageTimestamp = new Date("1995-12-17T03:24:00"); - const message = { - payload: utf8ToBytes(messageText), - timestamp: messageTimestamp - }; - - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku2.relay.subscribe([TestDecoder], resolve); - } - ); - - await waku1.relay.send(TestEncoder, message); - - const receivedMsg = await receivedMsgPromise; - - expect(receivedMsg.contentTopic).to.eq(TestContentTopic); - expect(bytesToUtf8(receivedMsg.payload)).to.eq(messageText); - expect(receivedMsg.timestamp?.valueOf()).to.eq( - messageTimestamp.valueOf() - ); - }); - - it("Filter on content topics", async function () { - this.timeout(10000); - - const fooMessageText = "Published on content topic foo"; - const barMessageText = "Published on content topic bar"; - - const fooContentTopic = "foo"; - const barContentTopic = "bar"; - - const fooEncoder = createEncoder({ contentTopic: fooContentTopic }); - const barEncoder = createEncoder({ contentTopic: barContentTopic }); - - const fooDecoder = createDecoder(fooContentTopic); - const barDecoder = createDecoder(barContentTopic); - - const fooMessages: DecodedMessage[] = []; - void waku2.relay.subscribe([fooDecoder], (msg) => { - fooMessages.push(msg); - }); - - const barMessages: DecodedMessage[] = []; - void waku2.relay.subscribe([barDecoder], (msg) => { - barMessages.push(msg); - }); - - await waku1.relay.send(barEncoder, { - payload: utf8ToBytes(barMessageText) - }); - await waku1.relay.send(fooEncoder, { - payload: utf8ToBytes(fooMessageText) - }); - - while (!fooMessages.length && !barMessages.length) { - await delay(100); - } - - expect(fooMessages[0].contentTopic).to.eq(fooContentTopic); - expect(bytesToUtf8(fooMessages[0].payload)).to.eq(fooMessageText); - - expect(barMessages[0].contentTopic).to.eq(barContentTopic); - expect(bytesToUtf8(barMessages[0].payload)).to.eq(barMessageText); - - expect(fooMessages.length).to.eq(1); - expect(barMessages.length).to.eq(1); - }); - - it("Decrypt messages", async function () { - this.timeout(10000); - - const asymText = "This message is encrypted using asymmetric"; - const asymTopic = "/test/1/asymmetric/proto"; - const symText = "This message is encrypted using symmetric encryption"; - const symTopic = "/test/1/symmetric/proto"; - - const privateKey = generatePrivateKey(); - const symKey = generateSymmetricKey(); - const publicKey = getPublicKey(privateKey); - - const eciesEncoder = createEciesEncoder({ - contentTopic: asymTopic, - publicKey - }); - const symEncoder = createSymEncoder({ - contentTopic: symTopic, - symKey - }); - - const eciesDecoder = createEciesDecoder(asymTopic, privateKey); - const symDecoder = createSymDecoder(symTopic, symKey); - - const msgs: DecodedMessage[] = []; - void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => { - msgs.push(wakuMsg); - }); - void waku2.relay.subscribe([symDecoder], (wakuMsg) => { - msgs.push(wakuMsg); - }); - - await waku1.relay.send(eciesEncoder, { payload: utf8ToBytes(asymText) }); - await delay(200); - await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) }); - - while (msgs.length < 2) { - await delay(200); - } - - expect(msgs[0].contentTopic).to.eq(asymTopic); - expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText); - expect(msgs[1].contentTopic).to.eq(symTopic); - expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText); - }); - - it("Delete observer", async function () { - this.timeout(10000); - - const messageText = - "Published on content topic with added then deleted observer"; - - const contentTopic = "added-then-deleted-observer"; - - // The promise **fails** if we receive a message on this observer. - const receivedMsgPromise: Promise = new Promise( - (resolve, reject) => { - const deleteObserver = waku2.relay.subscribe( - [createDecoder(contentTopic)], - reject - ) as () => void; - deleteObserver(); - setTimeout(resolve, 500); - } - ); - await waku1.relay.send(createEncoder({ contentTopic }), { - payload: utf8ToBytes(messageText) - }); - - await receivedMsgPromise; - // If it does not throw then we are good. - }); - }); - - describe("Custom pubsub topic", () => { - let waku1: RelayNode; - let waku2: RelayNode; - let waku3: RelayNode; - - const CustomContentTopic = "/test/2/waku-relay/utf8"; - const CustomPubSubTopic = "/some/pubsub/topic"; - - const CustomEncoder = createEncoder({ - contentTopic: CustomContentTopic, - pubsubTopic: CustomPubSubTopic - }); - const CustomDecoder = createDecoder(CustomContentTopic, CustomPubSubTopic); - - afterEach(async function () { - !!waku1 && - waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && - waku2.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku3 && - waku3.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - [ - { - pubsub: CustomPubSubTopic, - encoder: CustomEncoder, - decoder: CustomDecoder - }, - { - pubsub: DefaultPubSubTopic, - encoder: TestEncoder, - decoder: TestDecoder - } - ].forEach((testItem) => { - it(`3 nodes on ${testItem.pubsub} topic`, async function () { - this.timeout(10000); - - const [msgCollector1, msgCollector2, msgCollector3] = Array(3) - .fill(null) - .map(() => new MessageCollector()); - - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [testItem.pubsub], - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]), - waitForRemotePeer(waku3, [Protocols.Relay]) - ]); - - await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); - await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); - await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); - - // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network - const relayResponse1 = await waku1.relay.send(testItem.encoder, { - payload: utf8ToBytes("M1") - }); - const relayResponse2 = await waku2.relay.send(testItem.encoder, { - payload: utf8ToBytes("M2") - }); - const relayResponse3 = await waku3.relay.send(testItem.encoder, { - payload: utf8ToBytes("M3") - }); - - expect(relayResponse1.recipients[0].toString()).to.eq( - waku2.libp2p.peerId.toString() - ); - expect(relayResponse3.recipients[0].toString()).to.eq( - waku2.libp2p.peerId.toString() - ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( - waku1.libp2p.peerId.toString() - ); - expect(relayResponse2.recipients.map((r) => r.toString())).to.include( - waku3.libp2p.peerId.toString() - ); - - expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( - true - ); - expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( - true - ); - expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( - true - ); - - expect( - msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") - ).to.eq(true); - expect( - msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") - ).to.eq(true); - expect( - msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") - ).to.eq(true); - expect( - msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") - ).to.eq(true); - expect( - msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") - ).to.eq(true); - expect( - msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") - ).to.eq(true); - }); - }); - - it("Nodes with multiple pubsub topic", async function () { - this.timeout(10000); - - const [msgCollector1, msgCollector2, msgCollector3] = Array(3) - .fill(null) - .map(() => new MessageCollector()); - - // Waku1 and waku2 are using multiple pubsub topis - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [DefaultPubSubTopic], - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]), - waitForRemotePeer(waku3, [Protocols.Relay]) - ]); - - await waku1.relay.subscribe( - [TestDecoder, CustomDecoder], - msgCollector1.callback - ); - await waku2.relay.subscribe( - [TestDecoder, CustomDecoder], - msgCollector2.callback - ); - await waku3.relay.subscribe([TestDecoder], msgCollector3.callback); - - // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network - // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic - await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); - await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") }); - await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") }); - await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") }); - await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") }); - await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") }); - - expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq( - true - ); - expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq( - true - ); - expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( - true - ); - expect(msgCollector1.hasMessage(TestContentTopic, "M3")).to.eq(true); - expect(msgCollector1.hasMessage(CustomContentTopic, "M4")).to.eq(true); - expect(msgCollector1.hasMessage(TestContentTopic, "M5")).to.eq(true); - expect(msgCollector2.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(msgCollector2.hasMessage(CustomContentTopic, "M2")).to.eq(true); - expect(msgCollector2.hasMessage(TestContentTopic, "M5")).to.eq(true); - expect(msgCollector3.hasMessage(TestContentTopic, "M1")).to.eq(true); - expect(msgCollector3.hasMessage(TestContentTopic, "M3")).to.eq(true); - }); - - it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { - this.timeout(10000); - - [waku1, waku2, waku3] = await Promise.all([ - createRelayNode({ - pubsubTopics: [CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - staticNoiseKey: NOISE_KEY_3 - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([ - waku1.dial(waku2.libp2p.peerId), - waku3.dial(waku2.libp2p.peerId) - ]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]) - ]); - - const messageText = "Communicating using a custom pubsub topic"; - - const waku2ReceivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku2.relay.subscribe([CustomDecoder], resolve); - } - ); - - // The promise **fails** if we receive a message on the default - // pubsub topic. - const waku3NoMsgPromise: Promise = new Promise( - (resolve, reject) => { - void waku3.relay.subscribe([TestDecoder], reject); - setTimeout(resolve, 1000); - } - ); - - await waku1.relay.send(CustomEncoder, { - payload: utf8ToBytes(messageText) - }); - - const waku2ReceivedMsg = await waku2ReceivedMsgPromise; - await waku3NoMsgPromise; - - expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); - expect(waku2ReceivedMsg.pubsubTopic).to.eq(CustomPubSubTopic); - }); - - it("Publishes <= 1 MB and rejects others", async function () { - this.timeout(10000); - const MB = 1024 ** 2; - - // 1 and 2 uses a custom pubsub - [waku1, waku2] = await Promise.all([ - createRelayNode({ - pubsubTopics: [CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - pubsubTopics: [CustomPubSubTopic], - staticNoiseKey: NOISE_KEY_2, - libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } - }).then((waku) => waku.start().then(() => waku)) - ]); - - await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { - multiaddrs: waku2.libp2p.getMultiaddrs() - }); - await Promise.all([waku1.dial(waku2.libp2p.peerId)]); - - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]) - ]); - - const waku2ReceivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku2.relay.subscribe([CustomDecoder], () => - resolve({ - payload: new Uint8Array([]) - } as DecodedMessage) - ); - } - ); - - let sendResult = await waku1.relay.send(CustomEncoder, { - payload: generateRandomUint8Array(1 * MB) - }); - expect(sendResult.recipients.length).to.eq(1); - - sendResult = await waku1.relay.send(CustomEncoder, { - payload: generateRandomUint8Array(1 * MB + 65536) - }); - expect(sendResult.recipients.length).to.eq(0); - expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); - - sendResult = await waku1.relay.send(CustomEncoder, { - payload: generateRandomUint8Array(2 * MB) - }); - expect(sendResult.recipients.length).to.eq(0); - expect(sendResult.errors).to.include(SendError.SIZE_TOO_BIG); - - const waku2ReceivedMsg = await waku2ReceivedMsgPromise; - expect(waku2ReceivedMsg?.payload?.length).to.eq(0); - }); - }); - - describe("Interop: NimGoNode", function () { - let waku: RelayNode; - let nwaku: NimGoNode; - - beforeEach(async function () { - this.timeout(30_000); - waku = await createRelayNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - - nwaku = new NimGoNode(this.test?.ctx?.currentTest?.title + ""); - await nwaku.start({ relay: true }); - - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Relay]); - - // Nwaku subscribe to the default pubsub topic - await nwaku.ensureSubscriptions(); - }); - - afterEach(async function () { - this.timeout(15000); - await tearDownNodes(nwaku, waku); - }); - - it("nwaku subscribes", async function () { - let subscribers: PeerId[] = []; - - while (subscribers.length === 0) { - await delay(200); - subscribers = - waku.libp2p.services.pubsub!.getSubscribers(DefaultPubSubTopic); - } - - const nimPeerId = await nwaku.getPeerId(); - expect(subscribers.map((p) => p.toString())).to.contain( - nimPeerId.toString() - ); - }); - - it("Publishes to nwaku", async function () { - this.timeout(30000); - - const messageText = "This is a message"; - await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) }); - - let msgs: MessageRpcResponse[] = []; - - while (msgs.length === 0) { - console.log("Waiting for messages"); - await delay(200); - msgs = await nwaku.messages(); - } - - expect(msgs[0].contentTopic).to.equal(TestContentTopic); - expect(msgs[0].version).to.equal(0); - expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText); - }); - - it("Nwaku publishes", async function () { - await delay(200); - - const messageText = "Here is another message."; - - const receivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku.relay.subscribe(TestDecoder, (msg) => - resolve(msg) - ); - } - ); - - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - contentTopic: TestContentTopic, - payload: utf8ToBytes(messageText) - }) - ); - - const receivedMsg = await receivedMsgPromise; - - expect(receivedMsg.contentTopic).to.eq(TestContentTopic); - expect(receivedMsg.version!).to.eq(0); - expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText); - }); - - describe.skip("Two nodes connected to nwaku", function () { - let waku1: RelayNode; - let waku2: RelayNode; - let nwaku: NimGoNode; - - afterEach(async function () { - await tearDownNodes(nwaku, [waku1, waku2]); - }); - - it("Js publishes, other Js receives", async function () { - this.timeout(60_000); - [waku1, waku2] = await Promise.all([ - createRelayNode({ - staticNoiseKey: NOISE_KEY_1, - emitSelf: true - }).then((waku) => waku.start().then(() => waku)), - createRelayNode({ - staticNoiseKey: NOISE_KEY_2 - }).then((waku) => waku.start().then(() => waku)) - ]); - - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start(); - - const nwakuMultiaddr = await nwaku.getMultiaddrWithId(); - await Promise.all([ - waku1.dial(nwakuMultiaddr), - waku2.dial(nwakuMultiaddr) - ]); - - // Wait for identify protocol to finish - await Promise.all([ - waitForRemotePeer(waku1, [Protocols.Relay]), - waitForRemotePeer(waku2, [Protocols.Relay]) - ]); - - await delay(2000); - // Check that the two JS peers are NOT directly connected - expect(await waku1.libp2p.peerStore.has(waku2.libp2p.peerId)).to.be - .false; - expect(waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.be.false; - - const msgStr = "Hello there!"; - const message = { payload: utf8ToBytes(msgStr) }; - - const waku2ReceivedMsgPromise: Promise = new Promise( - (resolve) => { - void waku2.relay.subscribe(TestDecoder, resolve); - } - ); - - await waku1.relay.send(TestEncoder, message); - console.log("Waiting for message"); - const waku2ReceivedMsg = await waku2ReceivedMsgPromise; - - expect(waku2ReceivedMsg.payload).to.eq(msgStr); - }); - }); - }); -}); diff --git a/packages/tests/tests/relay/index.node.spec.ts b/packages/tests/tests/relay/index.node.spec.ts new file mode 100644 index 0000000000..31f955ab7c --- /dev/null +++ b/packages/tests/tests/relay/index.node.spec.ts @@ -0,0 +1,127 @@ +import { createDecoder, createEncoder, DecodedMessage } from "@waku/core"; +import { RelayNode } from "@waku/interfaces"; +import { + createDecoder as createEciesDecoder, + createEncoder as createEciesEncoder, + generatePrivateKey, + getPublicKey +} from "@waku/message-encryption/ecies"; +import { + createDecoder as createSymDecoder, + createEncoder as createSymEncoder, + generateSymmetricKey +} from "@waku/message-encryption/symmetric"; +import { createRelayNode } from "@waku/sdk"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + delay, + NOISE_KEY_1, + NOISE_KEY_2, + tearDownNodes +} from "../../src/index.js"; + +import { log, waitForAllRemotePeers } from "./utils.js"; + +describe("Waku Relay", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + + beforeEach(async function () { + this.timeout(10000); + log.info("Starting JS Waku instances"); + [waku1, waku2] = await Promise.all([ + createRelayNode({ staticNoiseKey: NOISE_KEY_1 }).then((waku) => + waku.start().then(() => waku) + ), + createRelayNode({ + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)) + ]); + log.info("Instances started, adding waku2 to waku1's address book"); + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku1.dial(waku2.libp2p.peerId); + + await waitForAllRemotePeers(waku1, waku2); + log.info("before each hook done"); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2]); + }); + + it("Decrypt messages", async function () { + const asymText = "This message is encrypted using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = "This message is encrypted using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const eciesEncoder = createEciesEncoder({ + contentTopic: asymTopic, + publicKey + }); + const symEncoder = createSymEncoder({ + contentTopic: symTopic, + symKey + }); + + const eciesDecoder = createEciesDecoder(asymTopic, privateKey); + const symDecoder = createSymDecoder(symTopic, symKey); + + const msgs: DecodedMessage[] = []; + void waku2.relay.subscribe([eciesDecoder], (wakuMsg) => { + msgs.push(wakuMsg); + }); + void waku2.relay.subscribe([symDecoder], (wakuMsg) => { + msgs.push(wakuMsg); + }); + + await waku1.relay.send(eciesEncoder, { payload: utf8ToBytes(asymText) }); + await delay(200); + await waku1.relay.send(symEncoder, { payload: utf8ToBytes(symText) }); + + while (msgs.length < 2) { + await delay(200); + } + + expect(msgs[0].contentTopic).to.eq(asymTopic); + expect(bytesToUtf8(msgs[0].payload!)).to.eq(asymText); + expect(msgs[1].contentTopic).to.eq(symTopic); + expect(bytesToUtf8(msgs[1].payload!)).to.eq(symText); + }); + + it("Delete observer", async function () { + const messageText = + "Published on content topic with added then deleted observer"; + + const contentTopic = "added-then-deleted-observer"; + + // The promise **fails** if we receive a message on this observer. + const receivedMsgPromise: Promise = new Promise( + (resolve, reject) => { + const deleteObserver = waku2.relay.subscribe( + [createDecoder(contentTopic)], + reject + ) as () => void; + deleteObserver(); + setTimeout(resolve, 500); + } + ); + await waku1.relay.send(createEncoder({ contentTopic }), { + payload: utf8ToBytes(messageText) + }); + + await receivedMsgPromise; + // If it does not throw then we are good. + }); +}); diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts new file mode 100644 index 0000000000..b4d5372ded --- /dev/null +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -0,0 +1,169 @@ +import type { PeerId } from "@libp2p/interface/peer-id"; +import { + DecodedMessage, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import { RelayNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { createRelayNode } from "@waku/sdk"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + delay, + makeLogFileName, + NOISE_KEY_1, + NOISE_KEY_2, + tearDownNodes +} from "../../src/index.js"; +import { MessageRpcResponse } from "../../src/node/interfaces.js"; +import { base64ToUtf8, NimGoNode } from "../../src/node/node.js"; + +import { TestContentTopic, TestDecoder, TestEncoder } from "./utils.js"; + +describe("Waku Relay, Interop", function () { + this.timeout(15000); + let waku: RelayNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(30000); + waku = await createRelayNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + + nwaku = new NimGoNode(this.test?.ctx?.currentTest?.title + ""); + await nwaku.start({ relay: true }); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Relay]); + + // Nwaku subscribe to the default pubsub topic + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + it("nwaku subscribes", async function () { + let subscribers: PeerId[] = []; + + while (subscribers.length === 0) { + await delay(200); + subscribers = + waku.libp2p.services.pubsub!.getSubscribers(DefaultPubSubTopic); + } + + const nimPeerId = await nwaku.getPeerId(); + expect(subscribers.map((p) => p.toString())).to.contain( + nimPeerId.toString() + ); + }); + + it("Publishes to nwaku", async function () { + const messageText = "This is a message"; + await waku.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) }); + + let msgs: MessageRpcResponse[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nwaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(TestContentTopic); + expect(msgs[0].version).to.equal(0); + expect(base64ToUtf8(msgs[0].payload)).to.equal(messageText); + }); + + it("Nwaku publishes", async function () { + await delay(200); + + const messageText = "Here is another message."; + + const receivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku.relay.subscribe(TestDecoder, (msg) => + resolve(msg) + ); + } + ); + + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + contentTopic: TestContentTopic, + payload: utf8ToBytes(messageText) + }) + ); + + const receivedMsg = await receivedMsgPromise; + + expect(receivedMsg.contentTopic).to.eq(TestContentTopic); + expect(receivedMsg.version!).to.eq(0); + expect(bytesToUtf8(receivedMsg.payload!)).to.eq(messageText); + }); + + describe("Two nodes connected to nwaku", function () { + let waku1: RelayNode; + let waku2: RelayNode; + let nwaku: NimGoNode; + + afterEach(async function () { + await tearDownNodes(nwaku, [waku1, waku2]); + }); + + it("Js publishes, other Js receives", async function () { + [waku1, waku2] = await Promise.all([ + createRelayNode({ + staticNoiseKey: NOISE_KEY_1, + emitSelf: true + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_2 + }).then((waku) => waku.start().then(() => waku)) + ]); + + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ relay: true }); + + const nwakuMultiaddr = await nwaku.getMultiaddrWithId(); + await Promise.all([ + waku1.dial(nwakuMultiaddr), + waku2.dial(nwakuMultiaddr) + ]); + + // Wait for identify protocol to finish + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + await delay(2000); + // Check that the two JS peers are NOT directly connected + expect(await waku1.libp2p.peerStore.has(waku2.libp2p.peerId)).to.eq( + false + ); + expect(await waku2.libp2p.peerStore.has(waku1.libp2p.peerId)).to.eq( + false + ); + + const msgStr = "Hello there!"; + const message = { payload: utf8ToBytes(msgStr) }; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe(TestDecoder, resolve); + } + ); + + await waku1.relay.send(TestEncoder, message); + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload)).to.eq(msgStr); + }); + }); +}); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts new file mode 100644 index 0000000000..3711b6286e --- /dev/null +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -0,0 +1,280 @@ +import { + DecodedMessage, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import { RelayNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { createRelayNode } from "@waku/sdk"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + MessageCollector, + NOISE_KEY_1, + NOISE_KEY_2, + NOISE_KEY_3, + tearDownNodes +} from "../../src/index.js"; + +import { + CustomContentTopic, + CustomDecoder, + CustomEncoder, + CustomPubSubTopic, + TestContentTopic, + TestDecoder, + TestEncoder +} from "./utils.js"; + +describe("Waku Relay, multiple pubsub topics", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let waku3: RelayNode; + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2, waku3]); + }); + + [ + { + pubsub: CustomPubSubTopic, + encoder: CustomEncoder, + decoder: CustomDecoder + }, + { + pubsub: DefaultPubSubTopic, + encoder: TestEncoder, + decoder: TestDecoder + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( + true + ); + + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + }); + }); + + it("Nodes with multiple pubsub topic", async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic, CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [TestDecoder, CustomDecoder], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [TestDecoder, CustomDecoder], + msgCollector2.callback + ); + await waku3.relay.subscribe([TestDecoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(CustomEncoder, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(TestEncoder, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(CustomEncoder, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(TestEncoder, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(CustomEncoder, { payload: utf8ToBytes("M6") }); + + expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true); + expect(msgCollector1.hasMessage(TestContentTopic, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(CustomContentTopic, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(TestContentTopic, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(TestContentTopic, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(CustomContentTopic, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(TestContentTopic, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(TestContentTopic, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(TestContentTopic, "M3")).to.eq(true); + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [CustomPubSubTopic], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + const messageText = "Communicating using a custom pubsub topic"; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe([CustomDecoder], resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + void waku3.relay.subscribe([TestDecoder], reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(CustomEncoder, { + payload: utf8ToBytes(messageText) + }); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(CustomPubSubTopic); + }); +}); diff --git a/packages/tests/tests/relay/publish.node.spec.ts b/packages/tests/tests/relay/publish.node.spec.ts new file mode 100644 index 0000000000..f650fe19b6 --- /dev/null +++ b/packages/tests/tests/relay/publish.node.spec.ts @@ -0,0 +1,256 @@ +import { createEncoder, DefaultPubSubTopic } from "@waku/core"; +import { IRateLimitProof, RelayNode, SendError } from "@waku/interfaces"; +import { createRelayNode } from "@waku/sdk"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + delay, + MessageCollector, + NOISE_KEY_1, + NOISE_KEY_2, + tearDownNodes, + TEST_STRING +} from "../../src/index.js"; +import { generateRandomUint8Array } from "../../src/random_array.js"; + +import { + log, + messageText, + TestContentTopic, + TestDecoder, + TestEncoder, + waitForAllRemotePeers +} from "./utils.js"; + +describe("Waku Relay, Publish", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let messageCollector: MessageCollector; + + beforeEach(async function () { + this.timeout(10000); + log.info("Starting JS Waku instances"); + [waku1, waku2] = await Promise.all([ + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)) + ]); + log.info("Instances started, adding waku2 to waku1's address book"); + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku1.dial(waku2.libp2p.peerId); + log.info("before each hook done"); + await waitForAllRemotePeers(waku1, waku2); + messageCollector = new MessageCollector(); + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2]); + }); + + TEST_STRING.forEach((testItem) => { + it(`Check publish message containing ${testItem.description}`, async function () { + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(testItem.value) + }); + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: testItem.value, + expectedContentTopic: TestContentTopic + }); + }); + }); + + [ + new Date("1995-12-17T03:24:00"), + new Date(Date.now() - 3600000 * 24 * 356), + new Date(Date.now() - 3600000), + new Date(Date.now() + 3600000) + ].forEach((testItem) => { + it(`Publish message with custom timestamp: ${testItem}`, async function () { + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(messageText), + timestamp: testItem + }); + + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic, + expectedTimestamp: testItem.valueOf() + }); + }); + }); + + it("Fails to publish duplicate message", async function () { + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") }); + try { + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") }); + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("m1") }); + expect.fail("Expected an error but didn't get one"); + } catch (error) { + expect((error as Error).message).to.include("PublishError.Duplicate"); + } + }); + + it("Fails to publish message with empty text", async function () { + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("") }); + await delay(400); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Fails to publish message with wrong content topic", async function () { + const wrong_encoder = createEncoder({ contentTopic: "wrong" }); + await waku1.relay.send(wrong_encoder, { + payload: utf8ToBytes("") + }); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Fails to publish message with wrong pubsubtopic", async function () { + const wrong_encoder = createEncoder({ + pubsubTopic: "wrong", + contentTopic: TestContentTopic + }); + const pushResponse = await waku1.relay.send(wrong_encoder, { + payload: utf8ToBytes("") + }); + expect(pushResponse.errors?.[0]).to.eq("Topic not configured"); + await delay(400); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + + it("Publish message with size of 1 MB", async function () { + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: generateRandomUint8Array(1024 ** 2) + }); + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + }); + + [1024 ** 2 + 65536, 2 * 1024 ** 2].forEach((testItem) => { + it("Fails to publish message with size larger than 1 MB", async function () { + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: generateRandomUint8Array(testItem) + }); + expect(pushResponse.recipients.length).to.eq(0); + expect(pushResponse.errors).to.include(SendError.SIZE_TOO_BIG); + await delay(400); + expect(await messageCollector.waitForMessages(1)).to.eq(false); + }); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check publish message after service node is restarted", async function () { + await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes("m1") + }); + + // Restart js-waku node + await waku1.stop(); + expect(waku1.isStarted()).to.eq(false); + await waku1.start(); + expect(waku1.isStarted()).to.eq(true); + await waku1.dial(waku2.libp2p.peerId); + + // Redo the connection and create a new subscription + await waitForAllRemotePeers(waku1, waku2); + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes("m2") + }); + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1464 si done + it.skip("Check publish message after client node is restarted", async function () { + await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes("m1") + }); + + // Restart js-waku node + await waku2.stop(); + expect(waku2.isStarted()).to.eq(false); + await waku2.start(); + expect(waku2.isStarted()).to.eq(true); + await waku1.dial(waku2.libp2p.peerId); + + // Redo the connection and create a new subscription + await waitForAllRemotePeers(waku1, waku2); + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes("m2") + }); + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(await messageCollector.waitForMessages(2)).to.eq(true); + }); + + it("Publish message with large meta", async function () { + const customTestEncoder = createEncoder({ + contentTopic: TestContentTopic, + metaSetter: () => new Uint8Array(10 ** 6) + }); + + const pushResponse = await waku1.relay.send(customTestEncoder, { + payload: utf8ToBytes(messageText) + }); + expect(pushResponse.recipients.length).to.eq(1); + expect(pushResponse.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + }); + + it("Publish message with rate limit", async function () { + const rateLimitProof: IRateLimitProof = { + proof: utf8ToBytes("proofData"), + merkleRoot: utf8ToBytes("merkleRootData"), + epoch: utf8ToBytes("epochData"), + shareX: utf8ToBytes("shareXData"), + shareY: utf8ToBytes("shareYData"), + nullifier: utf8ToBytes("nullifierData"), + rlnIdentifier: utf8ToBytes("rlnIdentifierData") + }; + + const pushResponse = await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(messageText), + rateLimitProof: rateLimitProof + }); + expect(pushResponse.recipients.length).to.eq(1); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + }); +}); diff --git a/packages/tests/tests/relay/subscribe.node.spec.ts b/packages/tests/tests/relay/subscribe.node.spec.ts new file mode 100644 index 0000000000..65771158e2 --- /dev/null +++ b/packages/tests/tests/relay/subscribe.node.spec.ts @@ -0,0 +1,279 @@ +import { createDecoder, createEncoder, DefaultPubSubTopic } from "@waku/core"; +import { RelayNode } from "@waku/interfaces"; +import { createRelayNode } from "@waku/sdk"; +import { utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + generateTestData, + MessageCollector, + NOISE_KEY_1, + NOISE_KEY_2, + tearDownNodes, + TEST_STRING +} from "../../src/index.js"; + +import { + log, + messageText, + TestContentTopic, + TestDecoder, + TestEncoder, + waitForAllRemotePeers +} from "./utils.js"; + +describe("Waku Relay, Subscribe", function () { + this.timeout(40000); + let waku1: RelayNode; + let waku2: RelayNode; + let messageCollector: MessageCollector; + + beforeEach(async function () { + this.timeout(10000); + log.info("Starting JS Waku instances"); + [waku1, waku2] = await Promise.all([ + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [DefaultPubSubTopic], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)) + ]); + log.info("Instances started, adding waku2 to waku1's address book"); + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku1.dial(waku2.libp2p.peerId); + log.info("before each hook done"); + messageCollector = new MessageCollector(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2]); + }); + + it("Mutual subscription", async function () { + await waitForAllRemotePeers(waku1, waku2); + const subscribers1 = waku1.libp2p.services + .pubsub!.getSubscribers(DefaultPubSubTopic) + .map((p) => p.toString()); + const subscribers2 = waku2.libp2p.services + .pubsub!.getSubscribers(DefaultPubSubTopic) + .map((p) => p.toString()); + + expect(subscribers1).to.contain(waku2.libp2p.peerId.toString()); + expect(subscribers2).to.contain(waku1.libp2p.peerId.toString()); + }); + + it("Register correct protocols", async function () { + const protocols = waku1.libp2p.getProtocols(); + + expect(protocols).to.contain("/vac/waku/relay/2.0.0"); + expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1); + }); + + it("Publish without waiting for remote peer", async function () { + try { + await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(messageText) + }); + throw new Error("Publish was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("PublishError.InsufficientPeers") + ) { + throw err; + } + } + }); + + it("Subscribe and publish message", async function () { + await waitForAllRemotePeers(waku1, waku2); + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes(messageText) }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: TestContentTopic + }); + }); + + it("Subscribe and publish 10000 messages on the same topic", async function () { + const messageCount = 10000; + await waitForAllRemotePeers(waku1, waku2); + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + // Send a unique message on each topic. + for (let i = 0; i < messageCount; i++) { + await waku1.relay.send(TestEncoder, { + payload: utf8ToBytes(`M${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect( + await messageCollector.waitForMessages(messageCount, { exact: true }) + ).to.eq(true); + + for (let i = 0; i < messageCount; i++) { + messageCollector.verifyReceivedMessage(i, { + expectedMessageText: `M${i + 1}`, + expectedContentTopic: TestContentTopic, + checkTimestamp: false + }); + } + }); + + it("Subscribe and publish messages on 2 different content topics", async function () { + const secondContentTopic = "/test/2/waku-relay/utf8"; + const secondEncoder = createEncoder({ contentTopic: secondContentTopic }); + const secondDecoder = createDecoder(secondContentTopic); + + await waitForAllRemotePeers(waku1, waku2); + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribe([secondDecoder], messageCollector.callback); + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(secondEncoder, { payload: utf8ToBytes("M2") }); + expect(await messageCollector.waitForMessages(2, { exact: true })).to.eq( + true + ); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: "M1", + expectedContentTopic: TestContentTopic + }); + messageCollector.verifyReceivedMessage(1, { + expectedMessageText: "M2", + expectedContentTopic: secondContentTopic + }); + }); + + it("Subscribe one by one to 100 topics and publish messages", async function () { + const topicCount = 100; + const td = generateTestData(topicCount); + await waitForAllRemotePeers(waku1, waku2); + + // Subscribe to topics one by one + for (let i = 0; i < topicCount; i++) { + await waku2.relay.subscribe([td.decoders[i]], messageCollector.callback); + } + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku1.relay.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect( + await messageCollector.waitForMessages(topicCount, { exact: true }) + ).to.eq(true); + td.contentTopics.forEach((topic, index) => { + messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}` + }); + }); + }); + + it("Subscribe at once to 10000 topics and publish messages", async function () { + const topicCount = 10000; + const td = generateTestData(topicCount); + await waitForAllRemotePeers(waku1, waku2); + + // Subscribe to all topics at once + await waku2.relay.subscribe(td.decoders, messageCollector.callback); + + // Send a unique message on each topic. + for (let i = 0; i < topicCount; i++) { + await waku1.relay.send(td.encoders[i], { + payload: utf8ToBytes(`Message for Topic ${i + 1}`) + }); + } + + // Verify that each message was received on the corresponding topic. + expect( + await messageCollector.waitForMessages(topicCount, { exact: true }) + ).to.eq(true); + td.contentTopics.forEach((topic, index) => { + messageCollector.verifyReceivedMessage(index, { + expectedContentTopic: topic, + expectedMessageText: `Message for Topic ${index + 1}`, + checkTimestamp: false + }); + }); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed + it.skip("Refresh subscription", async function () { + await waitForAllRemotePeers(waku1, waku2); + + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + await waku2.relay.subscribe([TestDecoder], messageCollector.callback); + + await waku1.relay.send(TestEncoder, { payload: utf8ToBytes("M1") }); + + expect(await messageCollector.waitForMessages(1, { exact: true })).to.eq( + true + ); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1678 is fixed + it.skip("Overlapping topic subscription", async function () { + // Define two sets of test data with overlapping topics. + const topicCount1 = 2; + const td1 = generateTestData(topicCount1); + const topicCount2 = 4; + const td2 = generateTestData(topicCount2); + await waitForAllRemotePeers(waku1, waku2); + + // Subscribe to the first set of topics. + await waku2.relay.subscribe(td1.decoders, messageCollector.callback); + // Subscribe to the second set of topics which has overlapping topics with the first set. + await waku2.relay.subscribe(td2.decoders, messageCollector.callback); + + // Send messages to the first set of topics. + for (let i = 0; i < topicCount1; i++) { + const messageText = `Message for Topic ${i + 1}`; + await waku1.relay.send(td1.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Send messages to the second set of topics. + for (let i = 0; i < topicCount2; i++) { + const messageText = `Message for Topic ${i + 3}`; + await waku1.relay.send(td2.encoders[i], { + payload: utf8ToBytes(messageText) + }); + } + + // Check if all messages were received. + // Since there are overlapping topics, there should be 6 messages in total (2 from the first set + 4 from the second set). + expect(await messageCollector.waitForMessages(6, { exact: true })).to.eq( + true + ); + }); + + TEST_STRING.forEach((testItem) => { + it(`Subscribe to topic containing ${testItem.description} and publish message`, async function () { + const newContentTopic = testItem.value; + const newEncoder = createEncoder({ contentTopic: newContentTopic }); + const newDecoder = createDecoder(newContentTopic); + await waitForAllRemotePeers(waku1, waku2); + await waku2.relay.subscribe([newDecoder], messageCollector.callback); + await waku1.relay.send(newEncoder, { + payload: utf8ToBytes(messageText) + }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedMessageText: messageText, + expectedContentTopic: newContentTopic + }); + }); + }); +}); diff --git a/packages/tests/tests/relay/utils.ts b/packages/tests/tests/relay/utils.ts new file mode 100644 index 0000000000..ea866a349f --- /dev/null +++ b/packages/tests/tests/relay/utils.ts @@ -0,0 +1,29 @@ +import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { Protocols, RelayNode } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +export const messageText = "Relay works!"; +export const TestContentTopic = "/test/1/waku-relay/utf8"; +export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +export const TestDecoder = createDecoder(TestContentTopic); +export const CustomContentTopic = "/test/2/waku-relay/utf8"; +export const CustomPubSubTopic = "/some/pubsub/topic"; +export const CustomEncoder = createEncoder({ + contentTopic: CustomContentTopic, + pubsubTopic: CustomPubSubTopic +}); +export const CustomDecoder = createDecoder( + CustomContentTopic, + CustomPubSubTopic +); + +export const log = new Logger("test:relay"); + +export async function waitForAllRemotePeers( + ...nodes: RelayNode[] +): Promise { + log.info("Wait for mutual pubsub subscription"); + await Promise.all( + nodes.map((node) => waitForRemotePeer(node, [Protocols.Relay])) + ); +}