diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index cff194b6f2..64aa99d1b4 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -77,9 +77,9 @@ jobs: node-version: ${{ env.NODE_JS }} - uses: bahmutov/npm-install@v1 - run: npm run build - - run: npm run test:node > debug.log + - run: npm run test:node env: - DEBUG: "waku:nwaku*,waku:test*" + DEBUG: "" - name: Upload debug logs on failure uses: actions/upload-artifact@v3 @@ -131,7 +131,7 @@ jobs: - uses: bahmutov/npm-install@v1 - run: npm run build - - run: npm run test:node > debug.log + - run: npm run test:node env: DEBUG: "waku:nwaku*,waku:test*" @@ -185,7 +185,7 @@ jobs: ./build/wakunode2 --help - run: npm run build - - run: npm run test:node > debug.log + - run: npm run test:node env: DEBUG: "waku:nwaku*,waku:test*" diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 44e4f5087b..b4a5626fa9 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -4,7 +4,13 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { concat, utf8ToBytes } from "@waku/byte-utils"; -import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces"; +import { + Cursor, + DecodedMessage, + Decoder, + Index, + Store, +} from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -84,8 +90,10 @@ export interface QueryOptions { timeFilter?: TimeFilter; /** * Cursor as an index to start a query from. + * The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result). + * If undefined, the query will start from the beginning or end of the history, depending on the page direction. */ - cursor?: Index; + cursor?: Cursor; } /** @@ -289,7 +297,7 @@ async function* paginate( protocol: string, queryOpts: Params, decoders: Map>, - cursor?: Index + cursor?: Cursor ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -300,8 +308,9 @@ async function* paginate( ); } + let currentCursor = cursor; while (true) { - queryOpts = Object.assign(queryOpts, { cursor }); + queryOpts.cursor = currentCursor; const stream = await connection.newStream(protocol); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); @@ -362,8 +371,8 @@ async function* paginate( return Promise.resolve(undefined); }); - cursor = response.pagingInfo?.cursor; - if (typeof cursor === "undefined") { + const nextCursor = response.pagingInfo?.cursor; + if (typeof nextCursor === "undefined") { // If the server does not return cursor then there is an issue, // Need to abort, or we end up in an infinite loop log( @@ -372,6 +381,8 @@ async function* paginate( break; } + currentCursor = nextCursor; + const responsePageSize = response.pagingInfo?.pageSize; const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; if ( diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 39aeeacf44..021aeaa114 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -60,6 +60,12 @@ export interface TimeFilter { endTime: Date; } +export type Cursor = { + digest?: Uint8Array; + senderTime?: bigint; + pubsubTopic?: string; +}; + export type StoreQueryOptions = { /** * The direction in which pages are retrieved: @@ -83,7 +89,7 @@ export type StoreQueryOptions = { /** * Cursor as an index to start a query from. */ - cursor?: Index; + cursor?: Cursor; } & ProtocolOptions; export interface Store extends PointToPointProtocol {