feat: implement store query chunking (#2511)

* feat: implement store query chunking

* add unit tests for store core

* add Store SDK utils

* add e2e test
Sasha, 2025-07-19 01:37:28 +02:00, committed by GitHub
parent 7fc2895b6a
commit 36f6884d22
6 changed files with 644 additions and 49 deletions
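
In short: a single store query is now capped at a 24-hour window, and the SDK splits a longer [timeStart, timeEnd] range into consecutive sub-queries. A minimal sketch of the splitting rule (splitTimeRange is an illustrative name; the real logic lives in the SDK's buildQueryParams further down):

const MAX_TIME_RANGE = 24 * 60 * 60 * 1000; // 24 h in ms

function splitTimeRange(timeStart: Date, timeEnd: Date): [Date, Date][] {
  const ranges: [Date, Date][] = [];
  let start = timeStart;
  while (timeEnd.getTime() - start.getTime() > MAX_TIME_RANGE) {
    const subEnd = new Date(start.getTime() + MAX_TIME_RANGE);
    ranges.push([start, subEnd]);
    start = subEnd;
  }
  // As the unit tests assert: a range at or under 24 h is issued as a single
  // query, while for longer ranges only the full 24 h chunks are queried and
  // the trailing partial window is dropped.
  if (ranges.length === 0) {
    ranges.push([start, timeEnd]);
  }
  return ranges;
}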

View File

@@ -6,6 +6,7 @@ import { v4 as uuid } from "uuid";
// https://github.com/waku-org/nwaku/blob/7205f95cff9f49ca0bb762e8fd0bf56a6a7f3b3b/waku/waku_store/common.nim#L12
export const DEFAULT_PAGE_SIZE = 20;
export const MAX_PAGE_SIZE = 100;
export const MAX_TIME_RANGE = 24 * 60 * 60 * 1000;
const ONE_MILLION = 1_000_000;
export class StoreQueryRequest {
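
The hunk shows only the constants; presumably ONE_MILLION backs the millisecond-to-nanosecond conversion used for store timestamps elsewhere in this file. Since that usage is outside the hunk, the helper below is an assumption, not the file's code:

// Assumed (not shown in the hunk): Date (ms) -> protocol timestamp (ns).
const toNanoseconds = (d: Date): bigint =>
  BigInt(d.getTime()) * BigInt(1_000_000);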

View File

@@ -0,0 +1,230 @@
import type { PeerId } from "@libp2p/interface";
import {
IDecodedMessage,
IDecoder,
Libp2p,
QueryRequestParams
} from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { StreamManager } from "../stream_manager/index.js";
import {
MAX_PAGE_SIZE,
MAX_TIME_RANGE,
StoreQueryRequest,
StoreQueryResponse
} from "./rpc.js";
import { StoreCore } from "./store.js";
describe("StoreCore", () => {
let libp2p: Libp2p;
let storeCore: StoreCore;
let mockStreamManager: sinon.SinonStubbedInstance<StreamManager>;
let mockPeerId: PeerId;
let mockStream: any;
let mockDecoder: sinon.SinonStubbedInstance<IDecoder<IDecodedMessage>>;
let decoders: Map<string, IDecoder<IDecodedMessage>>;
const createMockPeerId = (id: string): PeerId =>
({
toString: () => id,
equals: (other: PeerId) => other.toString() === id
}) as PeerId;
beforeEach(() => {
libp2p = {
components: {
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
},
connectionManager: {
getConnections: sinon.stub().returns([])
}
}
} as unknown as Libp2p;
mockStreamManager = {
getStream: sinon.stub()
} as unknown as sinon.SinonStubbedInstance<StreamManager>;
mockPeerId = createMockPeerId("12D3KooWTest1");
mockStream = {
sink: sinon.stub(),
source: []
};
mockDecoder = {
fromProtoObj: sinon.stub()
} as unknown as sinon.SinonStubbedInstance<IDecoder<IDecodedMessage>>;
decoders = new Map([["test-topic", mockDecoder]]);
sinon
.stub(StreamManager.prototype, "getStream")
.callsFake(mockStreamManager.getStream);
storeCore = new StoreCore(libp2p);
});
afterEach(() => {
sinon.restore();
});
describe("queryPerPage", () => {
let queryOpts: QueryRequestParams;
let mockStoreQueryRequest: any;
let mockStoreQueryResponse: any;
beforeEach(() => {
queryOpts = {
pubsubTopic: "test-topic",
contentTopics: ["test-topic"],
paginationLimit: 10,
includeData: true,
paginationForward: true
};
mockStoreQueryRequest = {
encode: sinon.stub().returns(new Uint8Array([1, 2, 3]))
};
mockStoreQueryResponse = {
statusCode: 200,
statusDesc: "OK",
messages: [
{
messageHash: new Uint8Array([1]),
message: {
contentTopic: "test-topic"
},
pubsubTopic: "test-topic"
}
]
};
sinon.stub(StoreQueryRequest, "create").returns(mockStoreQueryRequest);
sinon.stub(StoreQueryResponse, "decode").returns(mockStoreQueryResponse);
});
it("throws if time range exceeds MAX_TIME_RANGE", async () => {
queryOpts.timeStart = new Date();
queryOpts.timeEnd = new Date(
queryOpts.timeStart.getTime() + MAX_TIME_RANGE + 1000
);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
try {
await generator.next();
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.equal("Time range bigger than 24h");
}
});
it("throws if decoders don't match content topics", async () => {
const differentDecoders = new Map([["different-topic", mockDecoder]]);
const generator = storeCore.queryPerPage(
queryOpts,
differentDecoders,
mockPeerId
);
try {
await generator.next();
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.equal(
"Internal error, the decoders should match the query's content topics"
);
}
});
it("does not validate decoders for hash queries", async () => {
queryOpts.messageHashes = [new Uint8Array([1, 2, 3])];
queryOpts.contentTopics = [];
const differentDecoders = new Map([["different-topic", mockDecoder]]);
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(
queryOpts,
differentDecoders,
mockPeerId
);
const result = await generator.next();
expect(result.done).to.be.false;
});
it("ends if stream creation fails", async () => {
mockStreamManager.getStream.rejects(new Error("Stream creation failed"));
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
expect(result.done).to.be.true;
});
it("throws if store query response has error status", async () => {
mockStoreQueryResponse.statusCode = 400;
mockStoreQueryResponse.statusDesc = "Bad Request";
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
try {
await generator.next();
expect.fail("Should have thrown an error");
} catch (error) {
expect((error as Error).message).to.equal(
"Store query failed with status code: 400, description: Bad Request"
);
}
});
it("ends if response has no messages", async () => {
mockStoreQueryResponse.messages = [];
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
expect(result.done).to.be.true;
});
it("yields decoded messages", async () => {
const mockDecodedMessage = {
contentTopic: "test-topic"
} as IDecodedMessage;
mockDecoder.fromProtoObj.resolves(mockDecodedMessage);
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
const decodedMessage = await result.value[0];
expect(decodedMessage).to.equal(mockDecodedMessage);
});
it("yields undefined for messages without content topic", async () => {
mockStoreQueryResponse.messages[0].message.contentTopic = undefined;
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
const decodedMessage = await result.value[0];
expect(decodedMessage).to.be.undefined;
});
it("yields undefined for messages without decoder", async () => {
mockStoreQueryResponse.messages[0].message.contentTopic = "unknown-topic";
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
const result = await generator.next();
const decodedMessage = await result.value[0];
expect(decodedMessage).to.be.undefined;
});
it("ends after yielding if response size indicates end", async () => {
queryOpts.paginationLimit = MAX_PAGE_SIZE + 10;
mockStoreQueryResponse.messages = new Array(MAX_PAGE_SIZE + 1).fill({
messageHash: new Uint8Array([1]),
message: { contentTopic: "test-topic" }
});
mockStreamManager.getStream.resolves(mockStream);
const generator = storeCore.queryPerPage(queryOpts, decoders, mockPeerId);
await generator.next();
const second = await generator.next();
expect(second.done).to.be.true;
});
});
});

View File

@@ -17,6 +17,7 @@ import { toProtoMessage } from "../to_proto_message.js";
import {
DEFAULT_PAGE_SIZE,
MAX_PAGE_SIZE,
MAX_TIME_RANGE,
StoreQueryRequest,
StoreQueryResponse
} from "./rpc.js";
@@ -34,11 +35,23 @@ export class StoreCore {
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
}
public get maxTimeLimit(): number {
return MAX_TIME_RANGE;
}
public async *queryPerPage<T extends IDecodedMessage>(
queryOpts: QueryRequestParams,
decoders: Map<string, IDecoder<T>>,
peerId: PeerId
): AsyncGenerator<Promise<T | undefined>[]> {
if (queryOpts.timeStart && queryOpts.timeEnd) {
const timeDiff =
queryOpts.timeEnd.getTime() - queryOpts.timeStart.getTime();
if (timeDiff > MAX_TIME_RANGE) {
throw new Error("Time range bigger than 24h");
}
}
// Only validate decoder content topics for content-filtered queries
const isHashQuery =
queryOpts.messageHashes && queryOpts.messageHashes.length > 0;
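
Taken together, the two guards at the head of queryPerPage reject over-long time ranges and, for content-filtered queries only, decoder mismatches. An illustrative standalone paraphrase (validateQuery and decoderTopics are hypothetical names, not part of this diff; the error strings match the ones the unit tests expect):

const MAX_TIME_RANGE = 24 * 60 * 60 * 1000;

function validateQuery(
  opts: {
    timeStart?: Date;
    timeEnd?: Date;
    messageHashes?: Uint8Array[];
    contentTopics: string[];
  },
  decoderTopics: Set<string>
): void {
  if (
    opts.timeStart &&
    opts.timeEnd &&
    opts.timeEnd.getTime() - opts.timeStart.getTime() > MAX_TIME_RANGE
  ) {
    throw new Error("Time range bigger than 24h");
  }
  const isHashQuery = !!opts.messageHashes?.length;
  // Hash queries skip the decoder/content-topic consistency check.
  if (!isHashQuery && !opts.contentTopics.every((t) => decoderTopics.has(t))) {
    throw new Error(
      "Internal error, the decoders should match the query's content topics"
    );
  }
}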

View File

@@ -0,0 +1,294 @@
import { StoreCore } from "@waku/core";
import type { IDecodedMessage, IDecoder, Libp2p } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
import { PeerManager } from "../peer_manager/index.js";
import { Store } from "./store.js";
describe("Store", () => {
let store: Store;
let mockLibp2p: Libp2p;
let mockPeerManager: sinon.SinonStubbedInstance<PeerManager>;
let mockStoreCore: sinon.SinonStubbedInstance<StoreCore>;
let mockPeerId: any;
beforeEach(() => {
mockPeerId = {
toString: () => "QmTestPeerId"
};
mockStoreCore = {
multicodec: "test-multicodec",
maxTimeLimit: 24 * 60 * 60 * 1000, // 24 hours
queryPerPage: sinon.stub()
} as unknown as sinon.SinonStubbedInstance<StoreCore>;
mockLibp2p = {
dial: sinon.stub(),
components: {
events: {
addEventListener: sinon.stub(),
removeEventListener: sinon.stub()
}
}
} as unknown as Libp2p;
mockPeerManager = {
getPeers: sinon.stub()
} as unknown as sinon.SinonStubbedInstance<PeerManager>;
// Stub the StoreCore methods
sinon
.stub(StoreCore.prototype, "queryPerPage")
.callsFake(mockStoreCore.queryPerPage);
// Stub the maxTimeLimit getter
sinon
.stub(StoreCore.prototype, "maxTimeLimit")
.get(() => 24 * 60 * 60 * 1000);
store = new Store({
libp2p: mockLibp2p,
peerManager: mockPeerManager
});
});
afterEach(() => {
sinon.restore();
});
describe("queryGenerator", () => {
const mockDecoder: IDecoder<IDecodedMessage> = {
pubsubTopic: "/waku/2/default-waku/proto",
contentTopic: "/test/1/test/proto",
fromWireToProtoObj: sinon.stub(),
fromProtoObj: sinon.stub()
};
const mockMessage: IDecodedMessage = {
version: 1,
pubsubTopic: "/waku/2/default-waku/proto",
contentTopic: "/test/1/test/proto",
payload: new Uint8Array([1, 2, 3]),
timestamp: new Date(),
rateLimitProof: undefined,
ephemeral: undefined,
meta: undefined
};
it("should successfully query store with valid decoders and options", async () => {
const mockMessages = [Promise.resolve(mockMessage)];
const mockResponseGenerator = (async function* () {
yield mockMessages;
})();
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage.returns(mockResponseGenerator);
const generator = store.queryGenerator([mockDecoder]);
const results = [];
for await (const messages of generator) {
results.push(messages);
}
expect(
mockPeerManager.getPeers.calledWith({
protocol: Protocols.Store,
pubsubTopic: "/waku/2/default-waku/proto"
})
).to.be.true;
expect(mockStoreCore.queryPerPage.called).to.be.true;
expect(results).to.have.length(1);
expect(results[0]).to.equal(mockMessages);
});
it("should throw error when no peers are available", async () => {
mockPeerManager.getPeers.resolves([]);
const generator = store.queryGenerator([mockDecoder]);
try {
for await (const _ of generator) {
// This should not be reached
}
expect.fail("Should have thrown an error");
} catch (error) {
expect(error).to.be.instanceOf(Error);
expect((error as Error).message).to.equal(
"No peers available to query"
);
}
});
it("should handle multiple query options for time ranges", async () => {
const timeStart = new Date("2023-01-01T00:00:00Z");
const timeEnd = new Date("2023-01-03T00:00:01Z"); // 48 hours + 1 s later
const mockMessages1 = [Promise.resolve(mockMessage)];
const mockMessages2 = [Promise.resolve(mockMessage)];
const mockResponseGenerator1 = (async function* () {
yield mockMessages1;
})();
const mockResponseGenerator2 = (async function* () {
yield mockMessages2;
})();
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage
.onFirstCall()
.returns(mockResponseGenerator1)
.onSecondCall()
.returns(mockResponseGenerator2);
const generator = store.queryGenerator([mockDecoder], {
timeStart,
timeEnd
});
const results = [];
for await (const messages of generator) {
results.push(messages);
}
expect(mockStoreCore.queryPerPage.callCount).to.equal(2);
expect(results).to.have.length(2);
});
it("should chunk queries when time window exceeds maxTimeLimit", async () => {
// Create a time range that's 3x the maxTimeLimit (72 hours)
const timeStart = new Date("2023-01-01T00:00:00Z");
const timeEnd = new Date("2023-01-04T00:00:01Z"); // 72 hours + 1 s later
const maxTimeLimit = 24 * 60 * 60 * 1000; // 24 hours in ms
// Should create 3 full chunks: [0-24h], [24h-48h], [48h-72h]; the trailing 1 s is dropped
const expectedChunks = 3;
const mockMessages1 = [Promise.resolve(mockMessage)];
const mockMessages2 = [Promise.resolve(mockMessage)];
const mockMessages3 = [Promise.resolve(mockMessage)];
const mockResponseGenerator1 = (async function* () {
yield mockMessages1;
})();
const mockResponseGenerator2 = (async function* () {
yield mockMessages2;
})();
const mockResponseGenerator3 = (async function* () {
yield mockMessages3;
})();
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage
.onFirstCall()
.returns(mockResponseGenerator1)
.onSecondCall()
.returns(mockResponseGenerator2)
.onThirdCall()
.returns(mockResponseGenerator3);
const generator = store.queryGenerator([mockDecoder], {
timeStart,
timeEnd
});
const results = [];
for await (const messages of generator) {
results.push(messages);
}
expect(mockStoreCore.queryPerPage.callCount).to.equal(expectedChunks);
expect(results).to.have.length(expectedChunks);
// Verify that each call was made with the correct time ranges
const calls = mockStoreCore.queryPerPage.getCalls();
// First chunk: timeStart to timeStart + maxTimeLimit
const firstCallArgs = calls[0].args[0] as any;
expect(firstCallArgs.timeStart).to.deep.equal(timeStart);
expect(firstCallArgs.timeEnd.getTime()).to.equal(
timeStart.getTime() + maxTimeLimit
);
// Second chunk: timeStart + maxTimeLimit to timeStart + 2*maxTimeLimit
const secondCallArgs = calls[1].args[0] as any;
expect(secondCallArgs.timeStart.getTime()).to.equal(
timeStart.getTime() + maxTimeLimit
);
expect(secondCallArgs.timeEnd.getTime()).to.equal(
timeStart.getTime() + 2 * maxTimeLimit
);
// Third chunk: timeStart + 2*maxTimeLimit to timeEnd
const thirdCallArgs = calls[2].args[0] as any;
expect(thirdCallArgs.timeStart.getTime()).to.equal(
timeStart.getTime() + 2 * maxTimeLimit
);
// The third chunk should end at timeStart + 3*maxTimeLimit, not timeEnd
expect(thirdCallArgs.timeEnd.getTime()).to.equal(
timeStart.getTime() + 3 * maxTimeLimit
);
});
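// For concreteness, the boundaries the three assertions above pin down,
// given timeStart = 2023-01-01T00:00:00Z and a 24 h limit:
//   chunk 1: 2023-01-01T00:00:00Z -> 2023-01-02T00:00:00Z
//   chunk 2: 2023-01-02T00:00:00Z -> 2023-01-03T00:00:00Z
//   chunk 3: 2023-01-03T00:00:00Z -> 2023-01-04T00:00:00Z
// The trailing second (up to timeEnd = 2023-01-04T00:00:01Z) falls outside
// every chunk and is never queried.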
it("should handle hash queries without validation", async () => {
const mockMessages = [Promise.resolve(mockMessage)];
const mockResponseGenerator = (async function* () {
yield mockMessages;
})();
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage.returns(mockResponseGenerator);
const generator = store.queryGenerator([mockDecoder], {
messageHashes: [new Uint8Array([1, 2, 3]), new Uint8Array([4, 5, 6])],
pubsubTopic: "/custom/topic"
});
const results = [];
for await (const messages of generator) {
results.push(messages);
}
expect(mockStoreCore.queryPerPage.called).to.be.true;
expect(results).to.have.length(1);
});
it("should use configured peers when available", async () => {
const configuredPeers = ["/ip4/127.0.0.1/tcp/30303/p2p/QmConfiguredPeer"];
store = new Store({
libp2p: mockLibp2p,
peerManager: mockPeerManager,
options: { peers: configuredPeers }
});
const mockMessages = [Promise.resolve(mockMessage)];
const mockResponseGenerator = (async function* () {
yield mockMessages;
})();
mockPeerManager.getPeers.resolves([mockPeerId]);
mockStoreCore.queryPerPage.returns(mockResponseGenerator);
const generator = store.queryGenerator([mockDecoder]);
for await (const _ of generator) {
// Just consume the generator
}
expect(mockPeerManager.getPeers.called).to.be.true;
});
});
});

View File

@@ -16,7 +16,7 @@ import { isDefined, Logger } from "@waku/utils";
import { PeerManager } from "../peer_manager/index.js";
-const log = new Logger("waku:store:sdk");
+const log = new Logger("store-sdk");
type StoreConstructorParams = {
libp2p: Libp2p;
@@ -59,55 +59,30 @@ export class Store implements IStore {
     decoders: IDecoder<T>[],
     options?: Partial<QueryRequestParams>
   ): AsyncGenerator<Promise<T | undefined>[]> {
-    // For message hash queries, don't validate decoders but still need decodersAsMap
-    const isHashQuery =
-      options?.messageHashes && options.messageHashes.length > 0;
-    let pubsubTopic: string;
-    let contentTopics: string[];
-    let decodersAsMap: Map<string, IDecoder<T>>;
-    if (isHashQuery) {
-      // For hash queries, we still need decoders to decode messages
-      // but we don't validate pubsubTopic consistency
-      // Use pubsubTopic from options if provided, otherwise from first decoder
-      pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
-      contentTopics = [];
-      decodersAsMap = new Map();
-      decoders.forEach((dec) => {
-        decodersAsMap.set(dec.contentTopic, dec);
-      });
-    } else {
-      const validated = this.validateDecodersAndPubsubTopic(decoders);
-      pubsubTopic = validated.pubsubTopic;
-      contentTopics = validated.contentTopics;
-      decodersAsMap = validated.decodersAsMap;
-    }
-    const queryOpts: QueryRequestParams = {
-      pubsubTopic,
-      contentTopics,
-      includeData: true,
-      paginationForward: true,
-      ...options
-    };
-    const peer = await this.getPeerToUse(pubsubTopic);
-    if (!peer) {
-      log.error("No peers available to query");
-      throw new Error("No peers available to query");
-    }
-    log.info(`Querying store with options: ${JSON.stringify(options)}`);
-    const responseGenerator = this.protocol.queryPerPage(
-      queryOpts,
-      decodersAsMap,
-      peer
-    );
-    for await (const messages of responseGenerator) {
-      yield messages;
-    }
+    const { decodersAsMap, queryOptions } = this.buildQueryParams(
+      decoders,
+      options
+    );
+    for (const queryOption of queryOptions) {
+      const peer = await this.getPeerToUse(queryOption.pubsubTopic);
+      if (!peer) {
+        log.error("No peers available to query");
+        throw new Error("No peers available to query");
+      }
+      log.info(`Querying store with options: ${JSON.stringify(queryOption)}`);
+      const responseGenerator = this.protocol.queryPerPage(
+        queryOption,
+        decodersAsMap,
+        peer
+      );
+      for await (const messages of responseGenerator) {
+        yield messages;
+      }
+    }
}
@@ -310,4 +285,84 @@ export class Store implements IStore {
return peerIds[0];
}
private buildQueryParams<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: Partial<QueryRequestParams>
): {
decodersAsMap: Map<string, IDecoder<T>>;
queryOptions: QueryRequestParams[];
} {
// For message hash queries, don't validate decoders but still need decodersAsMap
const isHashQuery =
options?.messageHashes && options.messageHashes.length > 0;
let pubsubTopic: string;
let contentTopics: string[];
let decodersAsMap: Map<string, IDecoder<T>>;
if (isHashQuery) {
// For hash queries, we still need decoders to decode messages
// but we don't validate pubsubTopic consistency
// Use pubsubTopic from options if provided, otherwise from first decoder
pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
contentTopics = [];
decodersAsMap = new Map();
decoders.forEach((dec) => {
decodersAsMap.set(dec.contentTopic, dec);
});
} else {
const validated = this.validateDecodersAndPubsubTopic(decoders);
pubsubTopic = validated.pubsubTopic;
contentTopics = validated.contentTopics;
decodersAsMap = validated.decodersAsMap;
}
const subTimeRanges: [Date, Date][] = [];
if (options?.timeStart && options?.timeEnd) {
let start = options.timeStart;
const end = options.timeEnd;
while (end.getTime() - start.getTime() > this.protocol.maxTimeLimit) {
const subEnd = new Date(start.getTime() + this.protocol.maxTimeLimit);
subTimeRanges.push([start, subEnd]);
start = subEnd;
}
if (subTimeRanges.length === 0) {
log.info("Using single time range");
subTimeRanges.push([start, end]);
}
}
if (subTimeRanges.length === 0) {
log.info("No sub time ranges");
return {
decodersAsMap,
queryOptions: [
{
pubsubTopic,
contentTopics,
includeData: true,
paginationForward: true,
...options
}
]
};
}
log.info(`Building ${subTimeRanges.length} sub time ranges`);
return {
decodersAsMap,
queryOptions: subTimeRanges.map(([start, end]) => ({
pubsubTopic,
contentTopics,
includeData: true,
paginationForward: true,
...options,
timeStart: start,
timeEnd: end
}))
};
}
}
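
From the consumer's side the chunking is transparent; a hedged usage sketch, assuming a node that exposes this Store as waku.store and an existing decoder (neither is defined in this diff):

// Hypothetical consumer code: one logical 3-day history query; the SDK
// fans it out into consecutive sub-queries of at most 24 h each.
const timeEnd = new Date();
const timeStart = new Date(timeEnd.getTime() - 3 * 24 * 60 * 60 * 1000);

for await (const page of waku.store.queryGenerator([decoder], {
  timeStart,
  timeEnd
})) {
  const messages = await Promise.all(page);
  for (const msg of messages) {
    if (msg) console.log(msg.contentTopic, msg.payload);
  }
}

Note that, per the unit tests above, any partial window after the last full 24 h chunk is not queried in this implementation.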

View File

@@ -35,7 +35,9 @@ describe("Waku Store, time filter", function () {
     [-19000, 0, 1000],
     [-19000, -1000, 0],
     [19000, -10, 10], // message in the future
-    [-19000, 10, -10] // startTime is newer than endTime
+    [-19000, 10, -10], // startTime is newer than endTime
+    [0, Date.now() - 3 * 24 * 60 * 60 * 1000, Date.now()], // range longer than 24 hours
+    [0, Date.now() - 24 * 60 * 60 * 1000, Date.now()] // range is 24 hours
].forEach(([msgTime, startTime, endTime]) => {
it(`msgTime: ${msgTime} ms from now, startTime: ${
msgTime + startTime