fix(store): update store query validation logic to support msg hash queries

Arseniy Klempner 2025-05-30 11:44:03 -07:00
parent 1905558753
commit 9f7a15dfb1
No known key found for this signature in database
GPG Key ID: 51653F18863BD24B
8 changed files with 275 additions and 33 deletions
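In short: a request with a non-empty messageHashes array is now treated as a message hash lookup and must not carry content topics or time filters, while any other request is a content-filtered query and must set pubsubTopic and contentTopics together. A sketch of the two accepted shapes, using the same values as the new unit tests below:

import { StoreQueryRequest } from "./rpc.js";

// Content-filtered query: pubsubTopic and contentTopics set together
const contentFiltered = StoreQueryRequest.create({
  pubsubTopic: "/waku/2/default-waku/proto",
  contentTopics: ["/test/1/content/proto"],
  includeData: true,
  paginationForward: true
});

// Message hash query: hashes only; contentTopics and time filters stay empty
const byHash = StoreQueryRequest.create({
  pubsubTopic: "", // ignored for hash queries
  contentTopics: [],
  messageHashes: [new Uint8Array([1, 2, 3, 4])],
  includeData: true,
  paginationForward: true
});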

package-lock.json (generated)

@@ -44574,6 +44574,7 @@
         "@waku/core": "*",
         "@waku/enr": "*",
         "@waku/interfaces": "*",
+        "@waku/message-hash": "^0.1.17",
         "@waku/utils": "*",
         "app-root-path": "^3.1.0",
         "chai-as-promised": "^7.1.1",


@@ -0,0 +1,93 @@
import { expect } from "chai";
import { StoreQueryRequest } from "./rpc.js";

describe("StoreQueryRequest validation", () => {
it("accepts valid content-filtered query", () => {
const request = StoreQueryRequest.create({
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
});
expect(request).to.exist;
});

it("rejects content-filtered query with only pubsubTopic", () => {
expect(() =>
StoreQueryRequest.create({
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: [], // Empty array
includeData: true,
paginationForward: true
})
).to.throw(
"Both pubsubTopic and contentTopics must be set together for content-filtered queries"
);
});

it("rejects content-filtered query with only contentTopics", () => {
expect(() =>
StoreQueryRequest.create({
pubsubTopic: "", // Empty string
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
})
).to.throw(
"Both pubsubTopic and contentTopics must be set together for content-filtered queries"
);
});

it("accepts valid message hash query", () => {
const request = StoreQueryRequest.create({
pubsubTopic: "", // Required but ignored for hash queries
contentTopics: [], // Required but ignored for hash queries
messageHashes: [new Uint8Array([1, 2, 3, 4])],
includeData: true,
paginationForward: true
});
expect(request).to.exist;
});

it("rejects hash query with content filter parameters", () => {
expect(() =>
StoreQueryRequest.create({
messageHashes: [new Uint8Array([1, 2, 3, 4])],
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
includeData: true,
paginationForward: true
})
).to.throw(
"Message hash lookup queries cannot include content filter criteria"
);
});

it("rejects hash query with time filter", () => {
expect(() =>
StoreQueryRequest.create({
pubsubTopic: "",
contentTopics: [],
messageHashes: [new Uint8Array([1, 2, 3, 4])],
timeStart: new Date(),
includeData: true,
paginationForward: true
})
).to.throw(
"Message hash lookup queries cannot include content filter criteria"
);
});

it("accepts time-filtered query with content filter", () => {
const request = StoreQueryRequest.create({
pubsubTopic: "/waku/2/default-waku/proto",
contentTopics: ["/test/1/content/proto"],
timeStart: new Date(Date.now() - 3600000),
timeEnd: new Date(),
includeData: true,
paginationForward: true
});
expect(request).to.exist;
});
});


@@ -14,6 +14,7 @@ export class StoreQueryRequest {
   public static create(params: QueryRequestParams): StoreQueryRequest {
     const request = new StoreQueryRequest({
       ...params,
+      contentTopics: params.contentTopics || [],
       requestId: uuid(),
       timeStart: params.timeStart
         ? BigInt(params.timeStart.getTime() * ONE_MILLION)
@@ -28,25 +29,32 @@
     });

     // Validate request parameters based on RFC
-    if (
-      (params.pubsubTopic && !params.contentTopics) ||
-      (!params.pubsubTopic && params.contentTopics)
-    ) {
-      throw new Error(
-        "Both pubsubTopic and contentTopics must be set or unset"
-      );
-    }
-
-    if (
-      params.messageHashes &&
-      (params.pubsubTopic ||
-        params.contentTopics ||
-        params.timeStart ||
-        params.timeEnd)
-    ) {
-      throw new Error(
-        "Message hash lookup queries cannot include content filter criteria"
-      );
-    }
+    const isHashQuery = params.messageHashes && params.messageHashes.length > 0;
+    const hasContentTopics =
+      params.contentTopics && params.contentTopics.length > 0;
+    const hasTimeFilter = params.timeStart || params.timeEnd;
+
+    if (isHashQuery) {
+      // Message hash lookup queries cannot include content topics or time filters
+      // but pubsubTopic is allowed/required
+      if (hasContentTopics || hasTimeFilter) {
+        throw new Error(
+          "Message hash lookup queries cannot include content filter criteria (contentTopics, timeStart, or timeEnd)"
+        );
+      }
+    } else {
+      // Content-filtered queries require both pubsubTopic and contentTopics to be set together
+      if (
+        (params.pubsubTopic &&
+          (!params.contentTopics || params.contentTopics.length === 0)) ||
+        (!params.pubsubTopic &&
+          params.contentTopics &&
+          params.contentTopics.length > 0)
+      ) {
+        throw new Error(
+          "Both pubsubTopic and contentTopics must be set together for content-filtered queries"
+        );
+      }
+    }

     return request;
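For reference, the accepted/rejected combinations above can be restated as a standalone predicate (a sketch mirroring the committed logic, not part of the diff; field names follow QueryRequestParams):

// Returns an error message for an invalid combination, undefined when valid
function storeQueryValidationError(params: {
  pubsubTopic?: string;
  contentTopics?: string[];
  messageHashes?: Uint8Array[];
  timeStart?: Date;
  timeEnd?: Date;
}): string | undefined {
  const isHashQuery = !!params.messageHashes?.length;
  if (isHashQuery) {
    // Hash queries may set pubsubTopic but no content topics or time filters
    return params.contentTopics?.length || params.timeStart || params.timeEnd
      ? "Message hash lookup queries cannot include content filter criteria (contentTopics, timeStart, or timeEnd)"
      : undefined;
  }
  // Content-filtered queries: pubsubTopic and contentTopics set together
  const hasTopic = !!params.pubsubTopic;
  const hasContentTopics = !!params.contentTopics?.length;
  return hasTopic === hasContentTopics
    ? undefined
    : "Both pubsubTopic and contentTopics must be set together for content-filtered queries";
}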


@@ -40,9 +40,14 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
     decoders: Map<string, IDecoder<T>>,
     peerId: PeerId
   ): AsyncGenerator<Promise<T | undefined>[]> {
+    // Only validate decoder content topics for content-filtered queries
+    const isHashQuery =
+      queryOpts.messageHashes && queryOpts.messageHashes.length > 0;
     if (
+      !isHashQuery &&
+      queryOpts.contentTopics &&
       queryOpts.contentTopics.toString() !==
-      Array.from(decoders.keys()).toString()
+        Array.from(decoders.keys()).toString()
     ) {
       throw new Error(
         "Internal error, the decoders should match the query's content topics"
@@ -56,6 +61,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
         paginationCursor: currentCursor
       });

+      log.info("Sending store query request:", {
+        hasMessageHashes: !!queryOpts.messageHashes?.length,
+        messageHashCount: queryOpts.messageHashes?.length,
+        pubsubTopic: queryOpts.pubsubTopic,
+        contentTopics: queryOpts.contentTopics
+      });
+
       let stream;
       try {
         stream = await this.getStream(peerId);
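Why the guard matters (an illustrative snippet with assumed values): a hash query always carries an empty contentTopics list, so the stringified comparison against the decoders' keys would throw for every hash query even though the decoders are still needed to decode the returned messages:

// Decoders are keyed by content topic; hash queries have no content topics
const decoderTopics = ["/test/1/content/proto"]; // Array.from(decoders.keys())
const hashQueryTopics: string[] = []; // queryOpts.contentTopics for a hash query

// The old check compared stringified lists; for a hash query it always mismatches
console.log(hashQueryTopics.toString() !== decoderTopics.toString()); // true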


@@ -57,10 +57,32 @@ export class Store implements IStore {
     decoders: IDecoder<T>[],
     options?: Partial<QueryRequestParams>
   ): AsyncGenerator<Promise<T | undefined>[]> {
-    const { pubsubTopic, contentTopics, decodersAsMap } =
-      this.validateDecodersAndPubsubTopic(decoders);
+    // For message hash queries, don't validate decoders but still need decodersAsMap
+    const isHashQuery =
+      options?.messageHashes && options.messageHashes.length > 0;
+
+    let pubsubTopic: string;
+    let contentTopics: string[];
+    let decodersAsMap: Map<string, IDecoder<T>>;
+
+    if (isHashQuery) {
+      // For hash queries, we still need decoders to decode messages
+      // but we don't validate pubsubTopic consistency
+      // Use pubsubTopic from options if provided, otherwise from first decoder
+      pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
+      contentTopics = [];
+      decodersAsMap = new Map();
+      decoders.forEach((dec) => {
+        decodersAsMap.set(dec.contentTopic, dec);
+      });
+    } else {
+      const validated = this.validateDecodersAndPubsubTopic(decoders);
+      pubsubTopic = validated.pubsubTopic;
+      contentTopics = validated.contentTopics;
+      decodersAsMap = validated.decodersAsMap;
+    }
+
-    const queryOpts = {
+    const queryOpts: QueryRequestParams = {
       pubsubTopic,
       contentTopics,
       includeData: true,
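At the SDK level, hash-based retrieval goes through the existing generator API, as the new integration test below exercises. A minimal usage sketch (assuming a started LightNode and hashes obtained out of band):

import type { IDecodedMessage, IDecoder, LightNode } from "@waku/interfaces";

async function queryByHashes(
  waku: LightNode,
  decoder: IDecoder<IDecodedMessage>,
  messageHashes: Uint8Array[]
): Promise<IDecodedMessage[]> {
  const messages: IDecodedMessage[] = [];
  for await (const page of waku.store.queryGenerator([decoder], {
    messageHashes,
    pubsubTopic: decoder.pubsubTopic // content topics are ignored for hash queries
  })) {
    for (const msgPromise of page) {
      const msg = await msgPromise;
      if (msg) messages.push(msg);
    }
  }
  return messages;
}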


@@ -55,6 +55,7 @@
     "@waku/core": "*",
     "@waku/enr": "*",
     "@waku/interfaces": "*",
+    "@waku/message-hash": "^0.1.17",
     "@waku/utils": "*",
     "app-root-path": "^3.1.0",
     "chai-as-promised": "^7.1.1",


@@ -0,0 +1,104 @@
import { DecodedMessage } from "@waku/core";
import type { LightNode } from "@waku/interfaces";
import { messageHash } from "@waku/message-hash";
import { assert } from "chai";
import {
afterEachCustom,
beforeEachCustom,
ServiceNode,
tearDownNodes
} from "../../src/index.js";
import {
runStoreNodes,
sendMessages,
TestDecoder,
TestShardInfo,
totalMsgs
} from "./utils.js";

describe("Waku Store, message hash query", function () {
this.timeout(15000);
let waku: LightNode;
let nwaku: ServiceNode;
beforeEachCustom(this, async () => {
[nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo);
});

afterEachCustom(this, async () => {
await tearDownNodes(nwaku, [waku]);
});

it("can query messages normally", async function () {
await sendMessages(
nwaku,
totalMsgs,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic,
true
);
const messages: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder])) {
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
assert.equal(messages.length, totalMsgs);
});

it("can query messages by message hash", async function () {
const sentMessages = await sendMessages(
nwaku,
totalMsgs,
TestDecoder.contentTopic,
TestDecoder.pubsubTopic,
true
);
const messageHashes = sentMessages.map((msg) =>
messageHash(TestDecoder.pubsubTopic, {
pubsubTopic: TestDecoder.pubsubTopic,
payload: Buffer.from(msg.payload, "base64"),
contentTopic: msg.contentTopic || TestDecoder.contentTopic,
timestamp: msg.timestamp
? new Date(Number(msg.timestamp / 1000000n))
: undefined,
meta: undefined,
rateLimitProof: undefined,
ephemeral: undefined
})
);
console.log("Sent messages:", sentMessages.length);
console.log("First message:", sentMessages[0]);
console.log("Message hashes:", messageHashes.length);
console.log("First hash:", messageHashes[0]);
const messages: DecodedMessage[] = [];
let pageCount = 0;
try {
for await (const page of waku.store.queryGenerator([TestDecoder], {
messageHashes,
pubsubTopic: TestDecoder.pubsubTopic
})) {
pageCount++;
console.log(`Page ${pageCount} received`);
for await (const msg of page) {
messages.push(msg as DecodedMessage);
}
}
} catch (error) {
console.error("Error during query:", error);
throw error;
}
console.log("Total pages:", pageCount);
console.log("Total messages received:", messages.length);
assert.equal(messages.length, messageHashes.length);
for (const msg of messages) {
assert.equal(msg.contentTopic, TestDecoder.contentTopic);
}
});
});


@@ -17,6 +17,7 @@ import { expect } from "chai";
 import { Context } from "mocha";

 import { delay, NOISE_KEY_1, runNodes, ServiceNode } from "../../src/index.js";
+import { MessageRpcQuery } from "../../src/types.js";

 export const log = new Logger("test:store");
@@ -49,20 +50,20 @@ export async function sendMessages(
   instance: ServiceNode,
   numMessages: number,
   contentTopic: string,
-  pubsubTopic: string
-): Promise<void> {
+  pubsubTopic: string,
+  timestamp: boolean = false
+): Promise<MessageRpcQuery[]> {
+  const messages: MessageRpcQuery[] = new Array<MessageRpcQuery>(numMessages);
   for (let i = 0; i < numMessages; i++) {
-    expect(
-      await instance.sendMessage(
-        ServiceNode.toMessageRpcQuery({
-          payload: new Uint8Array([i]),
-          contentTopic: contentTopic
-        }),
-        pubsubTopic
-      )
-    ).to.eq(true);
+    messages[i] = ServiceNode.toMessageRpcQuery({
+      payload: new Uint8Array([i]),
+      contentTopic: contentTopic,
+      timestamp: timestamp ? new Date() : undefined
+    });
+    expect(await instance.sendMessage(messages[i], pubsubTopic)).to.eq(true);
     await delay(1); // to ensure each timestamp is unique.
   }
+  return messages;
 }
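With messages now returned from sendMessages, a caller can recompute each message's hash for a store hash query, mirroring the new integration test above (messageHash comes from @waku/message-hash; nwaku, totalMsgs, and TestDecoder as in the test utilities):

const sent = await sendMessages(
  nwaku,
  totalMsgs,
  TestDecoder.contentTopic,
  TestDecoder.pubsubTopic,
  true // timestamp each message
);
const hashes = sent.map((msg) =>
  messageHash(TestDecoder.pubsubTopic, {
    pubsubTopic: TestDecoder.pubsubTopic,
    payload: Buffer.from(msg.payload, "base64"),
    contentTopic: msg.contentTopic || TestDecoder.contentTopic,
    timestamp: msg.timestamp
      ? new Date(Number(msg.timestamp / 1000000n))
      : undefined,
    meta: undefined,
    rateLimitProof: undefined,
    ephemeral: undefined
  })
);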
export async function sendMessagesAutosharding(