From a8143897736160dabca756543f669060ae83162d Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 29 May 2024 18:12:07 +0530 Subject: [PATCH] add tests --- packages/tests/src/lib/index.ts | 7 +- .../tests/tests/store/cursor.node.spec.ts | 223 +++++++++++ .../tests/store/error_handling.node.spec.ts | 225 +++++++++++ packages/tests/tests/store/index.node.spec.ts | 371 ++++++++++++++++++ 4 files changed, 825 insertions(+), 1 deletion(-) create mode 100644 packages/tests/tests/store/cursor.node.spec.ts create mode 100644 packages/tests/tests/store/error_handling.node.spec.ts create mode 100644 packages/tests/tests/store/index.node.spec.ts diff --git a/packages/tests/src/lib/index.ts b/packages/tests/src/lib/index.ts index 924febf82a..44736f84cb 100644 --- a/packages/tests/src/lib/index.ts +++ b/packages/tests/src/lib/index.ts @@ -107,6 +107,11 @@ export class ServiceNodesFleet { message: MessageRpcQuery, pubsubTopic: string = DefaultPubsubTopic ): Promise { + message = { + ...message, + timestamp: + message.timestamp || BigInt(new Date().valueOf()) * BigInt(1_000_000) + }; const relayMessagePromises: Promise[] = this.nodes.map((node) => node.sendMessage(message, pubsubTopic) ); @@ -141,7 +146,7 @@ class MultipleNodesMessageCollector { callback: (msg: DecodedMessage) => void = () => {}; messageList: Array = []; constructor( - private messageCollectors: MessageCollector[], + public messageCollectors: MessageCollector[], private relayNodes?: ServiceNode[], private strictChecking: boolean = false ) { diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts new file mode 100644 index 0000000000..383736cb6b --- /dev/null +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -0,0 +1,223 @@ +import { DecodedMessage } from "@waku/core"; +import type { LightNode } from "@waku/interfaces"; +import { bytesToUtf8 } from "@waku/utils/bytes"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNodesFleet, + tearDownNodes +} from "../../src"; + +import { + TestDecoder, + TestDecoder2, + TestShardInfo, + totalMsgs +} from "./single_node/utils"; +import { + runMultipleNodes, + sendMessagesToMultipleNodes, + startAndConnectLightNodeWithMultipleServiceNodes +} from "./utils"; + +describe("Waku Store: Multiple Nodes: cursor", function () { + this.timeout(15000); + let waku: LightNode; + let waku2: LightNode; + let serviceNodesFleet: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(serviceNodesFleet.nodes, [waku, waku2]); + }); + + [ + [2, 4], + [0, 20], + [10, 40], + [19, 20], + [19, 50], + [110, 120] + ].forEach(([cursorIndex, messageCount]) => { + it(`Passing a valid cursor at ${cursorIndex} index when there are ${messageCount} messages`, async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + messageCount, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // create cursor to extract messages after the cursorIndex + const cursor = waku.store.createCursor(messages[cursorIndex]); + + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + + expect(messages.length).be.eql(messageCount); + expect(messagesAfterCursor.length).be.eql(messageCount - cursorIndex - 1); + if (cursorIndex == messages.length - 1) { + // in this case the cursor will return nothin because it points at the end of the list + expect(messagesAfterCursor).be.eql([]); + } else { + expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload) + ); + expect( + bytesToUtf8( + messagesAfterCursor[messagesAfterCursor.length - 1].payload + ) + ).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload)); + } + }); + }); + + it("Reusing cursor across nodes", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + waku2 = await startAndConnectLightNodeWithMultipleServiceNodes( + serviceNodesFleet.nodes, + TestShardInfo + ); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // create cursor to extract messages after the cursorIndex + const cursor = waku.store.createCursor(messages[5]); + + // query node2 with the cursor from node1 + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku2.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + + expect(messages.length).be.eql(totalMsgs); + expect(messagesAfterCursor.length).be.eql(totalMsgs - 6); + expect(bytesToUtf8(messagesAfterCursor[0].payload)).to.be.eq( + bytesToUtf8(messages[6].payload) + ); + expect( + bytesToUtf8(messagesAfterCursor[messagesAfterCursor.length - 1].payload) + ).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload)); + }); + + it("Passing cursor with wrong message digest", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + const cursor = waku.store.createCursor(messages[5]); + + // setting a wrong digest + cursor.digest = new Uint8Array([]); + + const messagesAfterCursor: DecodedMessage[] = []; + try { + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + for await (const msg of page.reverse()) { + if (msg) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + } + // Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117 + expect(messagesAfterCursor.length).to.eql(0); + } catch (error) { + for (const node of serviceNodesFleet.nodes) { + if ( + node.type === "go-waku" && + typeof error === "string" && + error.includes("History response contains an Error: INVALID_CURSOR") + ) { + return; + } + } + throw error instanceof Error + ? new Error(`Unexpected error: ${error.message}`) + : error; + } + }); + + it("Passing cursor with wrong pubsubTopic", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const messages: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder])) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + messages[5].pubsubTopic = TestDecoder2.pubsubTopic; + const cursor = waku.store.createCursor(messages[5]); + + try { + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor + })) { + void page; + } + throw new Error("Cursor with wrong pubsubtopic was accepted"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Cursor pubsub topic (${TestDecoder2.pubsubTopic}) does not match decoder pubsub topic (${TestDecoder.pubsubTopic})` + ) + ) { + throw err; + } + } + }); +}); diff --git a/packages/tests/tests/store/error_handling.node.spec.ts b/packages/tests/tests/store/error_handling.node.spec.ts new file mode 100644 index 0000000000..c32cb1bb16 --- /dev/null +++ b/packages/tests/tests/store/error_handling.node.spec.ts @@ -0,0 +1,225 @@ +import { createDecoder } from "@waku/core"; +import { IMessage, type LightNode } from "@waku/interfaces"; +import { determinePubsubTopic } from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + ServiceNodesFleet, + tearDownNodes +} from "../../src"; + +import { + processQueriedMessages, + TestContentTopic1, + TestDecoder, + TestDecoder2, + TestShardInfo +} from "./single_node/utils"; +import { runMultipleNodes } from "./utils"; + +describe("Waku Store: Multiple Peers: error handling", function () { + this.timeout(15000); + let waku: LightNode; + let ServiceNodesFleet: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [ServiceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(ServiceNodesFleet.nodes, waku); + }); + + it("Query Generator, Wrong PubsubTopic", async function () { + const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); + + try { + for await (const msgPromises of waku.store.queryGenerator([ + wrongDecoder + ])) { + void msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query Generator, Multiple PubsubTopics", async function () { + try { + for await (const msgPromises of waku.store.queryGenerator([ + TestDecoder, + TestDecoder2 + ])) { + void msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query Generator, No Decoder", async function () { + try { + for await (const msgPromises of waku.store.queryGenerator([])) { + void msgPromises; + } + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query Generator, No message returned", async function () { + const WrongTestPubsubTopic = determinePubsubTopic("/test/1/wrong/utf8"); + const messages = await processQueriedMessages( + waku, + [TestDecoder], + WrongTestPubsubTopic + ); + expect(messages?.length).eq(0); + }); + + it("Query with Ordered Callback, Wrong PubsubTopic", async function () { + const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); + try { + await waku.store.queryWithOrderedCallback([wrongDecoder], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, Multiple PubsubTopics", async function () { + try { + await waku.store.queryWithOrderedCallback( + [TestDecoder, TestDecoder2], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, No Decoder", async function () { + try { + await waku.store.queryWithOrderedCallback([], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query with Ordered Callback, No message returned", async function () { + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback([TestDecoder], async (msg) => { + messages.push(msg); + }); + expect(messages?.length).eq(0); + }); + + it("Query with Promise Callback, Wrong PubsubTopic", async function () { + const wrongDecoder = createDecoder(TestContentTopic1, "WrongPubsubTopic"); + try { + await waku.store.queryWithPromiseCallback([wrongDecoder], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + `Pubsub topic ${wrongDecoder.pubsubTopic} has not been configured on this instance. Configured topics are: ${TestDecoder.pubsubTopic}` + ) + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, Multiple PubsubTopics", async function () { + try { + await waku.store.queryWithPromiseCallback( + [TestDecoder, TestDecoder2], + async () => {} + ); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes( + "API does not support querying multiple pubsub topics at once" + ) + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, No Decoder", async function () { + try { + await waku.store.queryWithPromiseCallback([], async () => {}); + throw new Error("QueryGenerator was successful but was expected to fail"); + } catch (err) { + if ( + !(err instanceof Error) || + !err.message.includes("No decoders provided") + ) { + throw err; + } + } + }); + + it("Query with Promise Callback, No message returned", async function () { + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + } + ); + expect(messages?.length).eq(0); + }); +}); diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts new file mode 100644 index 0000000000..c812e685ff --- /dev/null +++ b/packages/tests/tests/store/index.node.spec.ts @@ -0,0 +1,371 @@ +import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; +import type { IMessage, LightNode } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { + generatePrivateKey, + generateSymmetricKey, + getPublicKey +} from "@waku/message-encryption"; +import { + createDecoder as createEciesDecoder, + createEncoder as createEciesEncoder +} from "@waku/message-encryption/ecies"; +import { + createDecoder as createSymDecoder, + createEncoder as createSymEncoder +} from "@waku/message-encryption/symmetric"; +import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; +import { expect } from "chai"; +import { equals } from "uint8arrays/equals"; + +import { + afterEachCustom, + beforeEachCustom, + delay, + ServiceNode, + ServiceNodesFleet, + tearDownNodes, + TEST_STRING +} from "../../src"; + +import { + messageText, + processQueriedMessages, + TestContentTopic1, + TestDecoder, + TestDecoder2, + TestEncoder, + TestPubsubTopic1, + TestShardInfo, + totalMsgs +} from "./single_node/utils"; +import { + runMultipleNodes, + sendMessagesToMultipleNodes, + startAndConnectLightNodeWithMultipleServiceNodes +} from "./utils"; + +describe.only("Waku Store, general", function () { + this.timeout(15000); + let waku: LightNode; + let waku2: LightNode; + let serviceNodesFleet: ServiceNodesFleet; + + beforeEachCustom(this, async () => { + [serviceNodesFleet, waku] = await runMultipleNodes(this.ctx, TestShardInfo); + }); + + afterEachCustom(this, async () => { + await tearDownNodes(serviceNodesFleet.nodes, [waku, waku2]); + }); + + it("Query generator for multiple messages", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const messages = await processQueriedMessages( + waku, + [TestDecoder], + TestDecoder.pubsubTopic + ); + + expect(messages?.length).eq(totalMsgs); + + // checking that the message with text 0 exists + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it.only("Query generator for multiple messages with different message text format", async function () { + for (const testItem of TEST_STRING) { + expect( + await serviceNodesFleet.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: utf8ToBytes(testItem["value"]), + contentTopic: TestDecoder.contentTopic + }), + TestDecoder.pubsubTopic + ) + ).to.eq(true); + await delay(1); // to ensure each timestamp is unique. + } + + serviceNodesFleet.messageCollector.messageList = + await processQueriedMessages( + waku, + [TestDecoder], + TestDecoder.pubsubTopic + ); + + serviceNodesFleet.messageCollector.messageCollectors.forEach( + (messageCollector) => + (messageCollector.list = serviceNodesFleet.messageCollector.messageList) + ); + + // checking that all message sent were retrieved + TEST_STRING.forEach((testItem) => { + expect( + serviceNodesFleet.messageCollector.hasMessage( + TestDecoder.contentTopic, + testItem["value"] + ) + ).to.eq(true); + }); + }); + + it("Query generator for multiple messages with multiple decoders", async function () { + const SecondDecoder = createDecoder( + TestDecoder2.contentTopic, + TestDecoder.pubsubTopic + ); + + await serviceNodesFleet.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: utf8ToBytes("M1"), + contentTopic: TestDecoder.contentTopic + }), + TestDecoder.pubsubTopic + ); + await serviceNodesFleet.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: utf8ToBytes("M2"), + contentTopic: SecondDecoder.contentTopic + }), + SecondDecoder.pubsubTopic + ); + + serviceNodesFleet.messageCollector.messageList = + await processQueriedMessages( + waku, + [TestDecoder, SecondDecoder], + TestDecoder.pubsubTopic + ); + expect( + serviceNodesFleet.messageCollector.hasMessage( + TestDecoder.contentTopic, + "M1" + ) + ).to.eq(true); + expect( + serviceNodesFleet.messageCollector.hasMessage( + SecondDecoder.contentTopic, + "M2" + ) + ).to.eq(true); + }); + + it("Query generator for multiple messages with different content topic format", async function () { + for (const testItem of TEST_STRING) { + expect( + await serviceNodesFleet.sendRelayMessage( + ServiceNode.toMessageRpcQuery({ + payload: utf8ToBytes(messageText), + contentTopic: testItem["value"] + }), + TestDecoder.pubsubTopic + ) + ).to.eq(true); + await delay(1); // to ensure each timestamp is unique. + } + + for (const testItem of TEST_STRING) { + for await (const query of waku.store.queryGenerator([ + createDecoder(testItem["value"], TestDecoder.pubsubTopic) + ])) { + for await (const msg of query) { + expect(equals(msg!.payload, utf8ToBytes(messageText))).to.eq(true); + } + } + } + }); + + it("Callback on promise", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + } + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload[0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Callback on promise, aborts when callback returns true", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithPromiseCallback( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); + + it("Generator, with asymmetric & symmetric encrypted messages", async function () { + const asymText = "This message is encrypted for me using asymmetric"; + const asymTopic = "/test/1/asymmetric/proto"; + const symText = + "This message is encrypted for me using symmetric encryption"; + const symTopic = "/test/1/symmetric/proto"; + const clearText = "This is a clear text message for everyone to read"; + const otherText = + "This message is not for and I must not be able to read it"; + + const timestamp = new Date(); + + const asymMsg = { payload: utf8ToBytes(asymText), timestamp }; + const symMsg = { + payload: utf8ToBytes(symText), + timestamp: new Date(timestamp.valueOf() + 1) + }; + const clearMsg = { + payload: utf8ToBytes(clearText), + timestamp: new Date(timestamp.valueOf() + 2) + }; + const otherMsg = { + payload: utf8ToBytes(otherText), + timestamp: new Date(timestamp.valueOf() + 3) + }; + + const privateKey = generatePrivateKey(); + const symKey = generateSymmetricKey(); + const publicKey = getPublicKey(privateKey); + + const eciesEncoder = createEciesEncoder({ + contentTopic: asymTopic, + publicKey, + pubsubTopic: TestPubsubTopic1 + }); + const symEncoder = createSymEncoder({ + contentTopic: symTopic, + symKey, + pubsubTopic: TestPubsubTopic1 + }); + + const otherEncoder = createEciesEncoder({ + contentTopic: TestContentTopic1, + pubsubTopic: TestPubsubTopic1, + publicKey: getPublicKey(generatePrivateKey()) + }); + + const eciesDecoder = createEciesDecoder( + asymTopic, + privateKey, + TestDecoder.pubsubTopic + ); + const symDecoder = createSymDecoder( + symTopic, + symKey, + TestDecoder.pubsubTopic + ); + + waku2 = await startAndConnectLightNodeWithMultipleServiceNodes( + serviceNodesFleet.nodes, + TestShardInfo + ); + + await Promise.all([ + waku.lightPush.send(eciesEncoder, asymMsg), + waku.lightPush.send(symEncoder, symMsg), + waku.lightPush.send(otherEncoder, otherMsg), + waku.lightPush.send(TestEncoder, clearMsg) + ]); + + await waitForRemotePeer(waku2, [Protocols.Store]); + + const messages: DecodedMessage[] = []; + + for await (const query of waku2.store.queryGenerator([ + eciesDecoder, + symDecoder, + TestDecoder + ])) { + for await (const msg of query) { + if (msg) { + messages.push(msg as DecodedMessage); + } + } + } + + // Messages are ordered from oldest to latest within a page (1 page query) + expect(bytesToUtf8(messages[0].payload!)).to.eq(asymText); + expect(bytesToUtf8(messages[1].payload!)).to.eq(symText); + expect(bytesToUtf8(messages[2].payload!)).to.eq(clearText); + expect(messages?.length).eq(3); + }); + + it("Ordered callback, aborts when callback returns true", async function () { + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + totalMsgs, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const desiredMsgs = 14; + const messages: IMessage[] = []; + await waku.store.queryWithOrderedCallback( + [TestDecoder], + async (msg) => { + messages.push(msg); + return messages.length >= desiredMsgs; + }, + { pageSize: 7 } + ); + + expect(messages?.length).eq(desiredMsgs); + }); + + it("Query generator for 2000 messages", async function () { + this.timeout(40000); + await sendMessagesToMultipleNodes( + serviceNodesFleet.nodes, + 2000, + TestDecoder.contentTopic, + TestDecoder.pubsubTopic + ); + + const messages = await processQueriedMessages( + waku, + [TestDecoder], + TestDecoder.pubsubTopic + ); + + expect(messages?.length).eq(2000); + }); +});