diff --git a/packages/core/src/lib/store/rpc.ts b/packages/core/src/lib/store/rpc.ts index 800c75af61..0055ed96a3 100644 --- a/packages/core/src/lib/store/rpc.ts +++ b/packages/core/src/lib/store/rpc.ts @@ -6,6 +6,7 @@ import { v4 as uuid } from "uuid"; // https://github.com/waku-org/nwaku/blob/7205f95cff9f49ca0bb762e8fd0bf56a6a7f3b3b/waku/waku_store/common.nim#L12 export const DEFAULT_PAGE_SIZE = 20; export const MAX_PAGE_SIZE = 100; +export const MAX_TIME_RANGE = 24 * 60 * 60 * 1000; const ONE_MILLION = 1_000000; export class StoreQueryRequest { diff --git a/packages/core/src/lib/store/store.spec.ts b/packages/core/src/lib/store/store.spec.ts new file mode 100644 index 0000000000..1cf61eb878 --- /dev/null +++ b/packages/core/src/lib/store/store.spec.ts @@ -0,0 +1,230 @@ +import type { PeerId } from "@libp2p/interface"; +import { + IDecodedMessage, + IDecoder, + Libp2p, + QueryRequestParams +} from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { StreamManager } from "../stream_manager/index.js"; + +import { + MAX_PAGE_SIZE, + MAX_TIME_RANGE, + StoreQueryRequest, + StoreQueryResponse +} from "./rpc.js"; +import { StoreCore } from "./store.js"; + +describe("StoreCore", () => { + let libp2p: Libp2p; + let storeCore: StoreCore; + let mockStreamManager: sinon.SinonStubbedInstance; + let mockPeerId: PeerId; + let mockStream: any; + let mockDecoder: sinon.SinonStubbedInstance>; + let decoders: Map>; + + const createMockPeerId = (id: string): PeerId => + ({ + toString: () => id, + equals: (other: PeerId) => other.toString() === id + }) as PeerId; + + beforeEach(() => { + libp2p = { + components: { + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + }, + connectionManager: { + getConnections: sinon.stub().returns([]) + } + } + } as unknown as Libp2p; + + mockStreamManager = { + getStream: sinon.stub() + } as unknown as sinon.SinonStubbedInstance; + + mockPeerId = createMockPeerId("12D3KooWTest1"); + + mockStream = { + sink: sinon.stub(), + source: [] + }; + + mockDecoder = { + fromProtoObj: sinon.stub() + } as unknown as sinon.SinonStubbedInstance>; + + decoders = new Map([["test-topic", mockDecoder]]); + + sinon + .stub(StreamManager.prototype, "getStream") + .callsFake(mockStreamManager.getStream); + storeCore = new StoreCore(libp2p); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe("queryPerPage", () => { + let queryOpts: QueryRequestParams; + let mockStoreQueryRequest: any; + let mockStoreQueryResponse: any; + + beforeEach(() => { + queryOpts = { + pubsubTopic: "test-topic", + contentTopics: ["test-topic"], + paginationLimit: 10, + includeData: true, + paginationForward: true + }; + + mockStoreQueryRequest = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + + mockStoreQueryResponse = { + statusCode: 200, + statusDesc: "OK", + messages: [ + { + messageHash: new Uint8Array([1]), + message: { + contentTopic: "test-topic" + }, + pubsubTopic: "test-topic" + } + ] + }; + + sinon.stub(StoreQueryRequest, "create").returns(mockStoreQueryRequest); + sinon.stub(StoreQueryResponse, "decode").returns(mockStoreQueryResponse); + }); + + it("throws if time range exceeds MAX_TIME_RANGE", async () => { + queryOpts.timeStart = new Date(); + queryOpts.timeEnd = new Date( + queryOpts.timeStart.getTime() + MAX_TIME_RANGE + 1000 + ); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + try { + await generator.next(); + expect.fail("Should have thrown an error"); + } catch (error) { + expect((error as Error).message).to.equal("Time range bigger than 24h"); + } + }); + + it("throws if decoders don't match content topics", async () => { + const differentDecoders = new Map([["different-topic", mockDecoder]]); + const generator = storeCore.queryPerPage( + queryOpts, + differentDecoders, + mockPeerId + ); + try { + await generator.next(); + expect.fail("Should have thrown an error"); + } catch (error) { + expect((error as Error).message).to.equal( + "Internal error, the decoders should match the query's content topics" + ); + } + }); + + it("does not validate decoders for hash queries", async () => { + queryOpts.messageHashes = [new Uint8Array([1, 2, 3])]; + queryOpts.contentTopics = []; + const differentDecoders = new Map([["different-topic", mockDecoder]]); + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage( + queryOpts, + differentDecoders, + mockPeerId + ); + const result = await generator.next(); + expect(result.done).to.be.false; + }); + + it("ends if stream creation fails", async () => { + mockStreamManager.getStream.rejects(new Error("Stream creation failed")); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + const result = await generator.next(); + expect(result.done).to.be.true; + }); + + it("throws if store query response has error status", async () => { + mockStoreQueryResponse.statusCode = 400; + mockStoreQueryResponse.statusDesc = "Bad Request"; + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + try { + await generator.next(); + expect.fail("Should have thrown an error"); + } catch (error) { + expect((error as Error).message).to.equal( + "Store query failed with status code: 400, description: Bad Request" + ); + } + }); + + it("ends if response has no messages", async () => { + mockStoreQueryResponse.messages = []; + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + const result = await generator.next(); + expect(result.done).to.be.true; + }); + + it("yields decoded messages", async () => { + const mockDecodedMessage = { + contentTopic: "test-topic" + } as IDecodedMessage; + mockDecoder.fromProtoObj.resolves(mockDecodedMessage); + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + const result = await generator.next(); + const decodedMessage = await result.value[0]; + expect(decodedMessage).to.equal(mockDecodedMessage); + }); + + it("yields undefined for messages without content topic", async () => { + mockStoreQueryResponse.messages[0].message.contentTopic = undefined; + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + const result = await generator.next(); + const decodedMessage = await result.value[0]; + expect(decodedMessage).to.be.undefined; + }); + + it("yields undefined for messages without decoder", async () => { + mockStoreQueryResponse.messages[0].message.contentTopic = "unknown-topic"; + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + const result = await generator.next(); + const decodedMessage = await result.value[0]; + expect(decodedMessage).to.be.undefined; + }); + + it("ends after yielding if response size indicates end", async () => { + queryOpts.paginationLimit = MAX_PAGE_SIZE + 10; + mockStoreQueryResponse.messages = new Array(MAX_PAGE_SIZE + 1).fill({ + messageHash: new Uint8Array([1]), + message: { contentTopic: "test-topic" } + }); + mockStreamManager.getStream.resolves(mockStream); + const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId); + await generator.next(); + const second = await generator.next(); + expect(second.done).to.be.true; + }); + }); +}); diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index d5d6686685..ce61b7a553 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -17,6 +17,7 @@ import { toProtoMessage } from "../to_proto_message.js"; import { DEFAULT_PAGE_SIZE, MAX_PAGE_SIZE, + MAX_TIME_RANGE, StoreQueryRequest, StoreQueryResponse } from "./rpc.js"; @@ -34,11 +35,23 @@ export class StoreCore { this.streamManager = new StreamManager(StoreCodec, libp2p.components); } + public get maxTimeLimit(): number { + return MAX_TIME_RANGE; + } + public async *queryPerPage( queryOpts: QueryRequestParams, decoders: Map>, peerId: PeerId ): AsyncGenerator[]> { + if (queryOpts.timeStart && queryOpts.timeEnd) { + const timeDiff = + queryOpts.timeEnd.getTime() - queryOpts.timeStart.getTime(); + if (timeDiff > MAX_TIME_RANGE) { + throw new Error("Time range bigger than 24h"); + } + } + // Only validate decoder content topics for content-filtered queries const isHashQuery = queryOpts.messageHashes && queryOpts.messageHashes.length > 0; diff --git a/packages/sdk/src/store/store.spec.ts b/packages/sdk/src/store/store.spec.ts new file mode 100644 index 0000000000..025f2df425 --- /dev/null +++ b/packages/sdk/src/store/store.spec.ts @@ -0,0 +1,294 @@ +import { StoreCore } from "@waku/core"; +import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces"; +import { Protocols } from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerManager } from "../peer_manager/index.js"; + +import { Store } from "./store.js"; + +describe("Store", () => { + let store: Store; + let mockLibp2p: Libp2p; + let mockPeerManager: sinon.SinonStubbedInstance; + let mockStoreCore: sinon.SinonStubbedInstance; + let mockPeerId: any; + + beforeEach(() => { + mockPeerId = { + toString: () => "QmTestPeerId" + }; + + mockStoreCore = { + multicodec: "test-multicodec", + maxTimeLimit: 24 * 60 * 60 * 1000, // 24 hours + queryPerPage: sinon.stub() + } as unknown as sinon.SinonStubbedInstance; + + mockLibp2p = { + dial: sinon.stub(), + components: { + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + } + } + } as unknown as Libp2p; + + mockPeerManager = { + getPeers: sinon.stub() + } as unknown as sinon.SinonStubbedInstance; + + // Stub the StoreCore methods + sinon + .stub(StoreCore.prototype, "queryPerPage") + .callsFake(mockStoreCore.queryPerPage); + + // Stub the maxTimeLimit getter + sinon + .stub(StoreCore.prototype, "maxTimeLimit") + .get(() => 24 * 60 * 60 * 1000); + + store = new Store({ + libp2p: mockLibp2p, + peerManager: mockPeerManager + }); + }); + + afterEach(() => { + sinon.restore(); + }); + + describe("queryGenerator", () => { + const mockDecoder: IDecoder = { + pubsubTopic: "/waku/2/default-waku/proto", + contentTopic: "/test/1/test/proto", + fromWireToProtoObj: sinon.stub(), + fromProtoObj: sinon.stub() + }; + + const mockMessage: IDecodedMessage = { + version: 1, + pubsubTopic: "/waku/2/default-waku/proto", + contentTopic: "/test/1/test/proto", + payload: new Uint8Array([1, 2, 3]), + timestamp: new Date(), + rateLimitProof: undefined, + ephemeral: undefined, + meta: undefined + }; + + it("should successfully query store with valid decoders and options", async () => { + const mockMessages = [Promise.resolve(mockMessage)]; + const mockResponseGenerator = (async function* () { + yield mockMessages; + })(); + + mockPeerManager.getPeers.resolves([mockPeerId]); + mockStoreCore.queryPerPage.returns(mockResponseGenerator); + + const generator = store.queryGenerator([mockDecoder]); + const results = []; + + for await (const messages of generator) { + results.push(messages); + } + + expect( + mockPeerManager.getPeers.calledWith({ + protocol: Protocols.Store, + pubsubTopic: "/waku/2/default-waku/proto" + }) + ).to.be.true; + + expect(mockStoreCore.queryPerPage.called).to.be.true; + + expect(results).to.have.length(1); + expect(results[0]).to.equal(mockMessages); + }); + + it("should throw error when no peers are available", async () => { + mockPeerManager.getPeers.resolves([]); + + const generator = store.queryGenerator([mockDecoder]); + + try { + for await (const _ of generator) { + // This should not be reached + } + expect.fail("Should have thrown an error"); + } catch (error) { + expect(error).to.be.instanceOf(Error); + expect((error as Error).message).to.equal( + "No peers available to query" + ); + } + }); + + it("should handle multiple query options for time ranges", async () => { + const timeStart = new Date("2023-01-01T00:00:00Z"); + const timeEnd = new Date("2023-01-03T00:00:01Z"); // 48 hours + 1ms later + + const mockMessages1 = [Promise.resolve(mockMessage)]; + const mockMessages2 = [Promise.resolve(mockMessage)]; + + const mockResponseGenerator1 = (async function* () { + yield mockMessages1; + })(); + + const mockResponseGenerator2 = (async function* () { + yield mockMessages2; + })(); + + mockPeerManager.getPeers.resolves([mockPeerId]); + mockStoreCore.queryPerPage + .onFirstCall() + .returns(mockResponseGenerator1) + .onSecondCall() + .returns(mockResponseGenerator2); + + const generator = store.queryGenerator([mockDecoder], { + timeStart, + timeEnd + }); + + const results = []; + for await (const messages of generator) { + results.push(messages); + } + + expect(mockStoreCore.queryPerPage.callCount).to.equal(2); + expect(results).to.have.length(2); + }); + + it("should chunk queries when time window exceeds maxTimeLimit", async () => { + // Create a time range that's 3x the maxTimeLimit (72 hours) + const timeStart = new Date("2023-01-01T00:00:00Z"); + const timeEnd = new Date("2023-01-04T00:00:01Z"); // 72 hours + 1ms later + + const maxTimeLimit = 24 * 60 * 60 * 1000; // 24 hours in ms + + // Should create 3 chunks: [0-24h], [24h-48h], [48h-72h+1ms] + const expectedChunks = 3; + + const mockMessages1 = [Promise.resolve(mockMessage)]; + const mockMessages2 = [Promise.resolve(mockMessage)]; + const mockMessages3 = [Promise.resolve(mockMessage)]; + + const mockResponseGenerator1 = (async function* () { + yield mockMessages1; + })(); + + const mockResponseGenerator2 = (async function* () { + yield mockMessages2; + })(); + + const mockResponseGenerator3 = (async function* () { + yield mockMessages3; + })(); + + mockPeerManager.getPeers.resolves([mockPeerId]); + mockStoreCore.queryPerPage + .onFirstCall() + .returns(mockResponseGenerator1) + .onSecondCall() + .returns(mockResponseGenerator2) + .onThirdCall() + .returns(mockResponseGenerator3); + + const generator = store.queryGenerator([mockDecoder], { + timeStart, + timeEnd + }); + + const results = []; + for await (const messages of generator) { + results.push(messages); + } + + expect(mockStoreCore.queryPerPage.callCount).to.equal(expectedChunks); + expect(results).to.have.length(expectedChunks); + + // Verify that each call was made with the correct time ranges + const calls = mockStoreCore.queryPerPage.getCalls(); + + // First chunk: timeStart to timeStart + maxTimeLimit + const firstCallArgs = calls[0].args[0] as any; + expect(firstCallArgs.timeStart).to.deep.equal(timeStart); + expect(firstCallArgs.timeEnd.getTime()).to.equal( + timeStart.getTime() + maxTimeLimit + ); + + // Second chunk: timeStart + maxTimeLimit to timeStart + 2*maxTimeLimit + const secondCallArgs = calls[1].args[0] as any; + expect(secondCallArgs.timeStart.getTime()).to.equal( + timeStart.getTime() + maxTimeLimit + ); + expect(secondCallArgs.timeEnd.getTime()).to.equal( + timeStart.getTime() + 2 * maxTimeLimit + ); + + // Third chunk: timeStart + 2*maxTimeLimit to timeEnd + const thirdCallArgs = calls[2].args[0] as any; + expect(thirdCallArgs.timeStart.getTime()).to.equal( + timeStart.getTime() + 2 * maxTimeLimit + ); + + // The third chunk should end at timeStart + 3*maxTimeLimit, not timeEnd + expect(thirdCallArgs.timeEnd.getTime()).to.equal( + timeStart.getTime() + 3 * maxTimeLimit + ); + }); + + it("should handle hash queries without validation", async () => { + const mockMessages = [Promise.resolve(mockMessage)]; + const mockResponseGenerator = (async function* () { + yield mockMessages; + })(); + + mockPeerManager.getPeers.resolves([mockPeerId]); + mockStoreCore.queryPerPage.returns(mockResponseGenerator); + + const generator = store.queryGenerator([mockDecoder], { + messageHashes: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])], + pubsubTopic: "/custom/topic" + }); + + const results = []; + for await (const messages of generator) { + results.push(messages); + } + + expect(mockStoreCore.queryPerPage.called).to.be.true; + + expect(results).to.have.length(1); + }); + + it("should use configured peers when available", async () => { + const configuredPeers = ["/ip4/127.0.0.1/tcp/30303/p2p/QmConfiguredPeer"]; + + store = new Store({ + libp2p: mockLibp2p, + peerManager: mockPeerManager, + options: { peers: configuredPeers } + }); + + const mockMessages = [Promise.resolve(mockMessage)]; + const mockResponseGenerator = (async function* () { + yield mockMessages; + })(); + + mockPeerManager.getPeers.resolves([mockPeerId]); + mockStoreCore.queryPerPage.returns(mockResponseGenerator); + + const generator = store.queryGenerator([mockDecoder]); + + for await (const _ of generator) { + // Just consume the generator + } + + expect(mockPeerManager.getPeers.called).to.be.true; + }); + }); +}); diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 126cab8678..1297060cf2 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -16,7 +16,7 @@ import { isDefined, Logger } from "@waku/utils"; import { PeerManager } from "../peer_manager/index.js"; -const log = new Logger("waku:store:sdk"); +const log = new Logger("store-sdk"); type StoreConstructorParams = { libp2p: Libp2p; @@ -59,55 +59,30 @@ export class Store implements IStore { decoders: IDecoder[], options?: Partial ): AsyncGenerator[]> { - // For message hash queries, don't validate decoders but still need decodersAsMap - const isHashQuery = - options?.messageHashes && options.messageHashes.length > 0; - - let pubsubTopic: string; - let contentTopics: string[]; - let decodersAsMap: Map>; - - if (isHashQuery) { - // For hash queries, we still need decoders to decode messages - // but we don't validate pubsubTopic consistency - // Use pubsubTopic from options if provided, otherwise from first decoder - pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || ""; - contentTopics = []; - decodersAsMap = new Map(); - decoders.forEach((dec) => { - decodersAsMap.set(dec.contentTopic, dec); - }); - } else { - const validated = this.validateDecodersAndPubsubTopic(decoders); - pubsubTopic = validated.pubsubTopic; - contentTopics = validated.contentTopics; - decodersAsMap = validated.decodersAsMap; - } - - const queryOpts: QueryRequestParams = { - pubsubTopic, - contentTopics, - includeData: true, - paginationForward: true, - ...options - }; - - const peer = await this.getPeerToUse(pubsubTopic); - - if (!peer) { - log.error("No peers available to query"); - throw new Error("No peers available to query"); - } - - log.info(`Querying store with options: ${JSON.stringify(options)}`); - const responseGenerator = this.protocol.queryPerPage( - queryOpts, - decodersAsMap, - peer + const { decodersAsMap, queryOptions } = this.buildQueryParams( + decoders, + options ); - for await (const messages of responseGenerator) { - yield messages; + for (const queryOption of queryOptions) { + const peer = await this.getPeerToUse(queryOption.pubsubTopic); + + if (!peer) { + log.error("No peers available to query"); + throw new Error("No peers available to query"); + } + + log.info(`Querying store with options: ${JSON.stringify(queryOption)}`); + + const responseGenerator = this.protocol.queryPerPage( + queryOption, + decodersAsMap, + peer + ); + + for await (const messages of responseGenerator) { + yield messages; + } } } @@ -310,4 +285,84 @@ export class Store implements IStore { return peerIds[0]; } + + private buildQueryParams( + decoders: IDecoder[], + options?: Partial + ): { + decodersAsMap: Map>; + queryOptions: QueryRequestParams[]; + } { + // For message hash queries, don't validate decoders but still need decodersAsMap + const isHashQuery = + options?.messageHashes && options.messageHashes.length > 0; + + let pubsubTopic: string; + let contentTopics: string[]; + let decodersAsMap: Map>; + + if (isHashQuery) { + // For hash queries, we still need decoders to decode messages + // but we don't validate pubsubTopic consistency + // Use pubsubTopic from options if provided, otherwise from first decoder + pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || ""; + contentTopics = []; + decodersAsMap = new Map(); + decoders.forEach((dec) => { + decodersAsMap.set(dec.contentTopic, dec); + }); + } else { + const validated = this.validateDecodersAndPubsubTopic(decoders); + pubsubTopic = validated.pubsubTopic; + contentTopics = validated.contentTopics; + decodersAsMap = validated.decodersAsMap; + } + + const subTimeRanges: [Date, Date][] = []; + if (options?.timeStart && options?.timeEnd) { + let start = options.timeStart; + const end = options.timeEnd; + while (end.getTime() - start.getTime() > this.protocol.maxTimeLimit) { + const subEnd = new Date(start.getTime() + this.protocol.maxTimeLimit); + subTimeRanges.push([start, subEnd]); + start = subEnd; + } + + if (subTimeRanges.length === 0) { + log.info("Using single time range"); + subTimeRanges.push([start, end]); + } + } + + if (subTimeRanges.length === 0) { + log.info("No sub time ranges"); + return { + decodersAsMap, + queryOptions: [ + { + pubsubTopic, + contentTopics, + includeData: true, + paginationForward: true, + ...options + } + ] + }; + } + + log.info(`Building ${subTimeRanges.length} sub time ranges`); + + return { + decodersAsMap, + queryOptions: subTimeRanges.map(([start, end]) => ({ + pubsubTopic, + contentTopics, + includeData: true, + paginationForward: true, + ...options, + timeStart: start, + timeEnd: end + })) + }; + } } diff --git a/packages/tests/tests/store/time_filter.node.spec.ts b/packages/tests/tests/store/time_filter.node.spec.ts index 8addf95f9b..e149a38614 100644 --- a/packages/tests/tests/store/time_filter.node.spec.ts +++ b/packages/tests/tests/store/time_filter.node.spec.ts @@ -35,7 +35,9 @@ describe("Waku Store, time filter", function () { [-19000, 0, 1000], [-19000, -1000, 0], [19000, -10, 10], // message in the future - [-19000, 10, -10] // startTime is newer than endTime + [-19000, 10, -10], // startTime is newer than endTime + [0, Date.now() - 3 * 24 * 60 * 60 * 1000, Date.now()], // range longer than 24 hours + [0, Date.now() - 24 * 60 * 60 * 1000, Date.now()] // range is 24 hours ].forEach(([msgTime, startTime, endTime]) => { it(`msgTime: ${msgTime} ms from now, startTime: ${ msgTime + startTime