diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 44e4f5087b..95ea807448 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -4,7 +4,13 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { concat, utf8ToBytes } from "@waku/byte-utils"; -import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces"; +import { + Cursor, + DecodedMessage, + Decoder, + Index, + Store, +} from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -84,8 +90,10 @@ export interface QueryOptions { timeFilter?: TimeFilter; /** * Cursor as an index to start a query from. + * The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result). + * If undefined, the query will start from the beginning or end of the history, depending on the page direction. */ - cursor?: Index; + cursor?: Cursor; } /** @@ -289,7 +297,7 @@ async function* paginate( protocol: string, queryOpts: Params, decoders: Map>, - cursor?: Index + cursor?: Cursor ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -300,8 +308,10 @@ async function* paginate( ); } + let currentCursor = cursor; + while (true) { - queryOpts = Object.assign(queryOpts, { cursor }); + queryOpts = Object.assign(queryOpts, { currentCursor }); const stream = await connection.newStream(protocol); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); @@ -362,8 +372,8 @@ async function* paginate( return Promise.resolve(undefined); }); - cursor = response.pagingInfo?.cursor; - if (typeof cursor === "undefined") { + const nextCursor = response.pagingInfo?.cursor; + if (typeof nextCursor === "undefined") { // If the server does not return cursor then there is an issue, // Need to abort, or we end up in an infinite loop log( @@ -372,6 +382,8 @@ async function* paginate( break; } + currentCursor = nextCursor; + const responsePageSize = response.pagingInfo?.pageSize; const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; if ( diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 39aeeacf44..021aeaa114 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -60,6 +60,12 @@ export interface TimeFilter { endTime: Date; } +export type Cursor = { + digest?: Uint8Array; + senderTime?: bigint; + pubsubTopic?: string; +}; + export type StoreQueryOptions = { /** * The direction in which pages are retrieved: @@ -83,7 +89,7 @@ export type StoreQueryOptions = { /** * Cursor as an index to start a query from. */ - cursor?: Index; + cursor?: Cursor; } & ProtocolOptions; export interface Store extends PointToPointProtocol {