diff --git a/package-lock.json b/package-lock.json index 63237e1889..aba730c2b7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3144,6 +3144,17 @@ } ] }, + "node_modules/@noble/hashes": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz", + "integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A==", + "funding": [ + { + "type": "individual", + "url": "https://paulmillr.com/funding/" + } + ] + }, "node_modules/@noble/secp256k1": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", @@ -21840,6 +21851,7 @@ "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "^1.1.3", "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", @@ -24667,6 +24679,11 @@ "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.1.tgz", "integrity": "sha512-Rk4SkJFaXZiznFyC/t77Q0NKS4FL7TLJJsVG2V2oiEq3kJVeTdxysEe/yRWSpnWMe808XRDJ+VFh5pt/FN5plw==" }, + "@noble/hashes": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz", + "integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A==" + }, "@noble/secp256k1": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", @@ -26071,6 +26088,7 @@ "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "^1.1.3", "@rollup/plugin-commonjs": "^22.0.0", "@rollup/plugin-json": "^4.1.0", "@rollup/plugin-node-resolve": "^13.3.0", diff --git a/packages/core/package.json b/packages/core/package.json index 37e0294353..6d6b9a5ddd 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -93,6 +93,7 @@ "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "^1.1.3", "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index ad1166ba91..12a1ea8c07 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -21,4 +21,9 @@ export * as waku_relay from "./lib/waku_relay"; export { wakuRelay } from "./lib/waku_relay"; export * as waku_store from "./lib/waku_store"; -export { PageDirection, wakuStore, StoreCodec } from "./lib/waku_store"; +export { + PageDirection, + wakuStore, + StoreCodec, + createCursor, +} from "./lib/waku_store"; diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index c93005bced..44e4f5087b 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -2,7 +2,9 @@ import type { Connection } from "@libp2p/interface-connection"; import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; -import { DecodedMessage, Decoder, Store } from "@waku/interfaces"; +import { sha256 } from "@noble/hashes/sha256"; +import { concat, utf8ToBytes } from "@waku/byte-utils"; +import { DecodedMessage, Decoder, Index, Store } from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -80,6 +82,10 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: Index; } /** @@ -258,7 +264,8 @@ class WakuStore implements Store { connection, protocol, queryOpts, - decodersAsMap + decodersAsMap, + options?.cursor )) { yield messages; } @@ -281,7 +288,8 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoders: Map> + decoders: Map>, + cursor?: Index ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -292,7 +300,6 @@ async function* paginate( ); } - let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -382,6 +389,33 @@ export function isDefined(msg: T | undefined): msg is T { return !!msg; } +export async function createCursor( + message: DecodedMessage, + pubsubTopic: string = DefaultPubSubTopic +): Promise { + if ( + !message || + !message.timestamp || + !message.payload || + !message.contentTopic + ) { + throw new Error("Message is missing required fields"); + } + + const contentTopicBytes = utf8ToBytes(message.contentTopic); + + const digest = sha256(concat([contentTopicBytes, message.payload])); + + const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000); + + return { + digest, + pubsubTopic, + senderTime: messageTime, + receivedTime: messageTime, + }; +} + export function wakuStore( init: Partial = {} ): (components: StoreComponents) => Store { diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index f32176ff00..39aeeacf44 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -17,6 +17,12 @@ export interface PointToPointProtocol { peerStore: PeerStore; peers: () => Promise; } +export interface Index { + digest?: Uint8Array; + receivedTime?: bigint; + senderTime?: bigint; + pubsubTopic?: string; +} export type ProtocolOptions = { pubSubTopic?: string; @@ -74,6 +80,10 @@ export type StoreQueryOptions = { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: Index; } & ProtocolOptions; export interface Store extends PointToPointProtocol { diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 59cac15265..5b4b653bb2 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -1,5 +1,5 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; -import { PageDirection } from "@waku/core"; +import { createCursor, PageDirection } from "@waku/core"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; @@ -111,6 +111,62 @@ describe("Waku Store", () => { expect(messages?.length).eq(0); }); + it("Passing a cursor", async function () { + this.timeout(4_000); + const totalMsgs = 20; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const query = waku.store.queryGenerator([TestDecoder]); + + // messages in reversed order (first message at last index) + const messages: DecodedMessage[] = []; + for await (const page of query) { + for await (const msg of page.reverse()) { + messages.push(msg as DecodedMessage); + } + } + + // index 2 would mean the third last message sent + const cursorIndex = 2; + + // create cursor to extract messages after the 3rd index + const cursor = await createCursor(messages[cursorIndex]); + + const messagesAfterCursor: DecodedMessage[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor, + })) { + for await (const msg of page.reverse()) { + messagesAfterCursor.push(msg as DecodedMessage); + } + } + + const testMessage = messagesAfterCursor[0]; + + expect(messages.length).be.eq(totalMsgs); + + expect(bytesToUtf8(testMessage.payload!)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload!) + ); + }); + it("Callback on promise", async function () { this.timeout(15_000);