Mirror of https://github.com/logos-messaging/js-waku.git (synced 2026-01-02 13:53:12 +00:00)
Merge pull request #2397 from waku-org/fix/store-hash-query
fix(store): update store query validation logic to support msg hash q…
commit 49f26d89a8
package-lock.json (generated, 14 changed lines)
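In practice, the fix lets callers look up stored messages directly by message hash. A minimal usage sketch, mirroring the new packages/tests/tests/store/message_hash.spec.ts test added in this PR (the running LightNode, a reachable store peer, and the decoder are assumed, not part of the diff):

import type { IDecodedMessage, IDecoder, LightNode } from "@waku/interfaces";

// Query a store peer for specific messages identified by their hashes.
// `node` is an already-started LightNode connected to a store node, and
// `decoder` matches the content topic the messages were published on.
async function queryByHashes(
  node: LightNode,
  decoder: IDecoder<IDecodedMessage>,
  messageHashes: Uint8Array[]
): Promise<IDecodedMessage[]> {
  const found: IDecodedMessage[] = [];
  for await (const page of node.store.queryGenerator([decoder], {
    messageHashes,
    pubsubTopic: decoder.pubsubTopic
  })) {
    for await (const msg of page) {
      if (msg) found.push(msg);
    }
  }
  return found;
}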
@@ -11727,6 +11727,18 @@
     "resolved": "packages/message-encryption",
     "link": true
   },
+  "node_modules/@waku/message-hash": {
+    "version": "0.1.19",
+    "resolved": "https://registry.npmjs.org/@waku/message-hash/-/message-hash-0.1.19.tgz",
+    "integrity": "sha512-fl+qky3MQK8l3HTT5wq23NcdYFYNqVcUVwBblX9/IArcDlDNjEEdK68K3n8rFWxBBd2JAK0RxU7MMkLiK3vWUA==",
+    "dependencies": {
+      "@noble/hashes": "^1.3.2",
+      "@waku/utils": "0.0.23"
+    },
+    "engines": {
+      "node": ">=20"
+    }
+  },
   "node_modules/@waku/proto": {
     "resolved": "packages/proto",
     "link": true
@@ -44183,6 +44195,7 @@
     "@types/lodash": "^4.17.15",
     "@types/sinon": "^17.0.3",
     "@waku/build-utils": "^1.0.0",
     "@waku/interfaces": "0.0.30",
     "@waku/message-encryption": "^0.0.33",
     "deep-equal-in-any-order": "^2.0.6",
     "fast-check": "^3.23.2",
@@ -44574,6 +44587,7 @@
     "@waku/core": "*",
     "@waku/enr": "*",
     "@waku/interfaces": "*",
+    "@waku/message-hash": "^0.1.17",
     "@waku/utils": "*",
     "app-root-path": "^3.1.0",
     "chai-as-promised": "^7.1.1",
@@ -1,4 +1,4 @@
-import type { IDecodedMessage, IProtoMessage } from "@waku/interfaces";
+import type { IProtoMessage } from "@waku/interfaces";
 import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
 import { expect } from "chai";
@@ -93,19 +93,20 @@ describe("Message Hash: RFC Test Vectors", () => {
    expect(bytesToHex(hash)).to.equal(expectedHash);
  });

-  it("Waku message hash computation (message is IDecodedMessage)", () => {
+  it("Waku message hash computation (message is IProtoMessage with version)", () => {
    const expectedHash =
      "3f11bc950dce0e3ffdcf205ae6414c01130bb5d9f20644869bff80407fa52c8f";
    const pubsubTopic = "/waku/2/default-waku/proto";
-    const message: IDecodedMessage = {
-      version: 0,
+    const message: IProtoMessage = {
      payload: new Uint8Array(),
      pubsubTopic,
      contentTopic: "/waku/2/default-content/proto",
      meta: hexToBytes("0x73757065722d736563726574"),
-      timestamp: new Date("2024-04-30T10:54:14.978Z"),
+      timestamp:
+        BigInt(new Date("2024-04-30T10:54:14.978Z").getTime()) *
+        BigInt(1000000),
      ephemeral: undefined,
-      rateLimitProof: undefined
+      rateLimitProof: undefined,
+      version: 0
    };
    const hash = messageHash(pubsubTopic, message);
    expect(bytesToHex(hash)).to.equal(expectedHash);
@@ -144,16 +145,17 @@ describe("messageHash and messageHashStr", () => {
    expect(hashStr).to.equal(hashStrFromBytes);
  });

-  it("messageHashStr works with IDecodedMessage", () => {
-    const decodedMessage: IDecodedMessage = {
-      version: 0,
+  it("messageHashStr works with IProtoMessage", () => {
+    const decodedMessage: IProtoMessage = {
      payload: new Uint8Array([1, 2, 3, 4]),
      pubsubTopic,
      contentTopic: "/waku/2/default-content/proto",
      meta: new Uint8Array([5, 6, 7, 8]),
-      timestamp: new Date("2024-04-30T10:54:14.978Z"),
+      timestamp:
+        BigInt(new Date("2024-04-30T10:54:14.978Z").getTime()) *
+        BigInt(1000000),
      ephemeral: undefined,
-      rateLimitProof: undefined
+      rateLimitProof: undefined,
+      version: 0
    };

    const hashStr = messageHashStr(pubsubTopic, decodedMessage);
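The switch from IDecodedMessage to IProtoMessage above also changes the timestamp representation: IProtoMessage carries a bigint timestamp in nanoseconds rather than a Date. The conversion used inline in the tests could be factored into a small helper (hypothetical, shown only for clarity):

// Convert a JS Date (millisecond precision) into the nanosecond bigint
// timestamp that IProtoMessage expects, e.g. when computing message hashes.
function dateToNanoseconds(date: Date): bigint {
  return BigInt(date.getTime()) * BigInt(1_000_000);
}

// dateToNanoseconds(new Date("2024-04-30T10:54:14.978Z")) === 1714474454978000000n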
packages/core/src/lib/store/rpc.spec.ts (new file, 93 lines)
@@ -0,0 +1,93 @@
import { expect } from "chai";

import { StoreQueryRequest } from "./rpc.js";

describe("StoreQueryRequest validation", () => {
  it("accepts valid content-filtered query", () => {
    const request = StoreQueryRequest.create({
      pubsubTopic: "/waku/2/default-waku/proto",
      contentTopics: ["/test/1/content/proto"],
      includeData: true,
      paginationForward: true
    });
    expect(request).to.exist;
  });

  it("rejects content-filtered query with only pubsubTopic", () => {
    expect(() =>
      StoreQueryRequest.create({
        pubsubTopic: "/waku/2/default-waku/proto",
        contentTopics: [],
        includeData: true,
        paginationForward: true
      })
    ).to.throw(
      "Both pubsubTopic and contentTopics must be set together for content-filtered queries"
    );
  });

  it("rejects content-filtered query with only contentTopics", () => {
    expect(() =>
      StoreQueryRequest.create({
        pubsubTopic: "",
        contentTopics: ["/test/1/content/proto"],
        includeData: true,
        paginationForward: true
      })
    ).to.throw(
      "Both pubsubTopic and contentTopics must be set together for content-filtered queries"
    );
  });

  it("accepts valid message hash query", () => {
    const request = StoreQueryRequest.create({
      pubsubTopic: "",
      contentTopics: [],
      messageHashes: [new Uint8Array([1, 2, 3, 4])],
      includeData: true,
      paginationForward: true
    });
    expect(request).to.exist;
  });

  it("rejects hash query with content filter parameters", () => {
    expect(() =>
      StoreQueryRequest.create({
        messageHashes: [new Uint8Array([1, 2, 3, 4])],
        pubsubTopic: "/waku/2/default-waku/proto",
        contentTopics: ["/test/1/content/proto"],
        includeData: true,
        paginationForward: true
      })
    ).to.throw(
      "Message hash lookup queries cannot include content filter criteria"
    );
  });

  it("rejects hash query with time filter", () => {
    expect(() =>
      StoreQueryRequest.create({
        pubsubTopic: "",
        contentTopics: [],
        messageHashes: [new Uint8Array([1, 2, 3, 4])],
        timeStart: new Date(),
        includeData: true,
        paginationForward: true
      })
    ).to.throw(
      "Message hash lookup queries cannot include content filter criteria"
    );
  });

  it("accepts time-filtered query with content filter", () => {
    const request = StoreQueryRequest.create({
      pubsubTopic: "/waku/2/default-waku/proto",
      contentTopics: ["/test/1/content/proto"],
      timeStart: new Date(Date.now() - 3600000),
      timeEnd: new Date(),
      includeData: true,
      paginationForward: true
    });
    expect(request).to.exist;
  });
});
@@ -14,6 +14,7 @@ export class StoreQueryRequest {
  public static create(params: QueryRequestParams): StoreQueryRequest {
    const request = new StoreQueryRequest({
      ...params,
+      contentTopics: params.contentTopics || [],
      requestId: uuid(),
      timeStart: params.timeStart
        ? BigInt(params.timeStart.getTime() * ONE_MILLION)
@@ -27,26 +28,29 @@ export class StoreQueryRequest {
        : undefined
    });

-    // Validate request parameters based on RFC
-    if (
-      (params.pubsubTopic && !params.contentTopics) ||
-      (!params.pubsubTopic && params.contentTopics)
-    ) {
-      throw new Error(
-        "Both pubsubTopic and contentTopics must be set or unset"
-      );
-    }
+    const isHashQuery = params.messageHashes && params.messageHashes.length > 0;
+    const hasContentTopics =
+      params.contentTopics && params.contentTopics.length > 0;
+    const hasTimeFilter = params.timeStart || params.timeEnd;

-    if (
-      params.messageHashes &&
-      (params.pubsubTopic ||
-        params.contentTopics ||
-        params.timeStart ||
-        params.timeEnd)
-    ) {
-      throw new Error(
-        "Message hash lookup queries cannot include content filter criteria"
-      );
+    if (isHashQuery) {
+      if (hasContentTopics || hasTimeFilter) {
+        throw new Error(
+          "Message hash lookup queries cannot include content filter criteria (contentTopics, timeStart, or timeEnd)"
+        );
+      }
+    } else {
+      if (
+        (params.pubsubTopic &&
+          (!params.contentTopics || params.contentTopics.length === 0)) ||
+        (!params.pubsubTopic &&
+          params.contentTopics &&
+          params.contentTopics.length > 0)
+      ) {
+        throw new Error(
+          "Both pubsubTopic and contentTopics must be set together for content-filtered queries"
+        );
+      }
    }

    return request;
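Under the revised validation a request must be either a pure hash lookup or a content-filtered query, never a mix. A brief sketch of the two accepted shapes (field values are illustrative and taken from the unit tests above):

// Accepted: message hash lookup, with no content topics and no time filter.
const hashLookup = StoreQueryRequest.create({
  pubsubTopic: "",
  contentTopics: [],
  messageHashes: [new Uint8Array([1, 2, 3, 4])],
  includeData: true,
  paginationForward: true
});

// Accepted: content-filtered query, with pubsubTopic and contentTopics set
// together, optionally combined with a time range.
const contentQuery = StoreQueryRequest.create({
  pubsubTopic: "/waku/2/default-waku/proto",
  contentTopics: ["/test/1/content/proto"],
  timeStart: new Date(Date.now() - 3600000),
  timeEnd: new Date(),
  includeData: true,
  paginationForward: true
});

// Anything that mixes messageHashes with contentTopics or a time filter throws.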
@@ -40,9 +40,14 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
    decoders: Map<string, IDecoder<T>>,
    peerId: PeerId
  ): AsyncGenerator<Promise<T | undefined>[]> {
+    // Only validate decoder content topics for content-filtered queries
+    const isHashQuery =
+      queryOpts.messageHashes && queryOpts.messageHashes.length > 0;
    if (
+      !isHashQuery &&
+      queryOpts.contentTopics &&
      queryOpts.contentTopics.toString() !==
-      Array.from(decoders.keys()).toString()
+        Array.from(decoders.keys()).toString()
    ) {
      throw new Error(
        "Internal error, the decoders should match the query's content topics"
@@ -56,6 +61,13 @@ export class StoreCore extends BaseProtocol implements IStoreCore {
        paginationCursor: currentCursor
      });

+      log.info("Sending store query request:", {
+        hasMessageHashes: !!queryOpts.messageHashes?.length,
+        messageHashCount: queryOpts.messageHashes?.length,
+        pubsubTopic: queryOpts.pubsubTopic,
+        contentTopics: queryOpts.contentTopics
+      });
+
      let stream;
      try {
        stream = await this.getStream(peerId);
@@ -57,10 +57,32 @@ export class Store implements IStore {
    decoders: IDecoder<T>[],
    options?: Partial<QueryRequestParams>
  ): AsyncGenerator<Promise<T | undefined>[]> {
-    const { pubsubTopic, contentTopics, decodersAsMap } =
-      this.validateDecodersAndPubsubTopic(decoders);
+    // For message hash queries, don't validate decoders but still need decodersAsMap
+    const isHashQuery =
+      options?.messageHashes && options.messageHashes.length > 0;

-    const queryOpts = {
+    let pubsubTopic: string;
+    let contentTopics: string[];
+    let decodersAsMap: Map<string, IDecoder<T>>;
+
+    if (isHashQuery) {
+      // For hash queries, we still need decoders to decode messages
+      // but we don't validate pubsubTopic consistency
+      // Use pubsubTopic from options if provided, otherwise from first decoder
+      pubsubTopic = options.pubsubTopic || decoders[0]?.pubsubTopic || "";
+      contentTopics = [];
+      decodersAsMap = new Map();
+      decoders.forEach((dec) => {
+        decodersAsMap.set(dec.contentTopic, dec);
+      });
+    } else {
+      const validated = this.validateDecodersAndPubsubTopic(decoders);
+      pubsubTopic = validated.pubsubTopic;
+      contentTopics = validated.contentTopics;
+      decodersAsMap = validated.decodersAsMap;
+    }
+
+    const queryOpts: QueryRequestParams = {
      pubsubTopic,
      contentTopics,
      includeData: true,
@@ -55,6 +55,7 @@
    "@waku/core": "*",
    "@waku/enr": "*",
    "@waku/interfaces": "*",
+    "@waku/message-hash": "^0.1.17",
    "@waku/utils": "*",
    "app-root-path": "^3.1.0",
    "chai-as-promised": "^7.1.1",
packages/tests/tests/store/message_hash.spec.ts (new file, 86 lines)
@@ -0,0 +1,86 @@
import { messageHash } from "@waku/core";
import type { IDecodedMessage, LightNode } from "@waku/interfaces";
import { expect } from "chai";

import {
  afterEachCustom,
  beforeEachCustom,
  ServiceNode,
  tearDownNodes
} from "../../src/index.js";

import {
  runStoreNodes,
  sendMessages,
  TestDecoder,
  TestShardInfo,
  totalMsgs
} from "./utils.js";

describe("Waku Store, message hash query", function () {
  this.timeout(15000);
  let waku: LightNode;
  let nwaku: ServiceNode;

  beforeEachCustom(this, async () => {
    [nwaku, waku] = await runStoreNodes(this.ctx, TestShardInfo);
  });

  afterEachCustom(this, async () => {
    await tearDownNodes(nwaku, [waku]);
  });

  it("can query messages normally", async function () {
    await sendMessages(
      nwaku,
      totalMsgs,
      TestDecoder.contentTopic,
      TestDecoder.pubsubTopic,
      true
    );

    const messages: IDecodedMessage[] = [];
    for await (const page of waku.store.queryGenerator([TestDecoder])) {
      for await (const msg of page) {
        messages.push(msg as IDecodedMessage);
      }
    }

    expect(messages.length).to.equal(totalMsgs);
  });

  it("can query messages by message hash", async function () {
    const sentMessages = await sendMessages(
      nwaku,
      totalMsgs,
      TestDecoder.contentTopic,
      TestDecoder.pubsubTopic,
      true
    );
    const messageHashes = sentMessages.map((msg) =>
      messageHash(TestDecoder.pubsubTopic, {
        payload: Buffer.from(msg.payload, "base64"),
        contentTopic: msg.contentTopic || TestDecoder.contentTopic,
        timestamp: msg.timestamp || undefined,
        meta: undefined,
        rateLimitProof: undefined,
        ephemeral: undefined,
        version: undefined
      })
    );

    const messages: IDecodedMessage[] = [];
    for await (const page of waku.store.queryGenerator([TestDecoder], {
      messageHashes,
      pubsubTopic: TestDecoder.pubsubTopic
    })) {
      for await (const msg of page) {
        messages.push(msg as IDecodedMessage);
      }
    }
    expect(messages.length).to.equal(messageHashes.length);
    for (const msg of messages) {
      expect(msg.contentTopic).to.equal(TestDecoder.contentTopic);
    }
  });
});
@@ -17,6 +17,7 @@ import { expect } from "chai";
import { Context } from "mocha";

import { delay, NOISE_KEY_1, runNodes, ServiceNode } from "../../src/index.js";
+import { MessageRpcQuery } from "../../src/types.js";

export const log = new Logger("test:store");

@@ -49,20 +50,20 @@ export async function sendMessages(
  instance: ServiceNode,
  numMessages: number,
  contentTopic: string,
-  pubsubTopic: string
-): Promise<void> {
+  pubsubTopic: string,
+  timestamp: boolean = false
+): Promise<MessageRpcQuery[]> {
+  const messages: MessageRpcQuery[] = new Array<MessageRpcQuery>(numMessages);
  for (let i = 0; i < numMessages; i++) {
-    expect(
-      await instance.sendMessage(
-        ServiceNode.toMessageRpcQuery({
-          payload: new Uint8Array([i]),
-          contentTopic: contentTopic
-        }),
-        pubsubTopic
-      )
-    ).to.eq(true);
+    messages[i] = ServiceNode.toMessageRpcQuery({
+      payload: new Uint8Array([i]),
+      contentTopic: contentTopic,
+      timestamp: timestamp ? new Date() : undefined
+    });
+    expect(await instance.sendMessage(messages[i], pubsubTopic)).to.eq(true);
    await delay(1); // to ensure each timestamp is unique.
  }
+  return messages;
}

export async function sendMessagesAutosharding(