diff --git a/package-lock.json b/package-lock.json index 799a09ac41..6a2f998419 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3961,6 +3961,12 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/lodash": { + "version": "4.14.199", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.199.tgz", + "integrity": "sha512-Vrjz5N5Ia4SEzWWgIVwnHNEnb1UE1XMkvY5DGXrAeOGE9imk0hgTHh5GyDjLDJi9OTCn9oo9dXH1uToK1VRfrg==", + "dev": true + }, "node_modules/@types/markdown-it": { "version": "12.2.3", "dev": true, @@ -13571,7 +13577,8 @@ }, "node_modules/lodash": { "version": "4.17.21", - "license": "MIT" + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "node_modules/lodash-es": { "version": "4.17.21", @@ -26134,6 +26141,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "lodash": "^4.17.21", "p-retry": "^6.1.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", @@ -26144,6 +26152,7 @@ "@libp2p/bootstrap": "^9.0.2", "@types/chai": "^4.3.5", "@types/dockerode": "^3.3.19", + "@types/lodash": "^4.14.199", "@types/mocha": "^10.0.1", "@types/sinon": "^10.0.16", "@types/tail": "^2.2.1", @@ -28698,6 +28707,12 @@ "version": "3.0.3", "dev": true }, + "@types/lodash": { + "version": "4.14.199", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.14.199.tgz", + "integrity": "sha512-Vrjz5N5Ia4SEzWWgIVwnHNEnb1UE1XMkvY5DGXrAeOGE9imk0hgTHh5GyDjLDJi9OTCn9oo9dXH1uToK1VRfrg==", + "dev": true + }, "@types/markdown-it": { "version": "12.2.3", "dev": true, @@ -29322,6 +29337,7 @@ "@libp2p/peer-id": "^3.0.2", "@types/chai": "^4.3.5", "@types/dockerode": "^3.3.19", + "@types/lodash": "^4.14.199", "@types/mocha": "^10.0.1", "@types/sinon": "^10.0.16", "@types/tail": "^2.2.1", @@ -29342,6 +29358,7 @@ "dockerode": "^3.3.5", "interface-datastore": "^8.2.5", "libp2p": "^0.46.12", + "lodash": "^4.17.21", "mocha": "^10.2.0", "npm-run-all": "^4.1.5", "p-retry": "^6.1.0", @@ -34746,7 +34763,9 @@ } }, "lodash": { - "version": "4.17.21" + "version": "4.17.21", + "resolved": "https://registry.npmjs.org/lodash/-/lodash-4.17.21.tgz", + "integrity": "sha512-v2kDEe57lecTulaDIuNTPy3Ry4gLGJ6Z1O3vE1krgXZNrsQ+LFTGHVxVjcXPs17LhbZVGedAJv8XZ1tvj5FvSg==" }, "lodash-es": { "version": "4.17.21" diff --git a/packages/tests/package.json b/packages/tests/package.json index 330e46d299..b5c5a844c8 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -60,6 +60,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "lodash": "^4.17.21", "p-retry": "^6.1.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", @@ -70,6 +71,7 @@ "@libp2p/bootstrap": "^9.0.2", "@types/chai": "^4.3.5", "@types/dockerode": "^3.3.19", + "@types/lodash": "^4.14.199", "@types/mocha": "^10.0.1", "@types/sinon": "^10.0.16", "@types/tail": "^2.2.1", diff --git a/packages/tests/src/message_collector.ts b/packages/tests/src/message_collector.ts index d8929dc2a4..5b26e56615 100644 --- a/packages/tests/src/message_collector.ts +++ b/packages/tests/src/message_collector.ts @@ -1,7 +1,8 @@ import { DecodedMessage, DefaultPubSubTopic } from "@waku/core"; -import { bytesToUtf8 } from "@waku/utils/bytes"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { AssertionError, expect } from "chai"; import debug from "debug"; +import isEqual from "lodash/isEqual"; import { MessageRpcResponse } from "./node/interfaces.js"; @@ -36,9 +37,17 @@ export class MessageCollector { } hasMessage(topic: string, text: string): boolean { - return this.list.some( - (message) => message.contentTopic === topic && message.payload === text - ); + return this.list.some((message) => { + if (message.contentTopic !== topic) { + return false; + } + if (typeof message.payload === "string") { + return message.payload === text; + } else if (message.payload instanceof Uint8Array) { + return isEqual(message.payload, utf8ToBytes(text)); + } + return false; + }); } // Type guard to determine if a message is of type MessageRpcResponse diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index 769775051f..68e9a68aeb 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -167,8 +167,8 @@ export class NimGoNode { async startWithRetries( args: Args, options: { - retries: number; - } + retries?: number; + } = { retries: 3 } ): Promise { await pRetry( async () => { diff --git a/packages/tests/src/teardown.ts b/packages/tests/src/teardown.ts index caecec6e44..e68782999e 100644 --- a/packages/tests/src/teardown.ts +++ b/packages/tests/src/teardown.ts @@ -1,23 +1,49 @@ import { LightNode } from "@waku/interfaces"; import debug from "debug"; +import pRetry from "p-retry"; import { NimGoNode } from "./index.js"; const log = debug("waku:test"); -export function tearDownNodes( - nwakuNodes: NimGoNode[], - wakuNodes: LightNode[] -): void { - nwakuNodes.forEach((nwaku) => { +export async function tearDownNodes( + nwakuNodes: NimGoNode | NimGoNode[], + wakuNodes: LightNode | LightNode[] +): Promise { + const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes]; + const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; + + const stopNwakuNodes = nNodes.map(async (nwaku) => { if (nwaku) { - nwaku.stop().catch((e) => log("Nwaku failed to stop", e)); + await pRetry( + async () => { + try { + await nwaku.stop(); + } catch (error) { + log("Nwaku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); } }); - wakuNodes.forEach((waku) => { + const stopWakuNodes = wNodes.map(async (waku) => { if (waku) { - waku.stop().catch((e) => log("Waku failed to stop", e)); + await pRetry( + async () => { + try { + await waku.stop(); + } catch (error) { + log("Waku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); } }); + + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); } diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index d6fbf4193b..b6e1f50a5e 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -149,10 +149,12 @@ describe("ConnectionManager", function () { let waku: LightNode; this.beforeEach(async function () { + this.timeout(15000); waku = await createLightNode(); }); afterEach(async () => { + this.timeout(15000); await waku.stop(); sinon.restore(); }); diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index bd38430794..dd0ed193fd 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -50,7 +50,8 @@ describe("Waku Filter V2: Multiple PubSubtopics", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku, nwaku2], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Subscribe and receive messages on custom pubsubtopic", async function () { diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index d2b17afe59..843133e5d8 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -29,7 +29,8 @@ describe("Waku Filter V2: Ping", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes(nwaku, waku); }); it("Ping on subscribed peer", async function () { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index cd32af89a3..69be141025 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -37,7 +37,8 @@ describe("Waku Filter V2: FilterPush", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes(nwaku, waku); }); TEST_STRING.forEach((testItem) => { diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 540c1391fa..ebb2f072a8 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -48,7 +48,8 @@ describe("Waku Filter V2: Subscribe", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku, nwaku2], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Subscribe and receive messages via lightPush", async function () { @@ -341,7 +342,7 @@ describe("Waku Filter V2: Subscribe", function () { } // Check if both messages were received - expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.be.true; - expect(messageCollector.hasMessage(newContentTopic, "M2")).to.be.true; + expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true); + expect(messageCollector.hasMessage(newContentTopic, "M2")).to.eq(true); }); }); diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index 6ac79010c0..b79d3e313e 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -34,7 +34,8 @@ describe("Waku Filter V2: Unsubscribe", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes(nwaku, waku); }); it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { diff --git a/packages/tests/tests/light-push/index.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts similarity index 99% rename from packages/tests/tests/light-push/index.spec.ts rename to packages/tests/tests/light-push/index.node.spec.ts index d06c7edfc8..4e47a4b451 100644 --- a/packages/tests/tests/light-push/index.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -35,7 +35,8 @@ describe("Waku Light Push", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes(nwaku, waku); }); TEST_STRING.forEach((testItem) => { diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 87712ad431..05f8f5068a 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -36,7 +36,8 @@ describe("Waku Light Push : Multiple PubSubtopics", function () { }); let nimPeerId: PeerId; - beforeEach(async function () { + this.beforeEach(async function () { + this.timeout(15000); [nwaku, waku] = await runNodes(this, [ customPubSubTopic, DefaultPubSubTopic @@ -46,7 +47,8 @@ describe("Waku Light Push : Multiple PubSubtopics", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku, nwaku2], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); }); it("Push message on custom pubSubTopic", async function () { diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts deleted file mode 100644 index 41c4249153..0000000000 --- a/packages/tests/tests/store.node.spec.ts +++ /dev/null @@ -1,772 +0,0 @@ -import { - createCursor, - createDecoder, - createEncoder, - DecodedMessage, - Decoder, - DefaultPubSubTopic, - PageDirection, - waitForRemotePeer -} from "@waku/core"; -import type { IMessage, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; -import { - createDecoder as createEciesDecoder, - createEncoder as createEciesEncoder, - generatePrivateKey, - getPublicKey -} from "@waku/message-encryption/ecies"; -import { - createDecoder as createSymDecoder, - createEncoder as createSymEncoder, - generateSymmetricKey -} from "@waku/message-encryption/symmetric"; -import { createLightNode } from "@waku/sdk"; -import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; -import debug from "debug"; - -import { - delay, - makeLogFileName, - NOISE_KEY_1, - NOISE_KEY_2 -} from "../src/index.js"; -import { NimGoNode } from "../src/node/node.js"; - -const log = debug("waku:test:store"); - -const TestContentTopic = "/test/1/waku-store/utf8"; -const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); -const TestDecoder = createDecoder(TestContentTopic); - -describe("Waku Store", () => { - let waku: LightNode; - let nwaku: NimGoNode; - - beforeEach(async function () { - this.timeout(15_000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ store: true, lightpush: true, relay: true }); - }); - - afterEach(async function () { - !!nwaku && - nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Generator", async function () { - this.timeout(15_000); - const totalMsgs = 20; - - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: IMessage[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); - - promises = promises.concat(_promises); - } - await Promise.all(promises); - - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return msg.payload[0]! === 0; - }); - expect(result).to.not.eq(-1); - }); - - it("Generator, no message returned", async function () { - this.timeout(15_000); - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: IMessage[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); - - promises = promises.concat(_promises); - } - await Promise.all(promises); - - expect(messages?.length).eq(0); - }); - - it("Passing a cursor", async function () { - this.timeout(4_000); - const totalMsgs = 20; - - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const query = waku.store.queryGenerator([TestDecoder]); - - // messages in reversed order (first message at last index) - const messages: DecodedMessage[] = []; - for await (const page of query) { - for await (const msg of page.reverse()) { - messages.push(msg as DecodedMessage); - } - } - - // index 2 would mean the third last message sent - const cursorIndex = 2; - - // create cursor to extract messages after the 3rd index - const cursor = await createCursor(messages[cursorIndex]); - - const messagesAfterCursor: DecodedMessage[] = []; - for await (const page of waku.store.queryGenerator([TestDecoder], { - cursor - })) { - for await (const msg of page.reverse()) { - messagesAfterCursor.push(msg as DecodedMessage); - } - } - - const testMessage = messagesAfterCursor[0]; - - expect(messages.length).be.eq(totalMsgs); - - expect(bytesToUtf8(testMessage.payload)).to.be.eq( - bytesToUtf8(messages[cursorIndex + 1].payload) - ); - }); - - it("Callback on promise", async function () { - this.timeout(15_000); - - const totalMsgs = 15; - - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: IMessage[] = []; - await waku.store.queryWithPromiseCallback( - [TestDecoder], - async (msgPromise) => { - const msg = await msgPromise; - if (msg) { - messages.push(msg); - } - } - ); - - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return msg.payload[0]! === 0; - }); - expect(result).to.not.eq(-1); - }); - - it("Callback on promise, aborts when callback returns true", async function () { - this.timeout(15_000); - - const totalMsgs = 20; - - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const desiredMsgs = 14; - const messages: IMessage[] = []; - await waku.store.queryWithPromiseCallback( - [TestDecoder], - async (msgPromise) => { - const msg = await msgPromise; - if (msg) { - messages.push(msg); - } - return messages.length >= desiredMsgs; - }, - { pageSize: 7 } - ); - - expect(messages?.length).eq(desiredMsgs); - }); - - it("Ordered Callback - Forward", async function () { - this.timeout(15_000); - - const totalMsgs = 18; - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - await delay(1); // to ensure each timestamp is unique. - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: IMessage[] = []; - await waku.store.queryWithOrderedCallback( - [TestDecoder], - async (msg) => { - messages.push(msg); - }, - { - pageDirection: PageDirection.FORWARD - } - ); - - expect(messages?.length).eq(totalMsgs); - const payloads = messages.map((msg) => msg.payload[0]!); - expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); - }); - - it("Ordered Callback - Backward", async function () { - this.timeout(15_000); - - const totalMsgs = 18; - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - await delay(1); // to ensure each timestamp is unique. - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - let messages: IMessage[] = []; - await waku.store.queryWithOrderedCallback( - [TestDecoder], - async (msg) => { - messages.push(msg); - }, - { - pageDirection: PageDirection.BACKWARD - } - ); - - messages = messages.reverse(); - - expect(messages?.length).eq(totalMsgs); - const payloads = messages.map((msg) => msg.payload![0]!); - expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); - }); - - it("Generator, with asymmetric & symmetric encrypted messages", async function () { - this.timeout(15_000); - - const asymText = "This message is encrypted for me using asymmetric"; - const asymTopic = "/test/1/asymmetric/proto"; - const symText = - "This message is encrypted for me using symmetric encryption"; - const symTopic = "/test/1/symmetric/proto"; - const clearText = "This is a clear text message for everyone to read"; - const otherText = - "This message is not for and I must not be able to read it"; - - const timestamp = new Date(); - - const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; - const symMsg = { - payload: utf8ToBytes(symText), - timestamp: new Date(timestamp.valueOf() + 1) - }; - const clearMsg = { - payload: utf8ToBytes(clearText), - timestamp: new Date(timestamp.valueOf() + 2) - }; - const otherMsg = { - payload: utf8ToBytes(otherText), - timestamp: new Date(timestamp.valueOf() + 3) - }; - - const privateKey = generatePrivateKey(); - const symKey = generateSymmetricKey(); - const publicKey = getPublicKey(privateKey); - - const eciesEncoder = createEciesEncoder({ - contentTopic: asymTopic, - publicKey - }); - const symEncoder = createSymEncoder({ - contentTopic: symTopic, - symKey - }); - - const otherEncoder = createEciesEncoder({ - contentTopic: TestContentTopic, - publicKey: getPublicKey(generatePrivateKey()) - }); - - const eciesDecoder = createEciesDecoder(asymTopic, privateKey); - const symDecoder = createSymDecoder(symTopic, symKey); - - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ - createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }).then((waku) => waku.start().then(() => waku)), - createLightNode({ - staticNoiseKey: NOISE_KEY_2 - }).then((waku) => waku.start().then(() => waku)), - nwaku.getMultiaddrWithId() - ]); - - log("Waku nodes created"); - - await Promise.all([ - waku1.dial(nimWakuMultiaddr), - waku2.dial(nimWakuMultiaddr) - ]); - - log("Waku nodes connected to nwaku"); - - await waitForRemotePeer(waku1, [Protocols.LightPush]); - - log("Sending messages using light push"); - await Promise.all([ - waku1.lightPush.send(eciesEncoder, asymMsg), - waku1.lightPush.send(symEncoder, symMsg), - waku1.lightPush.send(otherEncoder, otherMsg), - waku1.lightPush.send(TestEncoder, clearMsg) - ]); - - await waitForRemotePeer(waku2, [Protocols.Store]); - - const messages: DecodedMessage[] = []; - log("Retrieve messages from store"); - - for await (const msgPromises of waku2.store.queryGenerator([ - eciesDecoder, - symDecoder, - TestDecoder - ])) { - for (const promise of msgPromises) { - const msg = await promise; - if (msg) { - messages.push(msg); - } - } - } - - // Messages are ordered from oldest to latest within a page (1 page query) - expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); - expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); - expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); - expect(messages?.length).eq(3); - - !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Ordered callback, using start and end time", async function () { - this.timeout(20000); - - const now = new Date(); - - const startTime = new Date(); - // Set start time 15 seconds in the past - startTime.setTime(now.getTime() - 15 * 1000); - - const message1Timestamp = new Date(); - // Set first message was 10 seconds in the past - message1Timestamp.setTime(now.getTime() - 10 * 1000); - - const message2Timestamp = new Date(); - // Set second message 2 seconds in the past - message2Timestamp.setTime(now.getTime() - 2 * 1000); - const messageTimestamps = [message1Timestamp, message2Timestamp]; - - const endTime = new Date(); - // Set end time 1 second in the past - endTime.setTime(now.getTime() - 1000); - - for (let i = 0; i < 2; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic, - timestamp: messageTimestamps[i] - }) - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const firstMessages: IMessage[] = []; - await waku.store.queryWithOrderedCallback( - [TestDecoder], - (msg) => { - if (msg) { - firstMessages.push(msg); - } - }, - { - timeFilter: { startTime, endTime: message1Timestamp } - } - ); - - const bothMessages: IMessage[] = []; - await waku.store.queryWithOrderedCallback( - [TestDecoder], - async (msg) => { - bothMessages.push(msg); - }, - { - timeFilter: { - startTime, - endTime - } - } - ); - - expect(firstMessages?.length).eq(1); - - expect(firstMessages[0].payload![0]!).eq(0); - - expect(bothMessages?.length).eq(2); - }); - - it("Ordered callback, aborts when callback returns true", async function () { - this.timeout(15_000); - - const totalMsgs = 20; - - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: TestContentTopic - }) - ) - ).to.be.true; - await delay(1); // to ensure each timestamp is unique. - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1 - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const desiredMsgs = 14; - const messages: IMessage[] = []; - await waku.store.queryWithOrderedCallback( - [TestDecoder], - async (msg) => { - messages.push(msg); - return messages.length >= desiredMsgs; - }, - { pageSize: 7 } - ); - - expect(messages?.length).eq(desiredMsgs); - }); -}); - -describe("Waku Store, custom pubsub topic", () => { - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - let waku: LightNode; - let nwaku: NimGoNode; - let nwaku2: NimGoNode; - - const customContentTopic = "/test/2/waku-store/utf8"; - - const customTestDecoder = createDecoder( - customContentTopic, - customPubSubTopic - ); - - beforeEach(async function () { - this.timeout(15_000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ - store: true, - topic: [customPubSubTopic, DefaultPubSubTopic], - relay: true - }); - await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]); - }); - - afterEach(async function () { - !!nwaku && - nwaku.stop().catch((e) => console.log("Nwaku failed to stop", e)); - !!nwaku2 && - nwaku2.stop().catch((e) => console.log("Nwaku failed to stop", e)); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Generator, custom pubsub topic", async function () { - this.timeout(15_000); - - const totalMsgs = 20; - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: customContentTopic - }), - customPubSubTopic - ) - ).to.be.true; - } - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - pubSubTopics: [customPubSubTopic] - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: IMessage[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([ - customTestDecoder - ])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - expect(msg.pubSubTopic).to.eq(customPubSubTopic); - } - }); - - promises = promises.concat(_promises); - } - await Promise.all(promises); - - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return msg.payload![0]! === 0; - }); - expect(result).to.not.eq(-1); - }); - - it("Generator, 2 different pubsubtopics", async function () { - this.timeout(10000); - - const totalMsgs = 10; - await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); - await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - pubSubTopics: [customPubSubTopic, DefaultPubSubTopic] - }); - await waku.start(); - - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const customMessages = await processMessages( - waku, - [customTestDecoder], - customPubSubTopic - ); - expect(customMessages?.length).eq(totalMsgs); - const result1 = customMessages?.findIndex((msg) => { - return msg.payload![0]! === 0; - }); - expect(result1).to.not.eq(-1); - - const testMessages = await processMessages( - waku, - [TestDecoder], - DefaultPubSubTopic - ); - expect(testMessages?.length).eq(totalMsgs); - const result2 = testMessages?.findIndex((msg) => { - return msg.payload![0]! === 0; - }); - expect(result2).to.not.eq(-1); - }); - - it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { - this.timeout(10000); - - // Set up and start a new nwaku node with Default PubSubtopic - nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); - await nwaku2.start({ - store: true, - topic: [DefaultPubSubTopic], - relay: true - }); - - const totalMsgs = 10; - await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); - await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic); - - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - pubSubTopics: [customPubSubTopic, DefaultPubSubTopic] - }); - await waku.start(); - - await waku.dial(await nwaku.getMultiaddrWithId()); - await waku.dial(await nwaku2.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - let customMessages: IMessage[] = []; - let testMessages: IMessage[] = []; - - while ( - customMessages.length != totalMsgs || - testMessages.length != totalMsgs - ) { - customMessages = await processMessages( - waku, - [customTestDecoder], - customPubSubTopic - ); - testMessages = await processMessages( - waku, - [TestDecoder], - DefaultPubSubTopic - ); - } - }); - - // will move those 2 reusable functions to store/utils when refactoring store tests but with another PR - async function sendMessages( - instance: NimGoNode, - numMessages: number, - contentTopic: string, - pubSubTopic: string - ): Promise { - for (let i = 0; i < numMessages; i++) { - expect( - await instance.sendMessage( - NimGoNode.toMessageRpcQuery({ - payload: new Uint8Array([i]), - contentTopic: contentTopic - }), - pubSubTopic - ) - ).to.be.true; - } - await delay(1); // to ensure each timestamp is unique. - } - - async function processMessages( - instance: LightNode, - decoders: Array, - expectedTopic: string - ): Promise { - const localMessages: IMessage[] = []; - let localPromises: Promise[] = []; - for await (const msgPromises of instance.store.queryGenerator(decoders)) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - localMessages.push(msg); - expect(msg.pubSubTopic).to.eq(expectedTopic); - } - }); - - localPromises = localPromises.concat(_promises); - } - await Promise.all(localPromises); - return localMessages; - } -}); diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts new file mode 100644 index 0000000000..20b0f20ec9 --- /dev/null +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -0,0 +1,203 @@ +import { createCursor, DecodedMessage, DefaultPubSubTopic } from "@waku/core"; +import type { LightNode } from "@waku/interfaces"; +import { bytesToUtf8 } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + customPubSubTopic, + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, cursor", function () { + this.timeout(15000); + let waku: LightNode; + let waku2: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, [waku, waku2]); + }); + + [ + [2, 4], + [0, 20], + [10, 40], + [19, 20], + [19, 50], + [110, 120] + ].forEach(([cursorIndex, messageCount]) => { + it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () { + await sendMessages( + nwaku, + messageCount, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // create cursor to extract messages after the cursorIndex + const cursor = await createCursor(messages[cursorIndex]); + + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + + expect(messages.length).be.eql(messageCount); + expect(messagesAfterCursor.length).be.eql(messageCount - cursorIndex - 1); + if (cursorIndex == messages.length - 1) { + // in this case the cursor will return nothin because it points at the end of the list + expect(messagesAfterCursor).be.eql([]); + } else { + expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload) + ); + expect( + bytesToUtf8( + messagesAfterCursor[messagesAfterCursor.length - 1].payload + ) + ).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload)); + } + }); + }); + + it("Reusing cursor across nodes", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + waku2 = await startAndConnectLightNode(nwaku); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // create cursor to extract messages after the cursorIndex + const cursor = await createCursor(messages[5]); + + // query node2 with the cursor from node1 + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku2.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + + expect(messages.length).be.eql(totalMsgs); + expect(messagesAfterCursor.length).be.eql(totalMsgs - 6); + expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq( + bytesToUtf8(messages[6].payload) + ); + expect( + bytesToUtf8(messagesAfterCursor[messagesAfterCursor.length - 1].payload) + ).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload)); + }); + + it("Passing cursor with wrong message digest", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + const cursor = await createCursor(messages[5]); + + // setting a wrong digest + cursor.digest = new Uint8Array([]); + + const messagesAfterCursor: DecodedMessage[] = []; + try { + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + // Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117 + expect(messagesAfterCursor.length).to.eql(0); + } catch (error) { + if ( + nwaku.type() === "go-waku" && + typeof error === "string" && + error.includes("History response contains an Error: INVALID_CURSOR") + ) { + return; + } + throw error instanceof Error + ? new Error(`Unexpected error: ${error.message}`) + : error; + } + }); + + it("Passing cursor with wrong pubSubTopic", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + messages[5].pubSubTopic = customPubSubTopic; + const cursor = await createCursor(messages[5]); + + try { + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + page; + } + throw new Error("Cursor with wrong pubsubtopic was accepted"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Cursor pubsub topic (${customPubSubTopic}) does not match decoder pubsub topic (${DefaultPubSubTopic})` + ) + ) { + throw err; + } + } + }); +}); diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts new file mode 100644 index 0000000000..37fa6d8971 --- /dev/null +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -0,0 +1,224 @@ +import { DefaultPubSubTopic } from "@waku/core"; +import { IMessage, type LightNode } from "@waku/interfaces"; +import { expect } from "chai"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + customPubSubTopic, + customTestDecoder, + processQueriedMessages, + startAndConnectLightNode, + TestDecoder +} from "./utils.js"; + +describe("Waku Store, error handling", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + waku = await startAndConnectLightNode(nwaku); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + it("Query Generator, Wrong PubSubTopic", async function () { + try { + for await (const msgPromises of waku.store.queryGenerator([ + customTestDecoder + ])) { + msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query Generator, Multiple PubSubTopics", async function () { + try { + for await (const msgPromises of waku.store.queryGenerator([ + TestDecoder, + customTestDecoder + ])) { + msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query Generator, No Decoder", async function () { + try { + for await (const msgPromises of waku.store.queryGenerator([])) { + msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query Generator, No message returned", async function () { + const messages = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + expect(messages?.length).eq(0); + }); + + it("Query with Ordered Callback, Wrong PubSubTopic", async function () { + try { + await waku.store.queryWithOrderedCallback( + [customTestDecoder], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, Multiple PubSubTopics", async function () { + try { + await waku.store.queryWithOrderedCallback( + [TestDecoder, customTestDecoder], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, No Decoder", async function () { + try { + await waku.store.queryWithOrderedCallback([], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, No message returned", async function () { + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback([TestDecoder], async (msg) => { + messages.push(msg); + }); + expect(messages?.length).eq(0); + }); + + it("Query with Promise Callback, Wrong PubSubTopic", async function () { + try { + await waku.store.queryWithPromiseCallback( + [customTestDecoder], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `PubSub topic ${customPubSubTopic} has not been configured on this instance. Configured topics are: ${DefaultPubSubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, Multiple PubSubTopics", async function () { + try { + await waku.store.queryWithPromiseCallback( + [TestDecoder, customTestDecoder], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, No Decoder", async function () { + try { + await waku.store.queryWithPromiseCallback([], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, No message returned", async function () { + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + } + ); + expect(messages?.length).eq(0); + }); +}); diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts new file mode 100644 index 0000000000..f519535a72 --- /dev/null +++ b/packages/tests/tests/store/index.node.spec.ts @@ -0,0 +1,333 @@ +import { + createDecoder, + DecodedMessage, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { + createDecoder as createEciesDecoder, + createEncoder as createEciesEncoder, + generatePrivateKey, + getPublicKey +} from "@waku/message-encryption/ecies"; +import { + createDecoder as createSymDecoder, + createEncoder as createSymEncoder, + generateSymmetricKey +} from "@waku/message-encryption/symmetric"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import isEqual from "lodash/isEqual"; + +import { + delay, + makeLogFileName, + MessageCollector, + NimGoNode, + tearDownNodes, + TEST_STRING +} from "../../src/index.js"; + +import { + customContentTopic, + log, + messageText, + processQueriedMessages, + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder, + TestEncoder, + totalMsgs +} from "./utils.js"; + +const secondDecoder = createDecoder(customContentTopic, DefaultPubSubTopic); + +describe("Waku Store, general", function () { + this.timeout(15000); + let waku: LightNode; + let waku2: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, [waku, waku2]); + }); + + it("Query generator for multiple messages", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + const messages = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + + expect(messages?.length).eq(totalMsgs); + + // checking that the message with text 0 exists + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Query generator for multiple messages with different message text format", async function () { + for (const testItem of TEST_STRING) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: utf8ToBytes(testItem["value"]), + contentTopic: TestContentTopic + }), + DefaultPubSubTopic + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } + + waku = await startAndConnectLightNode(nwaku); + const messageCollector = new MessageCollector(); + messageCollector.list = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + + // checking that all message sent were retrieved + TEST_STRING.forEach((testItem) => { + expect( + messageCollector.hasMessage(TestContentTopic, testItem["value"]) + ).to.eq(true); + }); + }); + + it("Query generator for multiple messages with multiple decoders", async function () { + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: utf8ToBytes("M1"), + contentTopic: TestContentTopic + }), + DefaultPubSubTopic + ); + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: utf8ToBytes("M2"), + contentTopic: customContentTopic + }), + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + const messageCollector = new MessageCollector(); + messageCollector.list = await processQueriedMessages( + waku, + [TestDecoder, secondDecoder], + DefaultPubSubTopic + ); + expect(messageCollector.hasMessage(TestContentTopic, "M1")).to.eq(true); + expect(messageCollector.hasMessage(customContentTopic, "M2")).to.eq(true); + }); + + it("Query generator for multiple messages with different content topic format", async function () { + for (const testItem of TEST_STRING) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: utf8ToBytes(messageText), + contentTopic: testItem["value"] + }), + DefaultPubSubTopic + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } + + waku = await startAndConnectLightNode(nwaku); + + for (const testItem of TEST_STRING) { + for await (const query of waku.store.queryGenerator([ + createDecoder(testItem["value"]) + ])) { + for await (const msg of query) { + expect(isEqual(msg!.payload, utf8ToBytes(messageText))).to.eq(true); + } + } + } + }); + + it("Callback on promise", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + } + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Callback on promise, aborts when callback returns true", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); + + it("Generator, with asymmetric & symmetric encrypted messages", async function () { + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = + "This message is encrypted for me using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + const clearText = "This is a clear text message for everyone to read"; + const otherText = + "This message is not for and I must not be able to read it"; + + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1) + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2) + }; + const otherMsg = { + payload: utf8ToBytes(otherText), + timestamp: new Date(timestamp.valueOf() + 3) + }; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const eciesEncoder = createEciesEncoder({ + contentTopic: asymTopic, + publicKey + }); + const symEncoder = createSymEncoder({ + contentTopic: symTopic, + symKey + }); + + const otherEncoder = createEciesEncoder({ + contentTopic: TestContentTopic, + publicKey: getPublicKey(generatePrivateKey()) + }); + + const eciesDecoder = createEciesDecoder(asymTopic, privateKey); + const symDecoder = createSymDecoder(symTopic, symKey); + + waku = await startAndConnectLightNode(nwaku); + waku2 = await startAndConnectLightNode(nwaku); + const nimWakuMultiaddr = await nwaku.getMultiaddrWithId(); + + await Promise.all([ + waku.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr) + ]); + + log("Waku nodes connected to nwaku"); + + await waitForRemotePeer(waku, [Protocols.LightPush]); + + log("Sending messages using light push"); + await Promise.all([ + waku.lightPush.send(eciesEncoder, asymMsg), + waku.lightPush.send(symEncoder, symMsg), + waku.lightPush.send(otherEncoder, otherMsg), + waku.lightPush.send(TestEncoder, clearMsg) + ]); + + await waitForRemotePeer(waku2, [Protocols.Store]); + + const messages: DecodedMessage[] = []; + log("Retrieve messages from store"); + + for await (const query of waku2.store.queryGenerator([ + eciesDecoder, + symDecoder, + TestDecoder + ])) { + for await (const msg of query) { + if (msg) { + messages.push(msg as DecodedMessage); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(messages?.length).eq(3); + }); + + it("Ordered callback, aborts when callback returns true", async function () { + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); + + it("Query generator for 2000 messages", async function () { + this.timeout(40000); + await sendMessages(nwaku, 2000, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + const messages = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + + expect(messages?.length).eq(2000); + }); +}); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts new file mode 100644 index 0000000000..f64e80be7e --- /dev/null +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -0,0 +1,143 @@ +import { DefaultPubSubTopic, waitForRemotePeer } from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { createLightNode, Protocols } from "@waku/sdk"; +import { expect } from "chai"; + +import { + makeLogFileName, + NimGoNode, + NOISE_KEY_1, + tearDownNodes +} from "../../src/index.js"; + +import { + customContentTopic, + customPubSubTopic, + customTestDecoder, + processQueriedMessages, + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + store: true, + topic: [customPubSubTopic, DefaultPubSubTopic], + relay: true + }); + await nwaku.ensureSubscriptions([customPubSubTopic, DefaultPubSubTopic]); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Generator, custom pubsub topic", async function () { + await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); + waku = await startAndConnectLightNode(nwaku, [customPubSubTopic]); + const messages = await processQueriedMessages( + waku, + [customTestDecoder], + customPubSubTopic + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + + waku = await startAndConnectLightNode(nwaku, [ + customPubSubTopic, + DefaultPubSubTopic + ]); + + const customMessages = await processQueriedMessages( + waku, + [customTestDecoder], + customPubSubTopic + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default PubSubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + topic: [DefaultPubSubTopic], + relay: true + }); + await nwaku2.ensureSubscriptions([DefaultPubSubTopic]); + + const totalMsgs = 10; + await sendMessages(nwaku, totalMsgs, customContentTopic, customPubSubTopic); + await sendMessages(nwaku2, totalMsgs, TestContentTopic, DefaultPubSubTopic); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopics: [customPubSubTopic, DefaultPubSubTopic] + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customTestDecoder], + customPubSubTopic + ); + testMessages = await processQueriedMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + } + }); +}); diff --git a/packages/tests/tests/store/order.node.spec.ts b/packages/tests/tests/store/order.node.spec.ts new file mode 100644 index 0000000000..98f34b4bcb --- /dev/null +++ b/packages/tests/tests/store/order.node.spec.ts @@ -0,0 +1,129 @@ +import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { expect } from "chai"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + chunkAndReverseArray, + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, order", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + it(`Query Generator - ${pageDirection}`, async function () { + await sendMessages( + nwaku, + totalMsgs, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + for await (const query of waku.store.queryGenerator([TestDecoder], { + pageDirection: pageDirection + })) { + for await (const msg of query) { + if (msg) { + messages.push(msg as DecodedMessage); + } + } + } + + let expectedPayloads = Array.from(Array(totalMsgs).keys()); + if (pageDirection === PageDirection.BACKWARD) { + expectedPayloads = chunkAndReverseArray(expectedPayloads, 10); + } + + expect(messages?.length).eq(totalMsgs); + const payloads = messages.map((msg) => msg.payload[0]!); + expect(payloads).to.deep.eq(expectedPayloads); + }); + }); + + [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + it(`Promise Callback - ${pageDirection}`, async function () { + await sendMessages( + nwaku, + totalMsgs, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + }, + { + pageDirection: pageDirection + } + ); + + let expectedPayloads = Array.from(Array(totalMsgs).keys()); + if (pageDirection === PageDirection.BACKWARD) { + expectedPayloads = chunkAndReverseArray(expectedPayloads, 10); + } + + expect(messages?.length).eq(totalMsgs); + const payloads = messages.map((msg) => msg.payload[0]!); + expect(payloads).to.deep.eq(expectedPayloads); + }); + }); + + [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + it(`Ordered Callback - ${pageDirection}`, async function () { + await sendMessages( + nwaku, + totalMsgs, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: pageDirection + } + ); + + if (pageDirection === PageDirection.BACKWARD) { + messages.reverse(); + } + expect(messages?.length).eq(totalMsgs); + const payloads = messages.map((msg) => msg.payload[0]!); + expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); + }); + }); +}); diff --git a/packages/tests/tests/store/page_size.node.spec.ts b/packages/tests/tests/store/page_size.node.spec.ts new file mode 100644 index 0000000000..4f1b466a10 --- /dev/null +++ b/packages/tests/tests/store/page_size.node.spec.ts @@ -0,0 +1,99 @@ +import { DefaultPubSubTopic } from "@waku/core"; +import type { LightNode } from "@waku/interfaces"; +import { expect } from "chai"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder +} from "./utils.js"; + +describe("Waku Store, page size", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + [ + [0, 110], + [1, 4], + [3, 20], + [10, 10], + [11, 10], + [19, 20], + [110, 120] + ].forEach(([pageSize, messageCount]) => { + it(`Passing page size ${pageSize} when there are ${messageCount} messages`, async function () { + await sendMessages( + nwaku, + messageCount, + TestContentTopic, + DefaultPubSubTopic + ); + + // Determine effectivePageSize for test expectations + let effectivePageSize = pageSize; + if (pageSize === 0) { + if (nwaku.type() == "go-waku") { + effectivePageSize = 100; + } else { + effectivePageSize = 20; + } + } else if (pageSize > 100) { + effectivePageSize = 100; + } + + waku = await startAndConnectLightNode(nwaku); + let messagesRetrieved = 0; + for await (const query of waku.store.queryGenerator([TestDecoder], { + pageSize: pageSize + })) { + // Calculate expected page size + const expectedPageSize = Math.min( + effectivePageSize, + messageCount - messagesRetrieved + ); + expect(query.length).eq(expectedPageSize); + + for await (const msg of query) { + if (msg) { + messagesRetrieved++; + } + } + } + + expect(messagesRetrieved).eq(messageCount); + }); + }); + + // Possible issue here because pageSize differs across implementations + it("Default pageSize", async function () { + await sendMessages(nwaku, 20, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + + let messagesRetrieved = 0; + for await (const query of waku.store.queryGenerator([TestDecoder])) { + expect(query.length).eq(10); + for await (const msg of query) { + if (msg) { + messagesRetrieved++; + } + } + } + expect(messagesRetrieved).eq(20); + }); +}); diff --git a/packages/tests/tests/store/sorting.node.spec.ts b/packages/tests/tests/store/sorting.node.spec.ts new file mode 100644 index 0000000000..b7d232cec6 --- /dev/null +++ b/packages/tests/tests/store/sorting.node.spec.ts @@ -0,0 +1,110 @@ +import { DecodedMessage, DefaultPubSubTopic, PageDirection } from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + sendMessages, + startAndConnectLightNode, + TestContentTopic, + TestDecoder, + totalMsgs +} from "./utils.js"; + +describe("Waku Store, sorting", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + it(`Query Generator sorting by timestamp while page direction is ${pageDirection}`, async function () { + await sendMessages( + nwaku, + totalMsgs, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + for await (const query of waku.store.queryGenerator([TestDecoder], { + pageDirection: PageDirection.FORWARD + })) { + const page: IMessage[] = []; + for await (const msg of query) { + if (msg) { + page.push(msg as DecodedMessage); + } + } + // Extract timestamps + const timestamps = page.map( + (msg) => msg.timestamp as unknown as bigint + ); + // Check if timestamps are sorted + for (let i = 1; i < timestamps.length; i++) { + if (timestamps[i] < timestamps[i - 1]) { + throw new Error( + `Messages are not sorted by timestamp. Found out of order at index ${i}` + ); + } + } + } + }); + }); + + [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + it(`Ordered Callback sorting by timestamp while page direction is ${pageDirection}`, async function () { + await sendMessages( + nwaku, + totalMsgs, + TestContentTopic, + DefaultPubSubTopic + ); + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: pageDirection + } + ); + // Extract timestamps + const timestamps = messages.map( + (msg) => msg.timestamp as unknown as bigint + ); + // Check if timestamps are sorted + for (let i = 1; i < timestamps.length; i++) { + if ( + pageDirection === PageDirection.FORWARD && + timestamps[i] < timestamps[i - 1] + ) { + throw new Error( + `Messages are not sorted by timestamp in FORWARD direction. Found out of order at index ${i}` + ); + } else if ( + pageDirection === PageDirection.BACKWARD && + timestamps[i] > timestamps[i - 1] + ) { + throw new Error( + `Messages are not sorted by timestamp in BACKWARD direction. Found out of order at index ${i}` + ); + } + } + }); + }); +}); diff --git a/packages/tests/tests/store/time_filter.node.spec.ts b/packages/tests/tests/store/time_filter.node.spec.ts new file mode 100644 index 0000000000..f53ff23b8f --- /dev/null +++ b/packages/tests/tests/store/time_filter.node.spec.ts @@ -0,0 +1,119 @@ +import type { IMessage, LightNode } from "@waku/interfaces"; +import { expect } from "chai"; + +import { makeLogFileName, NimGoNode, tearDownNodes } from "../../src/index.js"; + +import { + adjustDate, + startAndConnectLightNode, + TestContentTopic, + TestDecoder +} from "./utils.js"; + +describe("Waku Store, time filter", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + await nwaku.ensureSubscriptions(); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes(nwaku, waku); + }); + + [ + [-19000, -10, 10], + [-19000, 1, 4], + [-19000, -2, -1], + // [-19000, 0, 1000], // skipped for now because it fails on gowaku which returns messages > startTime + [-19000, -1000, 0], + [19000, -10, 10], // message in the future + [-19000, 10, -10] // startTime is newer than endTime + ].forEach(([msgTime, startTime, endTime]) => { + it(`msgTime: ${msgTime} ms from now, startTime: ${ + msgTime + startTime + }, endTime: ${msgTime + endTime}`, async function () { + const msgTimestamp = adjustDate(new Date(), msgTime); + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([0]), + contentTopic: TestContentTopic, + timestamp: msgTimestamp + }) + ) + ).to.be.true; + + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + (msg) => { + if (msg) { + messages.push(msg); + } + }, + { + timeFilter: { + startTime: adjustDate(msgTimestamp, startTime), + endTime: adjustDate(msgTimestamp, endTime) + } + } + ); + + // in this context 0 is the messageTimestamp + if ( + (startTime > 0 && endTime > 0) || + (startTime < 0 && endTime < 0) || + startTime > endTime + ) { + expect(messages.length).eq(0); + } else { + expect(messages.length).eq(1); + expect(messages[0].payload![0]!).eq(0); + } + }); + }); + + [-20000, 40000].forEach((msgTime) => { + it(`Timestamp too far from node time: ${msgTime} ms from now`, async function () { + const msgTimestamp = adjustDate(new Date(), msgTime); + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([0]), + contentTopic: TestContentTopic, + timestamp: msgTimestamp + }) + ) + ).to.be.true; + + waku = await startAndConnectLightNode(nwaku); + + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + (msg) => { + if (msg) { + messages.push(msg); + } + }, + { + timeFilter: { + startTime: adjustDate(msgTimestamp, -1000), + endTime: adjustDate(msgTimestamp, 1000) + } + } + ); + + expect(messages.length).eq(0); + }); + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts new file mode 100644 index 0000000000..d4ccac15ce --- /dev/null +++ b/packages/tests/tests/store/utils.ts @@ -0,0 +1,97 @@ +import { + createDecoder, + createEncoder, + DecodedMessage, + Decoder, + DefaultPubSubTopic, + waitForRemotePeer +} from "@waku/core"; +import { LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; +import debug from "debug"; + +import { delay, NimGoNode, NOISE_KEY_1 } from "../../src"; + +export const log = debug("waku:test:store"); + +export const TestContentTopic = "/test/1/waku-store/utf8"; +export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +export const TestDecoder = createDecoder(TestContentTopic); +export const customContentTopic = "/test/2/waku-store/utf8"; +export const customPubSubTopic = "/waku/2/custom-dapp/proto"; +export const customTestDecoder = createDecoder( + customContentTopic, + customPubSubTopic +); +export const totalMsgs = 20; +export const messageText = "Store Push works!"; + +export async function sendMessages( + instance: NimGoNode, + numMessages: number, + contentTopic: string, + pubSubTopic: string +): Promise { + for (let i = 0; i < numMessages; i++) { + expect( + await instance.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic + }), + pubSubTopic + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } +} + +export async function processQueriedMessages( + instance: LightNode, + decoders: Array, + expectedTopic?: string +): Promise { + const localMessages: DecodedMessage[] = []; + for await (const query of instance.store.queryGenerator(decoders)) { + for await (const msg of query) { + if (msg) { + expect(msg.pubSubTopic).to.eq(expectedTopic); + localMessages.push(msg as DecodedMessage); + } + } + } + return localMessages; +} + +export async function startAndConnectLightNode( + instance: NimGoNode, + pubSubTopics: string[] = [DefaultPubSubTopic] +): Promise { + const waku = await createLightNode({ + pubSubTopics: pubSubTopics, + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await instance.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + log("Waku node created"); + return waku; +} + +export function chunkAndReverseArray( + arr: number[], + chunkSize: number +): number[] { + const result: number[] = []; + for (let i = 0; i < arr.length; i += chunkSize) { + result.push(...arr.slice(i, i + chunkSize).reverse()); + } + return result.reverse(); +} + +export const adjustDate = (baseDate: Date, adjustMs: number): Date => { + const adjusted = new Date(baseDate); + adjusted.setTime(adjusted.getTime() + adjustMs); + return adjusted; +};