Merge pull request #1024 from waku-org/danisharora/cursor-support-store

feat!: support for cursors on store API
This commit is contained in:
Danish Arora 2022-11-17 13:07:56 +05:30 committed by GitHub
commit a3da9f4f9b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 130 additions and 6 deletions

18
package-lock.json generated
View File

@ -3144,6 +3144,17 @@
} }
] ]
}, },
"node_modules/@noble/hashes": {
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz",
"integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A==",
"funding": [
{
"type": "individual",
"url": "https://paulmillr.com/funding/"
}
]
},
"node_modules/@noble/secp256k1": { "node_modules/@noble/secp256k1": {
"version": "1.7.0", "version": "1.7.0",
"resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz",
@ -21840,6 +21851,7 @@
"@libp2p/interfaces": "^3.0.4", "@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6", "@multiformats/multiaddr": "^11.0.6",
"@noble/hashes": "^1.1.3",
"@waku/byte-utils": "*", "@waku/byte-utils": "*",
"@waku/interfaces": "*", "@waku/interfaces": "*",
"debug": "^4.3.4", "debug": "^4.3.4",
@ -24667,6 +24679,11 @@
"resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.1.tgz", "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.1.tgz",
"integrity": "sha512-Rk4SkJFaXZiznFyC/t77Q0NKS4FL7TLJJsVG2V2oiEq3kJVeTdxysEe/yRWSpnWMe808XRDJ+VFh5pt/FN5plw==" "integrity": "sha512-Rk4SkJFaXZiznFyC/t77Q0NKS4FL7TLJJsVG2V2oiEq3kJVeTdxysEe/yRWSpnWMe808XRDJ+VFh5pt/FN5plw=="
}, },
"@noble/hashes": {
"version": "1.1.3",
"resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz",
"integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A=="
},
"@noble/secp256k1": { "@noble/secp256k1": {
"version": "1.7.0", "version": "1.7.0",
"resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz",
@ -26071,6 +26088,7 @@
"@libp2p/interfaces": "^3.0.4", "@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6", "@multiformats/multiaddr": "^11.0.6",
"@noble/hashes": "^1.1.3",
"@rollup/plugin-commonjs": "^22.0.0", "@rollup/plugin-commonjs": "^22.0.0",
"@rollup/plugin-json": "^4.1.0", "@rollup/plugin-json": "^4.1.0",
"@rollup/plugin-node-resolve": "^13.3.0", "@rollup/plugin-node-resolve": "^13.3.0",

View File

@ -93,6 +93,7 @@
"@libp2p/interfaces": "^3.0.4", "@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6", "@multiformats/multiaddr": "^11.0.6",
"@noble/hashes": "^1.1.3",
"@waku/byte-utils": "*", "@waku/byte-utils": "*",
"@waku/interfaces": "*", "@waku/interfaces": "*",
"debug": "^4.3.4", "debug": "^4.3.4",

View File

@ -21,4 +21,9 @@ export * as waku_relay from "./lib/waku_relay";
export { wakuRelay } from "./lib/waku_relay"; export { wakuRelay } from "./lib/waku_relay";
export * as waku_store from "./lib/waku_store"; export * as waku_store from "./lib/waku_store";
export { PageDirection, wakuStore, StoreCodec } from "./lib/waku_store"; export {
PageDirection,
wakuStore,
StoreCodec,
createCursor,
} from "./lib/waku_store";

View File

@ -2,7 +2,9 @@ import type { Connection } from "@libp2p/interface-connection";
import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { DecodedMessage, Decoder, Store } from "@waku/interfaces"; import { sha256 } from "@noble/hashes/sha256";
import { concat, utf8ToBytes } from "@waku/byte-utils";
import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
@ -80,6 +82,10 @@ export interface QueryOptions {
* Retrieve messages with a timestamp within the provided values. * Retrieve messages with a timestamp within the provided values.
*/ */
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} }
/** /**
@ -258,7 +264,8 @@ class WakuStore implements Store {
connection, connection,
protocol, protocol,
queryOpts, queryOpts,
decodersAsMap decodersAsMap,
options?.cursor
)) { )) {
yield messages; yield messages;
} }
@ -281,7 +288,8 @@ async function* paginate<T extends DecodedMessage>(
connection: Connection, connection: Connection,
protocol: string, protocol: string,
queryOpts: Params, queryOpts: Params,
decoders: Map<string, Decoder<T>> decoders: Map<string, Decoder<T>>,
cursor?: Index
): AsyncGenerator<Promise<T | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
if ( if (
queryOpts.contentTopics.toString() !== queryOpts.contentTopics.toString() !==
@ -292,7 +300,6 @@ async function* paginate<T extends DecodedMessage>(
); );
} }
let cursor = undefined;
while (true) { while (true) {
queryOpts = Object.assign(queryOpts, { cursor }); queryOpts = Object.assign(queryOpts, { cursor });
@ -382,6 +389,33 @@ export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg; return !!msg;
} }
export async function createCursor(
message: DecodedMessage,
pubsubTopic: string = DefaultPubSubTopic
): Promise<Index> {
if (
!message ||
!message.timestamp ||
!message.payload ||
!message.contentTopic
) {
throw new Error("Message is missing required fields");
}
const contentTopicBytes = utf8ToBytes(message.contentTopic);
const digest = sha256(concat([contentTopicBytes, message.payload]));
const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000);
return {
digest,
pubsubTopic,
senderTime: messageTime,
receivedTime: messageTime,
};
}
export function wakuStore( export function wakuStore(
init: Partial<CreateOptions> = {} init: Partial<CreateOptions> = {}
): (components: StoreComponents) => Store { ): (components: StoreComponents) => Store {

View File

@ -17,6 +17,12 @@ export interface PointToPointProtocol {
peerStore: PeerStore; peerStore: PeerStore;
peers: () => Promise<Peer[]>; peers: () => Promise<Peer[]>;
} }
export interface Index {
digest?: Uint8Array;
receivedTime?: bigint;
senderTime?: bigint;
pubsubTopic?: string;
}
export type ProtocolOptions = { export type ProtocolOptions = {
pubSubTopic?: string; pubSubTopic?: string;
@ -74,6 +80,10 @@ export type StoreQueryOptions = {
* Retrieve messages with a timestamp within the provided values. * Retrieve messages with a timestamp within the provided values.
*/ */
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/**
* Cursor as an index to start a query from.
*/
cursor?: Index;
} & ProtocolOptions; } & ProtocolOptions;
export interface Store extends PointToPointProtocol { export interface Store extends PointToPointProtocol {

View File

@ -1,5 +1,5 @@
import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils";
import { PageDirection } from "@waku/core"; import { createCursor, PageDirection } from "@waku/core";
import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer";
import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0";
import { createFullNode } from "@waku/create"; import { createFullNode } from "@waku/create";
@ -111,6 +111,62 @@ describe("Waku Store", () => {
expect(messages?.length).eq(0); expect(messages?.length).eq(0);
}); });
it("Passing a cursor", async function () {
this.timeout(4_000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic,
})
)
).to.be.true;
}
waku = await createFullNode({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const query = waku.store.queryGenerator([TestDecoder]);
// messages in reversed order (first message at last index)
const messages: DecodedMessage[] = [];
for await (const page of query) {
for await (const msg of page.reverse()) {
messages.push(msg as DecodedMessage);
}
}
// index 2 would mean the third last message sent
const cursorIndex = 2;
// create cursor to extract messages after the 3rd index
const cursor = await createCursor(messages[cursorIndex]);
const messagesAfterCursor: DecodedMessage[] = [];
for await (const page of waku.store.queryGenerator([TestDecoder], {
cursor,
})) {
for await (const msg of page.reverse()) {
messagesAfterCursor.push(msg as DecodedMessage);
}
}
const testMessage = messagesAfterCursor[0];
expect(messages.length).be.eq(totalMsgs);
expect(bytesToUtf8(testMessage.payload!)).to.be.eq(
bytesToUtf8(messages[cursorIndex + 1].payload!)
);
});
it("Callback on promise", async function () { it("Callback on promise", async function () {
this.timeout(15_000); this.timeout(15_000);