mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-17 06:47:29 +00:00
Merge pull request #1031 from waku-org/danisharora/improve-cursor-api
feat: improve the cursor API
This commit is contained in:
commit
50c6ea9ea9
8
.github/workflows/ci.yml
vendored
8
.github/workflows/ci.yml
vendored
@ -77,9 +77,9 @@ jobs:
|
|||||||
node-version: ${{ env.NODE_JS }}
|
node-version: ${{ env.NODE_JS }}
|
||||||
- uses: bahmutov/npm-install@v1
|
- uses: bahmutov/npm-install@v1
|
||||||
- run: npm run build
|
- run: npm run build
|
||||||
- run: npm run test:node > debug.log
|
- run: npm run test:node
|
||||||
env:
|
env:
|
||||||
DEBUG: "waku:nwaku*,waku:test*"
|
DEBUG: ""
|
||||||
|
|
||||||
- name: Upload debug logs on failure
|
- name: Upload debug logs on failure
|
||||||
uses: actions/upload-artifact@v3
|
uses: actions/upload-artifact@v3
|
||||||
@ -131,7 +131,7 @@ jobs:
|
|||||||
|
|
||||||
- uses: bahmutov/npm-install@v1
|
- uses: bahmutov/npm-install@v1
|
||||||
- run: npm run build
|
- run: npm run build
|
||||||
- run: npm run test:node > debug.log
|
- run: npm run test:node
|
||||||
env:
|
env:
|
||||||
DEBUG: "waku:nwaku*,waku:test*"
|
DEBUG: "waku:nwaku*,waku:test*"
|
||||||
|
|
||||||
@ -185,7 +185,7 @@ jobs:
|
|||||||
./build/wakunode2 --help
|
./build/wakunode2 --help
|
||||||
|
|
||||||
- run: npm run build
|
- run: npm run build
|
||||||
- run: npm run test:node > debug.log
|
- run: npm run test:node
|
||||||
env:
|
env:
|
||||||
DEBUG: "waku:nwaku*,waku:test*"
|
DEBUG: "waku:nwaku*,waku:test*"
|
||||||
|
|
||||||
|
@ -4,7 +4,13 @@ import type { PeerId } from "@libp2p/interface-peer-id";
|
|||||||
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
|
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
|
||||||
import { sha256 } from "@noble/hashes/sha256";
|
import { sha256 } from "@noble/hashes/sha256";
|
||||||
import { concat, utf8ToBytes } from "@waku/byte-utils";
|
import { concat, utf8ToBytes } from "@waku/byte-utils";
|
||||||
import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces";
|
import {
|
||||||
|
Cursor,
|
||||||
|
DecodedMessage,
|
||||||
|
Decoder,
|
||||||
|
Index,
|
||||||
|
Store,
|
||||||
|
} from "@waku/interfaces";
|
||||||
import debug from "debug";
|
import debug from "debug";
|
||||||
import all from "it-all";
|
import all from "it-all";
|
||||||
import * as lp from "it-length-prefixed";
|
import * as lp from "it-length-prefixed";
|
||||||
@ -84,8 +90,10 @@ export interface QueryOptions {
|
|||||||
timeFilter?: TimeFilter;
|
timeFilter?: TimeFilter;
|
||||||
/**
|
/**
|
||||||
* Cursor as an index to start a query from.
|
* Cursor as an index to start a query from.
|
||||||
|
* The cursor index will be exclusive (i.e. the message at the cursor index will not be included in the result).
|
||||||
|
* If undefined, the query will start from the beginning or end of the history, depending on the page direction.
|
||||||
*/
|
*/
|
||||||
cursor?: Index;
|
cursor?: Cursor;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -289,7 +297,7 @@ async function* paginate<T extends DecodedMessage>(
|
|||||||
protocol: string,
|
protocol: string,
|
||||||
queryOpts: Params,
|
queryOpts: Params,
|
||||||
decoders: Map<string, Decoder<T>>,
|
decoders: Map<string, Decoder<T>>,
|
||||||
cursor?: Index
|
cursor?: Cursor
|
||||||
): AsyncGenerator<Promise<T | undefined>[]> {
|
): AsyncGenerator<Promise<T | undefined>[]> {
|
||||||
if (
|
if (
|
||||||
queryOpts.contentTopics.toString() !==
|
queryOpts.contentTopics.toString() !==
|
||||||
@ -300,8 +308,9 @@ async function* paginate<T extends DecodedMessage>(
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let currentCursor = cursor;
|
||||||
while (true) {
|
while (true) {
|
||||||
queryOpts = Object.assign(queryOpts, { cursor });
|
queryOpts.cursor = currentCursor;
|
||||||
|
|
||||||
const stream = await connection.newStream(protocol);
|
const stream = await connection.newStream(protocol);
|
||||||
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
|
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
|
||||||
@ -362,8 +371,8 @@ async function* paginate<T extends DecodedMessage>(
|
|||||||
return Promise.resolve(undefined);
|
return Promise.resolve(undefined);
|
||||||
});
|
});
|
||||||
|
|
||||||
cursor = response.pagingInfo?.cursor;
|
const nextCursor = response.pagingInfo?.cursor;
|
||||||
if (typeof cursor === "undefined") {
|
if (typeof nextCursor === "undefined") {
|
||||||
// If the server does not return cursor then there is an issue,
|
// If the server does not return cursor then there is an issue,
|
||||||
// Need to abort, or we end up in an infinite loop
|
// Need to abort, or we end up in an infinite loop
|
||||||
log(
|
log(
|
||||||
@ -372,6 +381,8 @@ async function* paginate<T extends DecodedMessage>(
|
|||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
currentCursor = nextCursor;
|
||||||
|
|
||||||
const responsePageSize = response.pagingInfo?.pageSize;
|
const responsePageSize = response.pagingInfo?.pageSize;
|
||||||
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
|
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
|
||||||
if (
|
if (
|
||||||
|
@ -60,6 +60,12 @@ export interface TimeFilter {
|
|||||||
endTime: Date;
|
endTime: Date;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
export type Cursor = {
|
||||||
|
digest?: Uint8Array;
|
||||||
|
senderTime?: bigint;
|
||||||
|
pubsubTopic?: string;
|
||||||
|
};
|
||||||
|
|
||||||
export type StoreQueryOptions = {
|
export type StoreQueryOptions = {
|
||||||
/**
|
/**
|
||||||
* The direction in which pages are retrieved:
|
* The direction in which pages are retrieved:
|
||||||
@ -83,7 +89,7 @@ export type StoreQueryOptions = {
|
|||||||
/**
|
/**
|
||||||
* Cursor as an index to start a query from.
|
* Cursor as an index to start a query from.
|
||||||
*/
|
*/
|
||||||
cursor?: Index;
|
cursor?: Cursor;
|
||||||
} & ProtocolOptions;
|
} & ProtocolOptions;
|
||||||
|
|
||||||
export interface Store extends PointToPointProtocol {
|
export interface Store extends PointToPointProtocol {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user