From f307e9b6c6ebac3bae35da1b1acb44cb9b6a0a4f Mon Sep 17 00:00:00 2001 From: fbarbu15 Date: Fri, 29 Sep 2023 14:03:43 +0300 Subject: [PATCH] intermediat commit --- packages/tests/src/node/node.ts | 4 +- packages/tests/src/teardown.ts | 35 +- packages/tests/tests/filter/ping.node.spec.ts | 3 +- packages/tests/tests/filter/push.node.spec.ts | 3 +- .../tests/tests/filter/subscribe.node.spec.ts | 3 +- .../tests/filter/unsubscribe.node.spec.ts | 3 +- packages/tests/tests/filter/utils.ts | 9 +- .../light-push/custom_pubsub.node.spec.ts | 3 +- packages/tests/tests/light-push/index.spec.ts | 3 +- packages/tests/tests/light-push/utils.ts | 13 +- .../tests/tests/store/multiple_pubsub.spec.ts | 87 +++ packages/tests/tests/store/store.node.spec.ts | 529 ++++++++++++++++++ packages/tests/tests/store/utils.ts | 60 ++ 13 files changed, 725 insertions(+), 30 deletions(-) create mode 100644 packages/tests/tests/store/multiple_pubsub.spec.ts create mode 100644 packages/tests/tests/store/store.node.spec.ts create mode 100644 packages/tests/tests/store/utils.ts diff --git a/packages/tests/src/node/node.ts b/packages/tests/src/node/node.ts index 5cd8c85d14..47345db09a 100644 --- a/packages/tests/src/node/node.ts +++ b/packages/tests/src/node/node.ts @@ -168,8 +168,8 @@ export class NimGoNode { async startWithRetries( args: Args, options: { - retries: number; - } + retries?: number; + } = { retries: 3 } ): Promise { await pRetry( async () => { diff --git a/packages/tests/src/teardown.ts b/packages/tests/src/teardown.ts index caecec6e44..c4605d9c43 100644 --- a/packages/tests/src/teardown.ts +++ b/packages/tests/src/teardown.ts @@ -1,23 +1,46 @@ import { LightNode } from "@waku/interfaces"; import debug from "debug"; +import pRetry from "p-retry"; import { NimGoNode } from "./index.js"; const log = debug("waku:test"); -export function tearDownNodes( +export async function tearDownNodes( nwakuNodes: NimGoNode[], wakuNodes: LightNode[] -): void { - nwakuNodes.forEach((nwaku) => { +): Promise { + const stopNwakuNodes = nwakuNodes.map(async (nwaku) => { if (nwaku) { - nwaku.stop().catch((e) => log("Nwaku failed to stop", e)); + await pRetry( + async () => { + try { + await nwaku.stop(); + } catch (error) { + log("Nwaku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); } }); - wakuNodes.forEach((waku) => { + const stopWakuNodes = wakuNodes.map(async (waku) => { if (waku) { - waku.stop().catch((e) => log("Waku failed to stop", e)); + await pRetry( + async () => { + try { + await waku.stop(); + } catch (error) { + log("Waku failed to stop:", error); + throw error; + } + }, + { retries: 3 } + ); } }); + + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); } diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index b35da7281b..c911e819d1 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -28,7 +28,8 @@ describe("Waku Filter V2: Ping", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); }); it("Ping on subscribed peer", async function () { diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index b695193d34..3a37fdac2c 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -37,7 +37,8 @@ describe("Waku Filter V2: FilterPush", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); }); TEST_STRING.forEach((testItem) => { diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index 0ded20d724..93bf672cab 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -48,7 +48,8 @@ describe("Waku Filter V2: Subscribe", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku, nwaku2], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], [waku]); }); it("Subscribe and receive messages via lightPush", async function () { diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index b1591c2911..bf9a99b1a1 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -34,7 +34,8 @@ describe("Waku Filter V2: Unsubscribe", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); }); it("Unsubscribe 1 topic - node subscribed to 1 topic", async function () { diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 988bf49dbb..1842af001b 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -67,14 +67,7 @@ export async function runNodes( currentTest: Context ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(currentTest)); - await nwaku.startWithRetries( - { - filter: true, - lightpush: true, - relay: true - }, - { retries: 3 } - ); + await nwaku.startWithRetries({ filter: true, lightpush: true, relay: true }); let waku: LightNode | undefined; try { diff --git a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts index fa8247a786..d1aa4c6921 100644 --- a/packages/tests/tests/light-push/custom_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/custom_pubsub.node.spec.ts @@ -26,7 +26,8 @@ describe("Waku Light Push [node only] - custom pubsub topic", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); }); it("Push message", async function () { diff --git a/packages/tests/tests/light-push/index.spec.ts b/packages/tests/tests/light-push/index.spec.ts index db2e29680b..c62797c93e 100644 --- a/packages/tests/tests/light-push/index.spec.ts +++ b/packages/tests/tests/light-push/index.spec.ts @@ -39,7 +39,8 @@ describe("Waku Light Push [node only]", function () { }); this.afterEach(async function () { - tearDownNodes([nwaku], [waku]); + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); }); TEST_STRING.forEach((testItem) => { diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 9e2d4b03b7..e669d10938 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -18,14 +18,11 @@ export async function runNodes( ): Promise<[NimGoNode, LightNode]> { const nwakuOptional = pubSubTopic ? { topic: pubSubTopic } : {}; const nwaku = new NimGoNode(makeLogFileName(context)); - await nwaku.startWithRetries( - { - lightpush: true, - relay: true, - ...nwakuOptional - }, - { retries: 3 } - ); + await nwaku.startWithRetries({ + lightpush: true, + relay: true, + ...nwakuOptional + }); let waku: LightNode | undefined; try { diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts new file mode 100644 index 0000000000..05367aa46a --- /dev/null +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -0,0 +1,87 @@ +import { createDecoder, waitForRemotePeer } from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { + makeLogFileName, + NimGoNode, + NOISE_KEY_1, + tearDownNodes +} from "../../src/index.js"; + +const customPubSubTopic = "/waku/2/custom-dapp/proto"; +const TestContentTopic = "/test/1/waku-store/utf8"; +const CustomPubSubTestDecoder = createDecoder( + TestContentTopic, + customPubSubTopic +); + +describe("Waku Store, custom pubsub topic", () => { + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ + store: true, + topic: customPubSubTopic, + relay: true + }); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); + }); + + it("Generator, custom pubsub topic", async function () { + this.timeout(15000); + + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopics: [customPubSubTopic] + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: IMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([ + CustomPubSubTestDecoder + ])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + expect(msg.pubSubTopic).to.eq(customPubSubTopic); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); +}); diff --git a/packages/tests/tests/store/store.node.spec.ts b/packages/tests/tests/store/store.node.spec.ts new file mode 100644 index 0000000000..4efd1389b9 --- /dev/null +++ b/packages/tests/tests/store/store.node.spec.ts @@ -0,0 +1,529 @@ +import { + createCursor, + createDecoder, + createEncoder, + DecodedMessage, + DefaultPubSubTopic, + PageDirection, + waitForRemotePeer +} from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { + createDecoder as createEciesDecoder, + createEncoder as createEciesEncoder, + generatePrivateKey, + getPublicKey +} from "@waku/message-encryption/ecies"; +import { + createDecoder as createSymDecoder, + createEncoder as createSymEncoder, + generateSymmetricKey +} from "@waku/message-encryption/symmetric"; +import { createLightNode } from "@waku/sdk"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import debug from "debug"; + +import { + delay, + makeLogFileName, + NimGoNode, + NOISE_KEY_1, + NOISE_KEY_2, + tearDownNodes +} from "../../src/index.js"; + +import { + processMessages, + sendMessages, + startAndConnectLightNode +} from "./utils.js"; + +const log = debug("waku:test:store"); + +const TestContentTopic = "/test/1/waku-store/utf8"; +const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); +const TestDecoder = createDecoder(TestContentTopic); + +describe.only("Waku Store", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.startWithRetries({ store: true, lightpush: true, relay: true }); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku], [waku]); + }); + + it("Generator", async function () { + const totalMsgs = 20; + + await sendMessages(nwaku, totalMsgs, TestContentTopic, DefaultPubSubTopic); + waku = await startAndConnectLightNode(nwaku); + const messages = await processMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it.only("Generator, no message returned", async function () { + waku = await startAndConnectLightNode(nwaku); + const messages = await processMessages( + waku, + [TestDecoder], + DefaultPubSubTopic + ); + + expect(messages?.length).eq(0); + }); + + it("Passing a cursor", async function () { + this.timeout(4_000); + const totalMsgs = 20; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const query = waku.store.queryGenerator([TestDecoder]); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of query) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // index 2 would mean the third last message sent + const cursorIndex = 2; + + // create cursor to extract messages after the 3rd index + const cursor = await createCursor(messages[cursorIndex]); + + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + + const testMessage = messagesAfterCursor[0]; + + expect(messages.length).be.eq(totalMsgs); + + expect(bytesToUtf8(testMessage.payload)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload) + ); + }); + + it("Callback on promise", async function () { + this.timeout(15_000); + + const totalMsgs = 15; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + } + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Callback on promise, aborts when callback returns true", async function () { + this.timeout(15_000); + + const totalMsgs = 20; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); + + it("Ordered Callback - Forward", async function () { + this.timeout(15_000); + + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: PageDirection.FORWARD + } + ); + + expect(messages?.length).eq(totalMsgs); + const payloads = messages.map((msg) => msg.payload[0]!); + expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); + }); + + it("Ordered Callback - Backward", async function () { + this.timeout(15_000); + + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: PageDirection.BACKWARD + } + ); + + messages = messages.reverse(); + + expect(messages?.length).eq(totalMsgs); + const payloads = messages.map((msg) => msg.payload![0]!); + expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); + }); + + it("Generator, with asymmetric & symmetric encrypted messages", async function () { + this.timeout(15_000); + + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = + "This message is encrypted for me using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + const clearText = "This is a clear text message for everyone to read"; + const otherText = + "This message is not for and I must not be able to read it"; + + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1) + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2) + }; + const otherMsg = { + payload: utf8ToBytes(otherText), + timestamp: new Date(timestamp.valueOf() + 3) + }; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const eciesEncoder = createEciesEncoder({ + contentTopic: asymTopic, + publicKey + }); + const symEncoder = createSymEncoder({ + contentTopic: symTopic, + symKey + }); + + const otherEncoder = createEciesEncoder({ + contentTopic: TestContentTopic, + publicKey: getPublicKey(generatePrivateKey()) + }); + + const eciesDecoder = createEciesDecoder(asymTopic, privateKey); + const symDecoder = createSymDecoder(symTopic, symKey); + + const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ + createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createLightNode({ + staticNoiseKey: NOISE_KEY_2 + }).then((waku) => waku.start().then(() => waku)), + nwaku.getMultiaddrWithId() + ]); + + log("Waku nodes created"); + + await Promise.all([ + waku1.dial(nimWakuMultiaddr), + waku2.dial(nimWakuMultiaddr) + ]); + + log("Waku nodes connected to nwaku"); + + await waitForRemotePeer(waku1, [Protocols.LightPush]); + + log("Sending messages using light push"); + await Promise.all([ + waku1.lightPush.send(eciesEncoder, asymMsg), + waku1.lightPush.send(symEncoder, symMsg), + waku1.lightPush.send(otherEncoder, otherMsg), + waku1.lightPush.send(TestEncoder, clearMsg) + ]); + + await waitForRemotePeer(waku2, [Protocols.Store]); + + const messages: DecodedMessage[] = []; + log("Retrieve messages from store"); + + for await (const msgPromises of waku2.store.queryGenerator([ + eciesDecoder, + symDecoder, + TestDecoder + ])) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(messages?.length).eq(3); + + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); + !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("Ordered callback, using start and end time", async function () { + this.timeout(20000); + + const now = new Date(); + + const startTime = new Date(); + // Set start time 15 seconds in the past + startTime.setTime(now.getTime() - 15 * 1000); + + const message1Timestamp = new Date(); + // Set first message was 10 seconds in the past + message1Timestamp.setTime(now.getTime() - 10 * 1000); + + const message2Timestamp = new Date(); + // Set second message 2 seconds in the past + message2Timestamp.setTime(now.getTime() - 2 * 1000); + const messageTimestamps = [message1Timestamp, message2Timestamp]; + + const endTime = new Date(); + // Set end time 1 second in the past + endTime.setTime(now.getTime() - 1000); + + for (let i = 0; i < 2; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic, + timestamp: messageTimestamps[i] + }) + ) + ).to.be.true; + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const firstMessages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + (msg) => { + if (msg) { + firstMessages.push(msg); + } + }, + { + timeFilter: { startTime, endTime: message1Timestamp } + } + ); + + const bothMessages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + bothMessages.push(msg); + }, + { + timeFilter: { + startTime, + endTime + } + } + ); + + expect(firstMessages?.length).eq(1); + + expect(firstMessages[0].payload![0]!).eq(0); + + expect(bothMessages?.length).eq(2); + }); + + it("Ordered callback, aborts when callback returns true", async function () { + this.timeout(15_000); + + const totalMsgs = 20; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: TestContentTopic + }) + ) + ).to.be.true; + await delay(1); // to ensure each timestamp is unique. + } + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts new file mode 100644 index 0000000000..455d5a6821 --- /dev/null +++ b/packages/tests/tests/store/utils.ts @@ -0,0 +1,60 @@ +import { Decoder, waitForRemotePeer } from "@waku/core"; +import { IMessage, LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { delay, NimGoNode, NOISE_KEY_1 } from "../../src"; + +export async function sendMessages( + instance: NimGoNode, + numMessages: number, + contentTopic: string, + pubSubTopic: string +): Promise { + for (let i = 0; i < numMessages; i++) { + expect( + await instance.sendMessage( + NimGoNode.toMessageRpcQuery({ + payload: new Uint8Array([i]), + contentTopic: contentTopic + }), + pubSubTopic + ) + ).to.be.true; + } + await delay(1); // to ensure each timestamp is unique. +} + +export async function processMessages( + instance: LightNode, + decoders: Array, + expectedTopic: string +): Promise { + const localMessages: IMessage[] = []; + let localPromises: Promise[] = []; + for await (const msgPromises of instance.store.queryGenerator(decoders)) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + localMessages.push(msg); + expect(msg.pubSubTopic).to.eq(expectedTopic); + } + }); + + localPromises = localPromises.concat(_promises); + } + await Promise.all(localPromises); + return localMessages; +} + +export async function startAndConnectLightNode( + instance: NimGoNode +): Promise { + const waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1 + }); + await waku.start(); + await waku.dial(await instance.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + return waku; +}