address comments

This commit is contained in:
danisharora099 2022-11-21 13:20:21 +05:30
parent 9f0493221e
commit 36a01c3c30
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
2 changed files with 25 additions and 7 deletions

View File

@ -4,7 +4,13 @@ import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { concat, utf8ToBytes } from "@waku/byte-utils"; import { concat, utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces"; import {
Cursor,
DecodedMessage,
Decoder,
Index,
Store,
} from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
@ -84,8 +90,10 @@ export interface QueryOptions {
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/** /**
* Cursor as an index to start a query from. * Cursor as an index to start a query from.
* The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result).
* If undefined, the query will start from the beginning or end of the history, depending on the page direction.
*/ */
cursor?: Index; cursor?: Cursor;
} }
/** /**
@ -289,7 +297,7 @@ async function* paginate<T extends DecodedMessage>(
protocol: string, protocol: string,
queryOpts: Params, queryOpts: Params,
decoders: Map<string, Decoder<T>>, decoders: Map<string, Decoder<T>>,
cursor?: Index cursor?: Cursor
): AsyncGenerator<Promise<T | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
if ( if (
queryOpts.contentTopics.toString() !== queryOpts.contentTopics.toString() !==
@ -300,8 +308,10 @@ async function* paginate<T extends DecodedMessage>(
); );
} }
let currentCursor = cursor;
while (true) { while (true) {
queryOpts = Object.assign(queryOpts, { cursor }); queryOpts = Object.assign(queryOpts, { currentCursor });
const stream = await connection.newStream(protocol); const stream = await connection.newStream(protocol);
const historyRpcQuery = HistoryRPC.createQuery(queryOpts); const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
@ -362,8 +372,8 @@ async function* paginate<T extends DecodedMessage>(
return Promise.resolve(undefined); return Promise.resolve(undefined);
}); });
cursor = response.pagingInfo?.cursor; const nextCursor = response.pagingInfo?.cursor;
if (typeof cursor === "undefined") { if (typeof nextCursor === "undefined") {
// If the server does not return cursor then there is an issue, // If the server does not return cursor then there is an issue,
// Need to abort, or we end up in an infinite loop // Need to abort, or we end up in an infinite loop
log( log(
@ -372,6 +382,8 @@ async function* paginate<T extends DecodedMessage>(
break; break;
} }
currentCursor = nextCursor;
const responsePageSize = response.pagingInfo?.pageSize; const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if ( if (

View File

@ -60,6 +60,12 @@ export interface TimeFilter {
endTime: Date; endTime: Date;
} }
export type Cursor = {
digest?: Uint8Array;
senderTime?: bigint;
pubsubTopic?: string;
};
export type StoreQueryOptions = { export type StoreQueryOptions = {
/** /**
* The direction in which pages are retrieved: * The direction in which pages are retrieved:
@ -83,7 +89,7 @@ export type StoreQueryOptions = {
/** /**
* Cursor as an index to start a query from. * Cursor as an index to start a query from.
*/ */
cursor?: Index; cursor?: Cursor;
} & ProtocolOptions; } & ProtocolOptions;
export interface Store extends PointToPointProtocol { export interface Store extends PointToPointProtocol {