From 86f730f9587e3688b79c8e846e5c005bb4d5fae4 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Tue, 6 Aug 2024 12:06:37 +0530 Subject: [PATCH] feat!: store v3 (#2036) * feat: write proto * chore: move store v2 to a subdir * chore: update v3 proto * feat: create custom RPC * feat: implement storev3 core * chore: set store v3 as default * chore: move v2 related code * chore: update v2 imports * feat: add store-v3 sdk implementation * fix: rebase * chore: add ts-doc for store query request params * chore: update tests for new API * fix: use nanoseconds instead of millisecond for timerange * chore: improve store * chore: remove store v2 * chore: update tests * chore: fix legacy imports & proto * tests: remove manual reversal as its part of the API, update incorrect cursor error msg * chore: update default page size * chore: account for MAX_PAGE_SIZE from nwaku * fix: test * fix: sorting tests --- packages/core/src/index.ts | 2 - packages/core/src/lib/store/history_rpc.ts | 93 --- packages/core/src/lib/store/index.ts | 151 ++-- packages/core/src/lib/store/rpc.ts | 92 +++ packages/interfaces/src/store.ts | 101 ++- .../src/generated/{store.ts => store_v3.ts} | 677 +++++++----------- packages/proto/src/index.ts | 2 +- packages/proto/src/lib/store.proto | 55 -- packages/proto/src/lib/store_v3.proto | 42 ++ packages/sdk/src/protocols/store.ts | 333 ++++----- .../tests/tests/store/cursor.node.spec.ts | 46 +- packages/tests/tests/store/index.node.spec.ts | 4 +- packages/tests/tests/store/order.node.spec.ts | 28 +- .../tests/tests/store/page_size.node.spec.ts | 10 +- .../tests/tests/store/sorting.node.spec.ts | 93 ++- .../tests/store/time_filter.node.spec.ts | 12 +- 16 files changed, 732 insertions(+), 1009 deletions(-) delete mode 100644 packages/core/src/lib/store/history_rpc.ts create mode 100644 packages/core/src/lib/store/rpc.ts rename packages/proto/src/generated/{store.ts => store_v3.ts} (51%) delete mode 100644 packages/proto/src/lib/store.proto create mode 100644 packages/proto/src/lib/store_v3.proto diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 7ce22bfdc2..3368021ead 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -15,8 +15,6 @@ export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; export { StoreCore } from "./lib/store/index.js"; -export { PageDirection } from "./lib/store/index.js"; - export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; export { ConnectionManager } from "./lib/connection_manager.js"; diff --git a/packages/core/src/lib/store/history_rpc.ts b/packages/core/src/lib/store/history_rpc.ts deleted file mode 100644 index fb9049437f..0000000000 --- a/packages/core/src/lib/store/history_rpc.ts +++ /dev/null @@ -1,93 +0,0 @@ -import { proto_store as proto } from "@waku/proto"; -import type { Uint8ArrayList } from "uint8arraylist"; -import { v4 as uuid } from "uuid"; - -const OneMillion = BigInt(1_000_000); - -export enum PageDirection { - BACKWARD = "backward", - FORWARD = "forward" -} - -export interface Params { - contentTopics: string[]; - pubsubTopic: string; - pageDirection: PageDirection; - pageSize: number; - startTime?: Date; - endTime?: Date; - cursor?: proto.Index; -} - -export class HistoryRpc { - private constructor(public readonly proto: proto.HistoryRpc) {} - - public get query(): proto.HistoryQuery | undefined { - return this.proto.query; - } - - public get response(): proto.HistoryResponse | undefined { - return this.proto.response; - } - - /** - * Create History Query. - */ - public static createQuery(params: Params): HistoryRpc { - const contentFilters = params.contentTopics.map((contentTopic) => { - return { contentTopic }; - }); - - const direction = directionToProto(params.pageDirection); - - const pagingInfo = { - pageSize: BigInt(params.pageSize), - cursor: params.cursor, - direction - } as proto.PagingInfo; - - let startTime, endTime; - if (params.startTime) { - // milliseconds 10^-3 to nanoseconds 10^-9 - startTime = BigInt(params.startTime.valueOf()) * OneMillion; - } - - if (params.endTime) { - // milliseconds 10^-3 to nanoseconds 10^-9 - endTime = BigInt(params.endTime.valueOf()) * OneMillion; - } - return new HistoryRpc({ - requestId: uuid(), - query: { - pubsubTopic: params.pubsubTopic, - contentFilters, - pagingInfo, - startTime, - endTime - }, - response: undefined - }); - } - - public decode(bytes: Uint8ArrayList): HistoryRpc { - const res = proto.HistoryRpc.decode(bytes); - return new HistoryRpc(res); - } - - public encode(): Uint8Array { - return proto.HistoryRpc.encode(this.proto); - } -} - -function directionToProto( - pageDirection: PageDirection -): proto.PagingInfo.Direction { - switch (pageDirection) { - case PageDirection.BACKWARD: - return proto.PagingInfo.Direction.BACKWARD; - case PageDirection.FORWARD: - return proto.PagingInfo.Direction.FORWARD; - default: - return proto.PagingInfo.Direction.BACKWARD; - } -} diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 64ce6c1c35..352461281f 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -1,13 +1,12 @@ import type { Peer } from "@libp2p/interface"; import { - Cursor, IDecodedMessage, IDecoder, IStoreCore, Libp2p, - ProtocolCreateOptions + ProtocolCreateOptions, + QueryRequestParams } from "@waku/interfaces"; -import { proto_store as proto } from "@waku/proto"; import { Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -17,63 +16,30 @@ import { Uint8ArrayList } from "uint8arraylist"; import { BaseProtocol } from "../base_protocol.js"; import { toProtoMessage } from "../to_proto_message.js"; -import { HistoryRpc, PageDirection, Params } from "./history_rpc.js"; - -import HistoryError = proto.HistoryResponse.HistoryError; +import { + DEFAULT_PAGE_SIZE, + MAX_PAGE_SIZE, + StoreQueryRequest, + StoreQueryResponse +} from "./rpc.js"; const log = new Logger("store"); -export const StoreCodec = "/vac/waku/store/2.0.0-beta4"; +export const StoreCodec = "/vac/waku/store-query/3.0.0"; -export { PageDirection, Params }; - -export interface TimeFilter { - startTime: Date; - endTime: Date; -} - -export interface QueryOptions { - /** - * The direction in which pages are retrieved: - * - { @link PageDirection.BACKWARD }: Most recent page first. - * - { @link PageDirection.FORWARD }: Oldest page first. - * - * Note: This does not affect the ordering of messages with the page - * (the oldest message is always first). - * - * @default { @link PageDirection.BACKWARD } - */ - pageDirection?: PageDirection; - /** - * The number of message per page. - * - * @default { @link DefaultPageSize } - */ - pageSize?: number; - /** - * Retrieve messages with a timestamp within the provided values. - */ - timeFilter?: TimeFilter; - /** - * Cursor as an index to start a query from. - * The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result). - * If undefined, the query will start from the beginning or end of the history, depending on the page direction. - */ - cursor?: Cursor; -} - -/** - * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). - * - * The Waku Store protocol can be used to retrieved historical messages. - */ export class StoreCore extends BaseProtocol implements IStoreCore { public constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(StoreCodec, libp2p.components, log, options!.pubsubTopics!, options); + super( + StoreCodec, + libp2p.components, + log, + options?.pubsubTopics || [], + options + ); } public async *queryPerPage( - queryOpts: Params, + queryOpts: QueryRequestParams, decoders: Map>, peer: Peer ): AsyncGenerator[]> { @@ -86,11 +52,12 @@ export class StoreCore extends BaseProtocol implements IStoreCore { ); } - let currentCursor = queryOpts.cursor; + let currentCursor = queryOpts.paginationCursor; while (true) { - queryOpts.cursor = currentCursor; - - const historyRpcQuery = HistoryRpc.createQuery(queryOpts); + const storeQueryRequest = StoreQueryRequest.create({ + ...queryOpts, + paginationCursor: currentCursor + }); let stream; try { @@ -101,7 +68,7 @@ export class StoreCore extends BaseProtocol implements IStoreCore { } const res = await pipe( - [historyRpcQuery.encode()], + [storeQueryRequest.encode()], lp.encode, stream, lp.decode, @@ -113,61 +80,57 @@ export class StoreCore extends BaseProtocol implements IStoreCore { bytes.append(chunk); }); - const reply = historyRpcQuery.decode(bytes); + const storeQueryResponse = StoreQueryResponse.decode(bytes); - if (!reply.response) { - log.warn("Stopping pagination due to store `response` field missing"); + if ( + !storeQueryResponse.statusCode || + storeQueryResponse.statusCode >= 300 + ) { + const errorMessage = `Store query failed with status code: ${storeQueryResponse.statusCode}, description: ${storeQueryResponse.statusDesc}`; + log.error(errorMessage); + throw new Error(errorMessage); + } + + if (!storeQueryResponse.messages || !storeQueryResponse.messages.length) { + log.warn("Stopping pagination due to empty messages in response"); break; } - const response = reply.response as proto.HistoryResponse; + log.info( + `${storeQueryResponse.messages.length} messages retrieved from store` + ); - if (response.error && response.error !== HistoryError.NONE) { - throw "History response contains an Error: " + response.error; - } - - if (!response.messages || !response.messages.length) { - log.warn( - "Stopping pagination due to store `response.messages` field missing or empty" - ); - break; - } - - log.error(`${response.messages.length} messages retrieved from store`); - - yield response.messages.map((protoMsg) => { - const contentTopic = protoMsg.contentTopic; - if (typeof contentTopic !== "undefined") { + const decodedMessages = storeQueryResponse.messages.map((protoMsg) => { + if (!protoMsg.message) { + return Promise.resolve(undefined); + } + const contentTopic = protoMsg.message.contentTopic; + if (contentTopic) { const decoder = decoders.get(contentTopic); if (decoder) { return decoder.fromProtoObj( - queryOpts.pubsubTopic, - toProtoMessage(protoMsg) + protoMsg.pubsubTopic || "", + toProtoMessage(protoMsg.message) ); } } return Promise.resolve(undefined); }); - const nextCursor = response.pagingInfo?.cursor; - if (typeof nextCursor === "undefined") { - // If the server does not return cursor then there is an issue, - // Need to abort, or we end up in an infinite loop - log.warn( - "Stopping pagination due to `response.pagingInfo.cursor` missing from store response" - ); - break; + yield decodedMessages; + + if (queryOpts.paginationForward) { + currentCursor = + storeQueryResponse.messages[storeQueryResponse.messages.length - 1] + .messageHash; + } else { + currentCursor = storeQueryResponse.messages[0].messageHash; } - currentCursor = nextCursor; - - const responsePageSize = response.pagingInfo?.pageSize; - const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; if ( - // Response page size smaller than query, meaning this is the last page - responsePageSize && - queryPageSize && - responsePageSize < queryPageSize + storeQueryResponse.messages.length > MAX_PAGE_SIZE && + storeQueryResponse.messages.length < + (queryOpts.paginationLimit || DEFAULT_PAGE_SIZE) ) { break; } diff --git a/packages/core/src/lib/store/rpc.ts b/packages/core/src/lib/store/rpc.ts new file mode 100644 index 0000000000..2ad63361e8 --- /dev/null +++ b/packages/core/src/lib/store/rpc.ts @@ -0,0 +1,92 @@ +import { QueryRequestParams } from "@waku/interfaces"; +import { proto_store as proto } from "@waku/proto"; +import type { Uint8ArrayList } from "uint8arraylist"; +import { v4 as uuid } from "uuid"; + +// https://github.com/waku-org/nwaku/blob/7205f95cff9f49ca0bb762e8fd0bf56a6a7f3b3b/waku/waku_store/common.nim#L12 +export const DEFAULT_PAGE_SIZE = 20; +export const MAX_PAGE_SIZE = 100; +const ONE_MILLION = 1_000000; + +export class StoreQueryRequest { + public constructor(public proto: proto.StoreQueryRequest) {} + + public static create(params: QueryRequestParams): StoreQueryRequest { + const request = new StoreQueryRequest({ + ...params, + requestId: uuid(), + timeStart: params.timeStart + ? BigInt(params.timeStart.getTime() * ONE_MILLION) + : undefined, + timeEnd: params.timeEnd + ? BigInt(params.timeEnd.getTime() * ONE_MILLION) + : undefined, + messageHashes: params.messageHashes || [], + paginationLimit: params.paginationLimit + ? BigInt(params.paginationLimit) + : undefined + }); + + // Validate request parameters based on RFC + if ( + (params.pubsubTopic && !params.contentTopics) || + (!params.pubsubTopic && params.contentTopics) + ) { + throw new Error( + "Both pubsubTopic and contentTopics must be set or unset" + ); + } + + if ( + params.messageHashes && + (params.pubsubTopic || + params.contentTopics || + params.timeStart || + params.timeEnd) + ) { + throw new Error( + "Message hash lookup queries cannot include content filter criteria" + ); + } + + return request; + } + + public static decode(bytes: Uint8ArrayList): StoreQueryRequest { + const res = proto.StoreQueryRequest.decode(bytes); + return new StoreQueryRequest(res); + } + + public encode(): Uint8Array { + return proto.StoreQueryRequest.encode(this.proto); + } +} + +export class StoreQueryResponse { + public constructor(public proto: proto.StoreQueryResponse) {} + + public static decode(bytes: Uint8ArrayList): StoreQueryResponse { + const res = proto.StoreQueryResponse.decode(bytes); + return new StoreQueryResponse(res); + } + + public encode(): Uint8Array { + return proto.StoreQueryResponse.encode(this.proto); + } + + public get statusCode(): number | undefined { + return this.proto.statusCode; + } + + public get statusDesc(): string | undefined { + return this.proto.statusDesc; + } + + public get messages(): proto.WakuMessageKeyValue[] { + return this.proto.messages; + } + + public get paginationCursor(): Uint8Array | undefined { + return this.proto.paginationCursor; + } +} diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index ac6486995f..737a6267f7 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -1,72 +1,101 @@ -import { proto_store as proto } from "@waku/proto"; - import type { IDecodedMessage, IDecoder } from "./message.js"; import type { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js"; -export enum PageDirection { - BACKWARD = "backward", - FORWARD = "forward" -} +export type StoreCursor = Uint8Array; -export interface TimeFilter { - startTime: Date; - endTime: Date; -} +/** + * Parameters for a store query request, as specified in the Waku Store v3 RFC. + */ +export type QueryRequestParams = { + /** + * Whether to include the full message data in the response. + * - `true`: The response will include the message content and associated pubsub topic for each matching message. + * - `false`: The response will only include the message hashes for each matching message. + * @default true + */ + includeData: boolean; -export interface Cursor { - digest: Uint8Array; - receiverTime: bigint; - senderTime: bigint; + /** + * The pubsub topic to query. This field is mandatory. + * The query will only return messages that were published on this specific pubsub topic. + */ pubsubTopic: string; -} -export type StoreQueryOptions = { /** - * The direction in which pages are retrieved: - * - { @link PageDirection.BACKWARD }: Most recent page first. - * - { @link PageDirection.FORWARD }: Oldest page first. - * - * Note: This does not affect the ordering of messages with the page - * (the oldest message is always first). - * - * @default { @link PageDirection.BACKWARD } + * The content topics to filter the messages. + * The query will only return messages that have a content topic included in this array. + * This field MUST be populated together with the `pubsubTopic` field for content topic filtering to be applied. + * If either `contentTopics` or `pubsubTopic` is not provided or empty, no content topic filtering will be applied. */ - pageDirection?: PageDirection; + contentTopics: string[]; + /** - * The number of message per page. + * The start time for the time range filter. + * The query will only return messages with a timestamp greater than or equal to `timeStart`. + * If not provided, no start time filtering will be applied. */ - pageSize?: number; + timeStart?: Date; + /** - * Retrieve messages with a timestamp within the provided values. + * The end time for the time range filter. + * The query will only return messages with a timestamp strictly less than `timeEnd`. + * If not provided, no end time filtering will be applied. */ - timeFilter?: TimeFilter; + timeEnd?: Date; + /** - * Cursor as an index to start a query from. Must be generated from a Waku - * Message. + * The message hashes to lookup. + * If provided, the query will be a message hash lookup query and will only return messages that match the specified hashes. + * If not provided or empty, the query will be a content filtered query based on the other filter parameters. + * @default undefined */ - cursor?: proto.Index; + messageHashes?: Uint8Array[]; + + /** + * The cursor to start the query from. + * The cursor represents the message hash of the last message returned in the previous query. + * The query will start from the message immediately following the cursor, excluding the message at the cursor itself. + * If not provided, the query will start from the beginning or end of the store, depending on the `paginationForward` option. + * @default undefined + */ + paginationCursor?: Uint8Array; + + /** + * The direction of pagination. + * - `true`: Forward pagination, starting from the oldest message and moving towards the newest. + * - `false`: Backward pagination, starting from the newest message and moving towards the oldest. + * @default false + */ + paginationForward: boolean; + + /** + * The maximum number of messages to retrieve per page. + * If not provided, the store's default pagination limit will be used. + * @default undefined + */ + paginationLimit?: number; }; export type IStoreCore = IBaseProtocolCore; export type IStoreSDK = IBaseProtocolSDK & { protocol: IBaseProtocolCore; - createCursor(message: IDecodedMessage): Cursor; + createCursor(message: IDecodedMessage): StoreCursor; queryGenerator: ( decoders: IDecoder[], - options?: StoreQueryOptions + options?: Partial ) => AsyncGenerator[]>; queryWithOrderedCallback: ( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, - options?: StoreQueryOptions + options?: Partial ) => Promise; queryWithPromiseCallback: ( decoders: IDecoder[], callback: ( message: Promise ) => Promise | boolean | void, - options?: StoreQueryOptions + options?: Partial ) => Promise; }; diff --git a/packages/proto/src/generated/store.ts b/packages/proto/src/generated/store_v3.ts similarity index 51% rename from packages/proto/src/generated/store.ts rename to packages/proto/src/generated/store_v3.ts index 82eef846d8..1cbfa2e4ef 100644 --- a/packages/proto/src/generated/store.ts +++ b/packages/proto/src/generated/store_v3.ts @@ -4,148 +4,39 @@ /* eslint-disable @typescript-eslint/no-unnecessary-boolean-literal-compare */ /* eslint-disable @typescript-eslint/no-empty-interface */ -import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, enumeration, message } from 'protons-runtime' +import { type Codec, CodeError, decodeMessage, type DecodeOptions, encodeMessage, message } from 'protons-runtime' import { alloc as uint8ArrayAlloc } from 'uint8arrays/alloc' import type { Uint8ArrayList } from 'uint8arraylist' -export interface Index { - digest: Uint8Array - receiverTime: bigint - senderTime: bigint - pubsubTopic: string +export interface WakuMessageKeyValue { + messageHash?: Uint8Array + message?: WakuMessage + pubsubTopic?: string } -export namespace Index { - let _codec: Codec +export namespace WakuMessageKeyValue { + let _codec: Codec - export const codec = (): Codec => { + export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, w, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { w.fork() } - if ((obj.digest != null && obj.digest.byteLength > 0)) { + if (obj.messageHash != null) { w.uint32(10) - w.bytes(obj.digest) + w.bytes(obj.messageHash) } - if ((obj.receiverTime != null && obj.receiverTime !== 0n)) { - w.uint32(16) - w.sint64(obj.receiverTime) - } - - if ((obj.senderTime != null && obj.senderTime !== 0n)) { - w.uint32(24) - w.sint64(obj.senderTime) - } - - if ((obj.pubsubTopic != null && obj.pubsubTopic !== '')) { - w.uint32(34) - w.string(obj.pubsubTopic) - } - - if (opts.lengthDelimited !== false) { - w.ldelim() - } - }, (reader, length, opts = {}) => { - const obj: any = { - digest: uint8ArrayAlloc(0), - receiverTime: 0n, - senderTime: 0n, - pubsubTopic: '' - } - - const end = length == null ? reader.len : reader.pos + length - - while (reader.pos < end) { - const tag = reader.uint32() - - switch (tag >>> 3) { - case 1: { - obj.digest = reader.bytes() - break - } - case 2: { - obj.receiverTime = reader.sint64() - break - } - case 3: { - obj.senderTime = reader.sint64() - break - } - case 4: { - obj.pubsubTopic = reader.string() - break - } - default: { - reader.skipType(tag & 7) - break - } - } - } - - return obj - }) - } - - return _codec - } - - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, Index.codec()) - } - - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): Index => { - return decodeMessage(buf, Index.codec(), opts) - } -} - -export interface PagingInfo { - pageSize?: bigint - cursor?: Index - direction?: PagingInfo.Direction -} - -export namespace PagingInfo { - export enum Direction { - BACKWARD = 'BACKWARD', - FORWARD = 'FORWARD' - } - - enum __DirectionValues { - BACKWARD = 0, - FORWARD = 1 - } - - export namespace Direction { - export const codec = (): Codec => { - return enumeration(__DirectionValues) - } - } - - let _codec: Codec - - export const codec = (): Codec => { - if (_codec == null) { - _codec = message((obj, w, opts = {}) => { - if (opts.lengthDelimited !== false) { - w.fork() - } - - if (obj.pageSize != null) { - w.uint32(8) - w.uint64(obj.pageSize) - } - - if (obj.cursor != null) { + if (obj.message != null) { w.uint32(18) - Index.codec().encode(obj.cursor, w) + WakuMessage.codec().encode(obj.message, w) } - if (obj.direction != null) { - w.uint32(24) - PagingInfo.Direction.codec().encode(obj.direction, w) + if (obj.pubsubTopic != null) { + w.uint32(26) + w.string(obj.pubsubTopic) } if (opts.lengthDelimited !== false) { @@ -161,189 +52,19 @@ export namespace PagingInfo { switch (tag >>> 3) { case 1: { - obj.pageSize = reader.uint64() + obj.messageHash = reader.bytes() break } case 2: { - obj.cursor = Index.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.cursor + obj.message = WakuMessage.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.message }) break } case 3: { - obj.direction = PagingInfo.Direction.codec().decode(reader) - break - } - default: { - reader.skipType(tag & 7) - break - } - } - } - - return obj - }) - } - - return _codec - } - - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, PagingInfo.codec()) - } - - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): PagingInfo => { - return decodeMessage(buf, PagingInfo.codec(), opts) - } -} - -export interface ContentFilter { - contentTopic: string -} - -export namespace ContentFilter { - let _codec: Codec - - export const codec = (): Codec => { - if (_codec == null) { - _codec = message((obj, w, opts = {}) => { - if (opts.lengthDelimited !== false) { - w.fork() - } - - if ((obj.contentTopic != null && obj.contentTopic !== '')) { - w.uint32(10) - w.string(obj.contentTopic) - } - - if (opts.lengthDelimited !== false) { - w.ldelim() - } - }, (reader, length, opts = {}) => { - const obj: any = { - contentTopic: '' - } - - const end = length == null ? reader.len : reader.pos + length - - while (reader.pos < end) { - const tag = reader.uint32() - - switch (tag >>> 3) { - case 1: { - obj.contentTopic = reader.string() - break - } - default: { - reader.skipType(tag & 7) - break - } - } - } - - return obj - }) - } - - return _codec - } - - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, ContentFilter.codec()) - } - - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): ContentFilter => { - return decodeMessage(buf, ContentFilter.codec(), opts) - } -} - -export interface HistoryQuery { - pubsubTopic?: string - contentFilters: ContentFilter[] - pagingInfo?: PagingInfo - startTime?: bigint - endTime?: bigint -} - -export namespace HistoryQuery { - let _codec: Codec - - export const codec = (): Codec => { - if (_codec == null) { - _codec = message((obj, w, opts = {}) => { - if (opts.lengthDelimited !== false) { - w.fork() - } - - if (obj.pubsubTopic != null) { - w.uint32(18) - w.string(obj.pubsubTopic) - } - - if (obj.contentFilters != null) { - for (const value of obj.contentFilters) { - w.uint32(26) - ContentFilter.codec().encode(value, w) - } - } - - if (obj.pagingInfo != null) { - w.uint32(34) - PagingInfo.codec().encode(obj.pagingInfo, w) - } - - if (obj.startTime != null) { - w.uint32(40) - w.sint64(obj.startTime) - } - - if (obj.endTime != null) { - w.uint32(48) - w.sint64(obj.endTime) - } - - if (opts.lengthDelimited !== false) { - w.ldelim() - } - }, (reader, length, opts = {}) => { - const obj: any = { - contentFilters: [] - } - - const end = length == null ? reader.len : reader.pos + length - - while (reader.pos < end) { - const tag = reader.uint32() - - switch (tag >>> 3) { - case 2: { obj.pubsubTopic = reader.string() break } - case 3: { - if (opts.limits?.contentFilters != null && obj.contentFilters.length === opts.limits.contentFilters) { - throw new CodeError('decode error - map field "contentFilters" had too many elements', 'ERR_MAX_LENGTH') - } - - obj.contentFilters.push(ContentFilter.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.contentFilters$ - })) - break - } - case 4: { - obj.pagingInfo = PagingInfo.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.pagingInfo - }) - break - } - case 5: { - obj.startTime = reader.sint64() - break - } - case 6: { - obj.endTime = reader.sint64() - break - } default: { reader.skipType(tag & 7) break @@ -358,138 +79,34 @@ export namespace HistoryQuery { return _codec } - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, HistoryQuery.codec()) + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, WakuMessageKeyValue.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): HistoryQuery => { - return decodeMessage(buf, HistoryQuery.codec(), opts) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): WakuMessageKeyValue => { + return decodeMessage(buf, WakuMessageKeyValue.codec(), opts) } } -export interface HistoryResponse { - messages: WakuMessage[] - pagingInfo?: PagingInfo - error: HistoryResponse.HistoryError -} - -export namespace HistoryResponse { - export enum HistoryError { - NONE = 'NONE', - INVALID_CURSOR = 'INVALID_CURSOR', - TOO_MANY_REQUESTS = 'TOO_MANY_REQUESTS', - SERVICE_UNAVAILABLE = 'SERVICE_UNAVAILABLE' - } - - enum __HistoryErrorValues { - NONE = 0, - INVALID_CURSOR = 1, - TOO_MANY_REQUESTS = 429, - SERVICE_UNAVAILABLE = 503 - } - - export namespace HistoryError { - export const codec = (): Codec => { - return enumeration(__HistoryErrorValues) - } - } - - let _codec: Codec - - export const codec = (): Codec => { - if (_codec == null) { - _codec = message((obj, w, opts = {}) => { - if (opts.lengthDelimited !== false) { - w.fork() - } - - if (obj.messages != null) { - for (const value of obj.messages) { - w.uint32(18) - WakuMessage.codec().encode(value, w) - } - } - - if (obj.pagingInfo != null) { - w.uint32(26) - PagingInfo.codec().encode(obj.pagingInfo, w) - } - - if (obj.error != null && __HistoryErrorValues[obj.error] !== 0) { - w.uint32(32) - HistoryResponse.HistoryError.codec().encode(obj.error, w) - } - - if (opts.lengthDelimited !== false) { - w.ldelim() - } - }, (reader, length, opts = {}) => { - const obj: any = { - messages: [], - error: HistoryError.NONE - } - - const end = length == null ? reader.len : reader.pos + length - - while (reader.pos < end) { - const tag = reader.uint32() - - switch (tag >>> 3) { - case 2: { - if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) { - throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH') - } - - obj.messages.push(WakuMessage.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.messages$ - })) - break - } - case 3: { - obj.pagingInfo = PagingInfo.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.pagingInfo - }) - break - } - case 4: { - obj.error = HistoryResponse.HistoryError.codec().decode(reader) - break - } - default: { - reader.skipType(tag & 7) - break - } - } - } - - return obj - }) - } - - return _codec - } - - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, HistoryResponse.codec()) - } - - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): HistoryResponse => { - return decodeMessage(buf, HistoryResponse.codec(), opts) - } -} - -export interface HistoryRpc { +export interface StoreQueryRequest { requestId: string - query?: HistoryQuery - response?: HistoryResponse + includeData: boolean + pubsubTopic?: string + contentTopics: string[] + timeStart?: bigint + timeEnd?: bigint + messageHashes: Uint8Array[] + paginationCursor?: Uint8Array + paginationForward: boolean + paginationLimit?: bigint } -export namespace HistoryRpc { - let _codec: Codec +export namespace StoreQueryRequest { + let _codec: Codec - export const codec = (): Codec => { + export const codec = (): Codec => { if (_codec == null) { - _codec = message((obj, w, opts = {}) => { + _codec = message((obj, w, opts = {}) => { if (opts.lengthDelimited !== false) { w.fork() } @@ -499,14 +116,53 @@ export namespace HistoryRpc { w.string(obj.requestId) } - if (obj.query != null) { - w.uint32(18) - HistoryQuery.codec().encode(obj.query, w) + if ((obj.includeData != null && obj.includeData !== false)) { + w.uint32(16) + w.bool(obj.includeData) } - if (obj.response != null) { - w.uint32(26) - HistoryResponse.codec().encode(obj.response, w) + if (obj.pubsubTopic != null) { + w.uint32(82) + w.string(obj.pubsubTopic) + } + + if (obj.contentTopics != null) { + for (const value of obj.contentTopics) { + w.uint32(90) + w.string(value) + } + } + + if (obj.timeStart != null) { + w.uint32(96) + w.sint64(obj.timeStart) + } + + if (obj.timeEnd != null) { + w.uint32(104) + w.sint64(obj.timeEnd) + } + + if (obj.messageHashes != null) { + for (const value of obj.messageHashes) { + w.uint32(162) + w.bytes(value) + } + } + + if (obj.paginationCursor != null) { + w.uint32(410) + w.bytes(obj.paginationCursor) + } + + if ((obj.paginationForward != null && obj.paginationForward !== false)) { + w.uint32(416) + w.bool(obj.paginationForward) + } + + if (obj.paginationLimit != null) { + w.uint32(424) + w.uint64(obj.paginationLimit) } if (opts.lengthDelimited !== false) { @@ -514,7 +170,11 @@ export namespace HistoryRpc { } }, (reader, length, opts = {}) => { const obj: any = { - requestId: '' + requestId: '', + includeData: false, + contentTopics: [], + messageHashes: [], + paginationForward: false } const end = length == null ? reader.len : reader.pos + length @@ -528,15 +188,47 @@ export namespace HistoryRpc { break } case 2: { - obj.query = HistoryQuery.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.query - }) + obj.includeData = reader.bool() break } - case 3: { - obj.response = HistoryResponse.codec().decode(reader, reader.uint32(), { - limits: opts.limits?.response - }) + case 10: { + obj.pubsubTopic = reader.string() + break + } + case 11: { + if (opts.limits?.contentTopics != null && obj.contentTopics.length === opts.limits.contentTopics) { + throw new CodeError('decode error - map field "contentTopics" had too many elements', 'ERR_MAX_LENGTH') + } + + obj.contentTopics.push(reader.string()) + break + } + case 12: { + obj.timeStart = reader.sint64() + break + } + case 13: { + obj.timeEnd = reader.sint64() + break + } + case 20: { + if (opts.limits?.messageHashes != null && obj.messageHashes.length === opts.limits.messageHashes) { + throw new CodeError('decode error - map field "messageHashes" had too many elements', 'ERR_MAX_LENGTH') + } + + obj.messageHashes.push(reader.bytes()) + break + } + case 51: { + obj.paginationCursor = reader.bytes() + break + } + case 52: { + obj.paginationForward = reader.bool() + break + } + case 53: { + obj.paginationLimit = reader.uint64() break } default: { @@ -553,12 +245,121 @@ export namespace HistoryRpc { return _codec } - export const encode = (obj: Partial): Uint8Array => { - return encodeMessage(obj, HistoryRpc.codec()) + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, StoreQueryRequest.codec()) } - export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): HistoryRpc => { - return decodeMessage(buf, HistoryRpc.codec(), opts) + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): StoreQueryRequest => { + return decodeMessage(buf, StoreQueryRequest.codec(), opts) + } +} + +export interface StoreQueryResponse { + requestId: string + statusCode?: number + statusDesc?: string + messages: WakuMessageKeyValue[] + paginationCursor?: Uint8Array +} + +export namespace StoreQueryResponse { + let _codec: Codec + + export const codec = (): Codec => { + if (_codec == null) { + _codec = message((obj, w, opts = {}) => { + if (opts.lengthDelimited !== false) { + w.fork() + } + + if ((obj.requestId != null && obj.requestId !== '')) { + w.uint32(10) + w.string(obj.requestId) + } + + if (obj.statusCode != null) { + w.uint32(80) + w.uint32(obj.statusCode) + } + + if (obj.statusDesc != null) { + w.uint32(90) + w.string(obj.statusDesc) + } + + if (obj.messages != null) { + for (const value of obj.messages) { + w.uint32(162) + WakuMessageKeyValue.codec().encode(value, w) + } + } + + if (obj.paginationCursor != null) { + w.uint32(410) + w.bytes(obj.paginationCursor) + } + + if (opts.lengthDelimited !== false) { + w.ldelim() + } + }, (reader, length, opts = {}) => { + const obj: any = { + requestId: '', + messages: [] + } + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: { + obj.requestId = reader.string() + break + } + case 10: { + obj.statusCode = reader.uint32() + break + } + case 11: { + obj.statusDesc = reader.string() + break + } + case 20: { + if (opts.limits?.messages != null && obj.messages.length === opts.limits.messages) { + throw new CodeError('decode error - map field "messages" had too many elements', 'ERR_MAX_LENGTH') + } + + obj.messages.push(WakuMessageKeyValue.codec().decode(reader, reader.uint32(), { + limits: opts.limits?.messages$ + })) + break + } + case 51: { + obj.paginationCursor = reader.bytes() + break + } + default: { + reader.skipType(tag & 7) + break + } + } + } + + return obj + }) + } + + return _codec + } + + export const encode = (obj: Partial): Uint8Array => { + return encodeMessage(obj, StoreQueryResponse.codec()) + } + + export const decode = (buf: Uint8Array | Uint8ArrayList, opts?: DecodeOptions): StoreQueryResponse => { + return decodeMessage(buf, StoreQueryResponse.codec(), opts) } } diff --git a/packages/proto/src/index.ts b/packages/proto/src/index.ts index 479dd6d40a..4dc3f16ba9 100644 --- a/packages/proto/src/index.ts +++ b/packages/proto/src/index.ts @@ -10,7 +10,7 @@ export * as proto_filter_v2 from "./generated/filter_v2.js"; export * as proto_lightpush from "./generated/light_push.js"; export { PushResponse } from "./generated/light_push.js"; -export * as proto_store from "./generated/store.js"; +export * as proto_store from './generated/store_v3.js' export * as proto_peer_exchange from "./generated/peer_exchange.js"; diff --git a/packages/proto/src/lib/store.proto b/packages/proto/src/lib/store.proto deleted file mode 100644 index a083f88524..0000000000 --- a/packages/proto/src/lib/store.proto +++ /dev/null @@ -1,55 +0,0 @@ -// 13/WAKU2-STORE rfc: https://rfc.vac.dev/spec/13/ -// Protocol identifier: /vac/waku/store/2.0.0-beta4 - -syntax = "proto3"; - -import "message.proto"; - -message Index { - bytes digest = 1; - sint64 receiver_time = 2; - sint64 sender_time = 3; - string pubsub_topic = 4; -} - -message PagingInfo { - optional uint64 page_size = 1; - optional Index cursor = 2; - enum Direction { - BACKWARD = 0; - FORWARD = 1; - } - optional Direction direction = 3; -} - -message ContentFilter { - string content_topic = 1; -} - -message HistoryQuery { - // The first field is reserved for future use - optional string pubsub_topic = 2; - repeated ContentFilter content_filters = 3; - optional PagingInfo paging_info = 4; - optional sint64 start_time = 5; - optional sint64 end_time = 6; -} - -message HistoryResponse { - // The first field is reserved for future use - repeated WakuMessage messages = 2; - optional PagingInfo paging_info = 3; - enum HistoryError { - NONE = 0; - INVALID_CURSOR = 1; - TOO_MANY_REQUESTS = 429; - SERVICE_UNAVAILABLE = 503; - } - HistoryError error = 4; -} - -message HistoryRpc { - string request_id = 1; - optional HistoryQuery query = 2; - optional HistoryResponse response = 3; -} diff --git a/packages/proto/src/lib/store_v3.proto b/packages/proto/src/lib/store_v3.proto new file mode 100644 index 0000000000..976039e62a --- /dev/null +++ b/packages/proto/src/lib/store_v3.proto @@ -0,0 +1,42 @@ +// Protocol identifier: /vac/waku/store-query/3.0.0 +syntax = "proto3"; + +import "message.proto"; + +message WakuMessageKeyValue { + optional bytes message_hash = 1; // Globally unique key for a Waku Message + + // Full message content and associated pubsub_topic as value + optional WakuMessage message = 2; + optional string pubsub_topic = 3; +} + +message StoreQueryRequest { + string request_id = 1; + bool include_data = 2; // Response should include full message content + + // Filter criteria for content-filtered queries + optional string pubsub_topic = 10; + repeated string content_topics = 11; + optional sint64 time_start = 12; + optional sint64 time_end = 13; + + // List of key criteria for lookup queries + repeated bytes message_hashes = 20; // Message hashes (keys) to lookup + + // Pagination info. 50 Reserved + optional bytes pagination_cursor = 51; // Message hash (key) from where to start query (exclusive) + bool pagination_forward = 52; + optional uint64 pagination_limit = 53; +} + +message StoreQueryResponse { + string request_id = 1; + + optional uint32 status_code = 10; + optional string status_desc = 11; + + repeated WakuMessageKeyValue messages = 20; + + optional bytes pagination_cursor = 51; +} \ No newline at end of file diff --git a/packages/sdk/src/protocols/store.ts b/packages/sdk/src/protocols/store.ts index 6d6deb75e4..c84e5e87f0 100644 --- a/packages/sdk/src/protocols/store.ts +++ b/packages/sdk/src/protocols/store.ts @@ -1,27 +1,26 @@ -import { sha256 } from "@noble/hashes/sha256"; -import { ConnectionManager, StoreCore, waku_store } from "@waku/core"; +import { ConnectionManager, StoreCore } from "@waku/core"; import { - Cursor, IDecodedMessage, IDecoder, IStoreSDK, - type Libp2p, - PageDirection, - type ProtocolCreateOptions + Libp2p, + ProtocolCreateOptions, + QueryRequestParams, + StoreCursor } from "@waku/interfaces"; +import { messageHash } from "@waku/message-hash"; import { ensurePubsubTopicIsConfigured, isDefined, Logger } from "@waku/utils"; -import { concat } from "@waku/utils/bytes"; -import { utf8ToBytes } from "../index.js"; - -import { BaseProtocolSDK } from "./base_protocol.js"; - -export const DefaultPageSize = 10; +import { BaseProtocolSDK } from "./base_protocol"; const DEFAULT_NUM_PEERS = 1; -const log = new Logger("waku:store:protocol"); +const log = new Logger("waku:store:sdk"); +/** + * StoreSDK is an implementation of the IStoreSDK interface. + * It provides methods to interact with the Waku Store protocol. + */ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { public readonly protocol: StoreCore; @@ -30,46 +29,35 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { libp2p: Libp2p, options?: ProtocolCreateOptions ) { - // TODO: options.numPeersToUse is disregarded: https://github.com/waku-org/js-waku/issues/1685 super(new StoreCore(libp2p, options), connectionManager, { numPeersToUse: DEFAULT_NUM_PEERS }); - this.protocol = this.core as StoreCore; } /** - * Do a query to a Waku Store to retrieve historical/missed messages. + * Queries the Waku Store for historical messages using the provided decoders and options. + * Returns an asynchronous generator that yields promises of decoded messages. * - * This is a generator, useful if you want most control on how messages - * are processed. - * - * The order of the messages returned by the remote Waku node SHOULD BE - * as follows: - * - within a page, messages SHOULD be ordered from oldest to most recent - * - pages direction depends on { @link QueryOptions.pageDirection } - * @throws If not able to reach a Waku Store peer to query, - * or if an error is encountered when processing the reply, - * or if two decoders with the same content topic are passed. - * - * This API only supports querying a single pubsub topic at a time. - * If multiple decoders are provided, they must all have the same pubsub topic. - * @throws If multiple decoders with different pubsub topics are provided. - * @throws If no decoders are provided. - * @throws If no decoders are found for the provided pubsub topic. + * @param decoders - An array of message decoders. + * @param options - Optional query parameters. + * @returns An asynchronous generator of promises of decoded messages. + * @throws If no peers are available to query or if an error occurs during the query. */ public async *queryGenerator( decoders: IDecoder[], - options?: waku_store.QueryOptions + options?: Partial ): AsyncGenerator[]> { const { pubsubTopic, contentTopics, decodersAsMap } = - this.validateDecodersAndPubsubTopic(decoders, options); + this.validateDecodersAndPubsubTopic(decoders); - const queryOpts = this.constructOptions( + const queryOpts = { pubsubTopic, contentTopics, - options - ); + includeData: true, + paginationForward: true, + ...options + }; const peer = ( await this.protocol.getPeers({ @@ -77,9 +65,12 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { maxBootstrapPeers: 1 }) )[0]; + if (!peer) { + log.error("No peers available to query"); + throw new Error("No peers available to query"); + } - if (!peer) throw new Error("No peers available to query"); - + log.info(`Querying store with options: ${JSON.stringify(options)}`); const responseGenerator = this.protocol.queryPerPage( queryOpts, decodersAsMap, @@ -92,56 +83,40 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { } /** - * Do a query to a Waku Store to retrieve historical/missed messages. + * Queries the Waku Store for historical messages and processes them with the provided callback in order. * - * The callback function takes a `WakuMessage` in input, - * messages are processed in order: - * - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD } - * - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD } - * - * The ordering may affect performance. - * The ordering depends on the behavior of the remote store node. - * If strong ordering is needed, you may need to handle this at application level - * and set your own timestamps too (the WakuMessage timestamps are not certified). - * - * @throws If not able to reach a Waku Store peer to query, - * or if an error is encountered when processing the reply, - * or if two decoders with the same content topic are passed. + * @param decoders - An array of message decoders. + * @param callback - A callback function to process each decoded message. + * @param options - Optional query parameters. + * @returns A promise that resolves when the query and message processing are completed. */ public async queryWithOrderedCallback( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, - options?: waku_store.QueryOptions + options?: Partial ): Promise { + log.info("Querying store with ordered callback"); for await (const promises of this.queryGenerator(decoders, options)) { - if (await this.processMessages(promises, callback, options)) break; + if (await this.processMessages(promises, callback)) break; } } /** - * Do a query to a Waku Store to retrieve historical/missed messages. - * The callback function takes a `Promise` in input, - * useful if messages need to be decrypted and performance matters. + * Queries the Waku Store for historical messages and processes them with the provided callback using promises. * - * The order of the messages passed to the callback is as follows: - * - within a page, messages are expected to be ordered from oldest to most recent - * - pages direction depends on { @link QueryOptions.pageDirection } - * - * Do note that the resolution of the `Promise( decoders: IDecoder[], callback: ( message: Promise ) => Promise | boolean | void, - options?: waku_store.QueryOptions + options?: Partial ): Promise { + log.info("Querying store with promise callback"); let abort = false; for await (const page of this.queryGenerator(decoders, options)) { const _promises = page.map(async (msgPromise) => { @@ -154,144 +129,21 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { } } - public createCursor(message: IDecodedMessage): Cursor { - if ( - !message || - !message.timestamp || - !message.payload || - !message.contentTopic - ) { - throw new Error("Message is missing required fields"); - } - - const contentTopicBytes = utf8ToBytes(message.contentTopic); - - const digest = sha256(concat([contentTopicBytes, message.payload])); - - const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000); - - return { - digest, - pubsubTopic: message.pubsubTopic, - senderTime: messageTime, - receiverTime: messageTime - }; - } - - private validateDecodersAndPubsubTopic( - decoders: IDecoder[], - options?: waku_store.QueryOptions - ): { - pubsubTopic: string; - contentTopics: string[]; - decodersAsMap: Map>; - } { - if (decoders.length === 0) { - throw new Error("No decoders provided"); - } - - // convert array to set to remove duplicates - const uniquePubsubTopicsInQuery = Array.from( - new Set(decoders.map((decoder) => decoder.pubsubTopic)) - ); - // If multiple pubsub topics are provided, throw an error - if (uniquePubsubTopicsInQuery.length > 1) { - throw new Error( - "API does not support querying multiple pubsub topics at once" - ); - } - - // we can be certain that there is only one pubsub topic in the query - const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; - - ensurePubsubTopicIsConfigured( - pubsubTopicForQuery, - this.protocol.pubsubTopics - ); - - // check that the pubsubTopic from the Cursor and Decoder match - if ( - options?.cursor?.pubsubTopic && - options.cursor.pubsubTopic !== pubsubTopicForQuery - ) { - throw new Error( - `Cursor pubsub topic (${options?.cursor?.pubsubTopic}) does not match decoder pubsub topic (${pubsubTopicForQuery})` - ); - } - - const decodersAsMap = new Map(); - decoders.forEach((dec) => { - if (decodersAsMap.has(dec.contentTopic)) { - throw new Error( - "API does not support different decoder per content topic" - ); - } - decodersAsMap.set(dec.contentTopic, dec); - }); - - const contentTopics = decoders - .filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery) - .map((dec) => dec.contentTopic); - - if (contentTopics.length === 0) { - throw new Error("No decoders found for topic " + pubsubTopicForQuery); - } - - return { - pubsubTopic: pubsubTopicForQuery, - contentTopics, - decodersAsMap - }; - } - - private constructOptions( - pubsubTopic: string, - contentTopics: string[], - options: waku_store.QueryOptions = {} - ): waku_store.Params { - let startTime, endTime; - - if (options?.timeFilter) { - startTime = options.timeFilter.startTime; - endTime = options.timeFilter.endTime; - } - - if (!startTime) { - log.warn("No start time provided"); - } - if (!endTime) { - log.warn("No end time provided"); - } - - const queryOpts = Object.assign( - { - pubsubTopic: pubsubTopic, - pageDirection: PageDirection.BACKWARD, - pageSize: DefaultPageSize - }, - options, - { contentTopics, startTime, endTime } - ); - - return queryOpts; - } - /** * Processes messages based on the provided callback and options. + * + * @param messages - An array of promises of decoded messages. + * @param callback - A callback function to process each decoded message. + * @returns A promise that resolves to a boolean indicating whether the processing should abort. * @private */ private async processMessages( messages: Promise[], - callback: (message: T) => Promise | boolean | void, - options?: waku_store.QueryOptions + callback: (message: T) => Promise | boolean | void ): Promise { let abort = false; const messagesOrUndef: Array = await Promise.all(messages); - let processedMessages: Array = messagesOrUndef.filter(isDefined); - - if (this.shouldReverseOrder(options)) { - processedMessages = processedMessages.reverse(); - } + const processedMessages: Array = messagesOrUndef.filter(isDefined); await Promise.all( processedMessages.map(async (msg) => { @@ -305,24 +157,91 @@ export class StoreSDK extends BaseProtocolSDK implements IStoreSDK { } /** - * Determines whether to reverse the order of messages based on the provided options. + * Creates a cursor based on the provided decoded message. * - * Messages in pages are ordered from oldest (first) to most recent (last). - * https://github.com/vacp2p/rfc/issues/533 + * @param message - The decoded message. + * @returns A StoreCursor representing the message. + */ + public createCursor(message: IDecodedMessage): StoreCursor { + return messageHash(message.pubsubTopic, message); + } + + /** + * Validates the provided decoders and pubsub topic. * + * @param decoders - An array of message decoders. + * @returns An object containing the pubsub topic, content topics, and a map of decoders. + * @throws If no decoders are provided, if multiple pubsub topics are provided, or if no decoders are found for the pubsub topic. * @private */ - private shouldReverseOrder(options?: waku_store.QueryOptions): boolean { - return ( - typeof options?.pageDirection === "undefined" || - options?.pageDirection === PageDirection.BACKWARD + private validateDecodersAndPubsubTopic( + decoders: IDecoder[] + ): { + pubsubTopic: string; + contentTopics: string[]; + decodersAsMap: Map>; + } { + if (decoders.length === 0) { + log.error("No decoders provided"); + throw new Error("No decoders provided"); + } + + const uniquePubsubTopicsInQuery = Array.from( + new Set(decoders.map((decoder) => decoder.pubsubTopic)) ); + if (uniquePubsubTopicsInQuery.length > 1) { + log.error("API does not support querying multiple pubsub topics at once"); + throw new Error( + "API does not support querying multiple pubsub topics at once" + ); + } + + const pubsubTopicForQuery = uniquePubsubTopicsInQuery[0]; + + ensurePubsubTopicIsConfigured( + pubsubTopicForQuery, + this.protocol.pubsubTopics + ); + + const decodersAsMap = new Map(); + decoders.forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + log.error("API does not support different decoder per content topic"); + throw new Error( + "API does not support different decoder per content topic" + ); + } + decodersAsMap.set(dec.contentTopic, dec); + }); + + const contentTopics = decoders + .filter((decoder) => decoder.pubsubTopic === pubsubTopicForQuery) + .map((dec) => dec.contentTopic); + + if (contentTopics.length === 0) { + log.error(`No decoders found for topic ${pubsubTopicForQuery}`); + throw new Error("No decoders found for topic " + pubsubTopicForQuery); + } + + return { + pubsubTopic: pubsubTopicForQuery, + contentTopics, + decodersAsMap + }; } } +/** + * Factory function to create an instance of the StoreSDK. + * + * @param init - Partial options for protocol creation. + * @returns A function that takes a Libp2p instance and returns a StoreSDK instance. + */ export function wakuStore( connectionManager: ConnectionManager, init: Partial = {} ): (libp2p: Libp2p) => IStoreSDK { - return (libp2p: Libp2p) => new StoreSDK(connectionManager, libp2p, init); + return (libp2p: Libp2p) => { + return new StoreSDK(connectionManager, libp2p, init); + }; } diff --git a/packages/tests/tests/store/cursor.node.spec.ts b/packages/tests/tests/store/cursor.node.spec.ts index 60be3f1869..cfcad90b2a 100644 --- a/packages/tests/tests/store/cursor.node.spec.ts +++ b/packages/tests/tests/store/cursor.node.spec.ts @@ -53,7 +53,7 @@ describe("Waku Store, cursor", function () { // messages in reversed order (first message at last index) const messages: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder])) { - for await (const msg of page.reverse()) { + for await (const msg of page) { messages.push(msg as DecodedMessage); } } @@ -63,9 +63,9 @@ describe("Waku Store, cursor", function () { const messagesAfterCursor: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder], { - cursor + paginationCursor: cursor })) { - for await (const msg of page.reverse()) { + for await (const msg of page) { if (msg) { messagesAfterCursor.push(msg as DecodedMessage); } @@ -102,7 +102,7 @@ describe("Waku Store, cursor", function () { // messages in reversed order (first message at last index) const messages: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder])) { - for await (const msg of page.reverse()) { + for await (const msg of page) { messages.push(msg as DecodedMessage); } } @@ -113,9 +113,9 @@ describe("Waku Store, cursor", function () { // query node2 with the cursor from node1 const messagesAfterCursor: DecodedMessage[] = []; for await (const page of waku2.store.queryGenerator([TestDecoder], { - cursor + paginationCursor: cursor })) { - for await (const msg of page.reverse()) { + for await (const msg of page) { if (msg) { messagesAfterCursor.push(msg as DecodedMessage); } @@ -132,7 +132,7 @@ describe("Waku Store, cursor", function () { ).to.be.eq(bytesToUtf8(messages[messages.length - 1].payload)); }); - it("Passing cursor with wrong message digest", async function () { + it("Passing invalid cursor", async function () { await sendMessages( nwaku, totalMsgs, @@ -142,39 +142,35 @@ describe("Waku Store, cursor", function () { const messages: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder])) { - for await (const msg of page.reverse()) { + for await (const msg of page) { messages.push(msg as DecodedMessage); } } - const cursor = waku.store.createCursor(messages[5]); - // setting a wrong digest - cursor.digest = new Uint8Array([]); + // setting an invalid cursor + const cursor = new Uint8Array([2, 3]); const messagesAfterCursor: DecodedMessage[] = []; try { for await (const page of waku.store.queryGenerator([TestDecoder], { - cursor + paginationCursor: cursor })) { - for await (const msg of page.reverse()) { + for await (const msg of page) { if (msg) { messagesAfterCursor.push(msg as DecodedMessage); } } } - // Should return same as go-waku. Raised bug: https://github.com/waku-org/nwaku/issues/2117 expect(messagesAfterCursor.length).to.eql(0); - } catch (error) { + } catch (err) { if ( - nwaku.type === "go-waku" && - typeof error === "string" && - error.includes("History response contains an Error: INVALID_CURSOR") + !(err instanceof Error) || + !err.message.includes( + `Store query failed with status code: 300, description: BAD_RESPONSE: archive error: DIRVER_ERROR: cursor not found` + ) ) { - return; + throw err; } - throw error instanceof Error - ? new Error(`Unexpected error: ${error.message}`) - : error; } }); @@ -188,7 +184,7 @@ describe("Waku Store, cursor", function () { const messages: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder])) { - for await (const msg of page.reverse()) { + for await (const msg of page) { messages.push(msg as DecodedMessage); } } @@ -197,7 +193,7 @@ describe("Waku Store, cursor", function () { try { for await (const page of waku.store.queryGenerator([TestDecoder], { - cursor + paginationCursor: cursor })) { void page; } @@ -206,7 +202,7 @@ describe("Waku Store, cursor", function () { if ( !(err instanceof Error) || !err.message.includes( - `Cursor pubsub topic (${TestDecoder2.pubsubTopic}) does not match decoder pubsub topic (${TestDecoder.pubsubTopic})` + `Store query failed with status code: 300, description: BAD_RESPONSE: archive error: DIRVER_ERROR: cursor not found` ) ) { throw err; diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index 05a981e8bd..e34864d604 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -215,7 +215,7 @@ describe("Waku Store, general", function () { } return messages.length >= desiredMsgs; }, - { pageSize: 7 } + { paginationLimit: 7 } ); expect(messages?.length).eq(desiredMsgs); @@ -334,7 +334,7 @@ describe("Waku Store, general", function () { messages.push(msg); return messages.length >= desiredMsgs; }, - { pageSize: 7 } + { paginationLimit: 7 } ); expect(messages?.length).eq(desiredMsgs); diff --git a/packages/tests/tests/store/order.node.spec.ts b/packages/tests/tests/store/order.node.spec.ts index e99785a2bd..3de1f30005 100644 --- a/packages/tests/tests/store/order.node.spec.ts +++ b/packages/tests/tests/store/order.node.spec.ts @@ -1,4 +1,4 @@ -import { DecodedMessage, PageDirection } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; import { expect } from "chai"; @@ -10,7 +10,6 @@ import { } from "../../src/index.js"; import { - chunkAndReverseArray, runStoreNodes, sendMessages, TestDecoder, @@ -31,7 +30,7 @@ describe("Waku Store, order", function () { await tearDownNodes(nwaku, waku); }); - [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + [true, false].forEach((pageDirection) => { it(`Query Generator - ${pageDirection}`, async function () { await sendMessages( nwaku, @@ -42,7 +41,7 @@ describe("Waku Store, order", function () { const messages: IMessage[] = []; for await (const query of waku.store.queryGenerator([TestDecoder], { - pageDirection: pageDirection + paginationForward: pageDirection })) { for await (const msg of query) { if (msg) { @@ -51,10 +50,7 @@ describe("Waku Store, order", function () { } } - let expectedPayloads = Array.from(Array(totalMsgs).keys()); - if (pageDirection === PageDirection.BACKWARD) { - expectedPayloads = chunkAndReverseArray(expectedPayloads, 10); - } + const expectedPayloads = Array.from(Array(totalMsgs).keys()); expect(messages?.length).eq(totalMsgs); const payloads = messages.map((msg) => msg.payload[0]!); @@ -62,7 +58,7 @@ describe("Waku Store, order", function () { }); }); - [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + [true, false].forEach((pageDirection) => { it(`Promise Callback - ${pageDirection}`, async function () { await sendMessages( nwaku, @@ -81,14 +77,11 @@ describe("Waku Store, order", function () { } }, { - pageDirection: pageDirection + paginationForward: pageDirection } ); - let expectedPayloads = Array.from(Array(totalMsgs).keys()); - if (pageDirection === PageDirection.BACKWARD) { - expectedPayloads = chunkAndReverseArray(expectedPayloads, 10); - } + const expectedPayloads = Array.from(Array(totalMsgs).keys()); expect(messages?.length).eq(totalMsgs); const payloads = messages.map((msg) => msg.payload[0]!); @@ -96,7 +89,7 @@ describe("Waku Store, order", function () { }); }); - [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + [true, false].forEach((pageDirection) => { it(`Ordered Callback - ${pageDirection}`, async function () { await sendMessages( nwaku, @@ -112,13 +105,10 @@ describe("Waku Store, order", function () { messages.push(msg); }, { - pageDirection: pageDirection + paginationForward: pageDirection } ); - if (pageDirection === PageDirection.BACKWARD) { - messages.reverse(); - } expect(messages?.length).eq(totalMsgs); const payloads = messages.map((msg) => msg.payload[0]!); expect(payloads).to.deep.eq(Array.from(Array(totalMsgs).keys())); diff --git a/packages/tests/tests/store/page_size.node.spec.ts b/packages/tests/tests/store/page_size.node.spec.ts index fab0185d35..019b58bd51 100644 --- a/packages/tests/tests/store/page_size.node.spec.ts +++ b/packages/tests/tests/store/page_size.node.spec.ts @@ -50,16 +50,12 @@ describe("Waku Store, page size", function () { if (pageSize === 0) { effectivePageSize = 20; } else if (pageSize > 100) { - if (nwaku.type == "go-waku") { - effectivePageSize = 20; - } else { - effectivePageSize = 100; - } + effectivePageSize = 100; } let messagesRetrieved = 0; for await (const query of waku.store.queryGenerator([TestDecoder], { - pageSize: pageSize + paginationLimit: pageSize })) { // Calculate expected page size const expectedPageSize = Math.min( @@ -90,7 +86,7 @@ describe("Waku Store, page size", function () { let messagesRetrieved = 0; for await (const query of waku.store.queryGenerator([TestDecoder])) { - expect(query.length).eq(10); + expect(query.length).eq(20); for await (const msg of query) { if (msg) { messagesRetrieved++; diff --git a/packages/tests/tests/store/sorting.node.spec.ts b/packages/tests/tests/store/sorting.node.spec.ts index e8cc45d59e..63bd8e4591 100644 --- a/packages/tests/tests/store/sorting.node.spec.ts +++ b/packages/tests/tests/store/sorting.node.spec.ts @@ -1,4 +1,4 @@ -import { DecodedMessage, PageDirection } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; import { @@ -29,7 +29,7 @@ describe("Waku Store, sorting", function () { await tearDownNodes(nwaku, waku); }); - [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + [true, false].forEach((pageDirection) => { it(`Query Generator sorting by timestamp while page direction is ${pageDirection}`, async function () { await sendMessages( nwaku, @@ -38,8 +38,10 @@ describe("Waku Store, sorting", function () { TestDecoder.pubsubTopic ); + const pages: IMessage[][] = []; + for await (const query of waku.store.queryGenerator([TestDecoder], { - pageDirection: PageDirection.FORWARD + paginationForward: pageDirection })) { const page: IMessage[] = []; for await (const msg of query) { @@ -47,23 +49,48 @@ describe("Waku Store, sorting", function () { page.push(msg as DecodedMessage); } } - // Extract timestamps + pages.push(page); + + // Check order within the current page const timestamps = page.map( (msg) => msg.timestamp as unknown as bigint ); - // Check if timestamps are sorted for (let i = 1; i < timestamps.length; i++) { if (timestamps[i] < timestamps[i - 1]) { throw new Error( - `Messages are not sorted by timestamp. Found out of order at index ${i}` + `Messages within page ${pages.length - 1} are not in sequential order. Found out of order at index ${i}` ); } } } + + // Check order between pages + for (let i = 1; i < pages.length; i++) { + const prevPageLastTimestamp = pages[i - 1][pages[i - 1].length - 1] + .timestamp as unknown as bigint; + const currentPageFirstTimestamp = pages[i][0] + .timestamp as unknown as bigint; + + if ( + pageDirection === true && + prevPageLastTimestamp < currentPageFirstTimestamp + ) { + throw new Error( + `Pages are not in reversed order for FORWARD direction. Issue found between page ${i - 1} and ${i}` + ); + } else if ( + pageDirection === false && + prevPageLastTimestamp > currentPageFirstTimestamp + ) { + throw new Error( + `Pages are not in reversed order for BACKWARD direction. Issue found between page ${i - 1} and ${i}` + ); + } + } }); }); - [PageDirection.FORWARD, PageDirection.BACKWARD].forEach((pageDirection) => { + [true, false].forEach((pageDirection) => { it(`Ordered Callback sorting by timestamp while page direction is ${pageDirection}`, async function () { await sendMessages( nwaku, @@ -73,34 +100,56 @@ describe("Waku Store, sorting", function () { ); const messages: IMessage[] = []; + const pageSize = 5; + // receive 4 pages, 5 messages each (20/4) await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); }, - { - pageDirection: pageDirection + { paginationLimit: pageSize, paginationForward: pageDirection } + ); + + // Split messages into pages + const pages: IMessage[][] = []; + for (let i = 0; i < messages.length; i += pageSize) { + pages.push(messages.slice(i, i + pageSize)); + } + + // Check order within each page + pages.forEach((page, pageIndex) => { + const pageTimestamps = page.map( + (msg) => msg.timestamp as unknown as bigint + ); + for (let i = 1; i < pageTimestamps.length; i++) { + if (pageTimestamps[i] < pageTimestamps[i - 1]) { + throw new Error( + `Messages within page ${pageIndex} are not in sequential order. Found out of order at index ${i}` + ); + } } - ); - // Extract timestamps - const timestamps = messages.map( - (msg) => msg.timestamp as unknown as bigint - ); - // Check if timestamps are sorted - for (let i = 1; i < timestamps.length; i++) { + }); + + // Check order between pages + for (let i = 1; i < pages.length; i++) { + const prevPageLastTimestamp = pages[i - 1][pages[i - 1].length - 1] + .timestamp as unknown as bigint; + const currentPageFirstTimestamp = pages[i][0] + .timestamp as unknown as bigint; + if ( - pageDirection === PageDirection.FORWARD && - timestamps[i] < timestamps[i - 1] + pageDirection === true && + prevPageLastTimestamp > currentPageFirstTimestamp ) { throw new Error( - `Messages are not sorted by timestamp in FORWARD direction. Found out of order at index ${i}` + `Pages are not in reversed order for FORWARD direction. Issue found between page ${i - 1} and ${i}` ); } else if ( - pageDirection === PageDirection.BACKWARD && - timestamps[i] > timestamps[i - 1] + pageDirection === false && + prevPageLastTimestamp < currentPageFirstTimestamp ) { throw new Error( - `Messages are not sorted by timestamp in BACKWARD direction. Found out of order at index ${i}` + `Pages are not in reversed order for BACKWARD direction. Issue found between page ${i - 1} and ${i}` ); } } diff --git a/packages/tests/tests/store/time_filter.node.spec.ts b/packages/tests/tests/store/time_filter.node.spec.ts index 17687407b0..8addf95f9b 100644 --- a/packages/tests/tests/store/time_filter.node.spec.ts +++ b/packages/tests/tests/store/time_filter.node.spec.ts @@ -60,10 +60,8 @@ describe("Waku Store, time filter", function () { } }, { - timeFilter: { - startTime: adjustDate(msgTimestamp, startTime), - endTime: adjustDate(msgTimestamp, endTime) - } + timeStart: adjustDate(msgTimestamp, startTime), + timeEnd: adjustDate(msgTimestamp, endTime) } ); @@ -103,10 +101,8 @@ describe("Waku Store, time filter", function () { } }, { - timeFilter: { - startTime: adjustDate(msgTimestamp, -1000), - endTime: adjustDate(msgTimestamp, 1000) - } + timeStart: adjustDate(msgTimestamp, -1000), + timeEnd: adjustDate(msgTimestamp, 1000) } );