From 0169a0ccb163189dcb2ebe7f4bdc3f3a9b6a75ab Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 05:17:24 +0530 Subject: [PATCH] functionality works! test wip --- package-lock.json | 12 + packages/core/package.json | 3 +- packages/core/src/index.ts | 7 +- packages/core/src/lib/waku_store/index.ts | 30 ++- packages/interfaces/src/index.ts | 10 + packages/message-encryption/src/index.ts | 2 + packages/tests/tests/store.node.spec.ts | 258 ++++++++++++++-------- packages/tests/tsconfig.dev.json | 2 +- 8 files changed, 221 insertions(+), 103 deletions(-) diff --git a/package-lock.json b/package-lock.json index 354fe1f34a..cffaed34f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8628,6 +8628,11 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "node_modules/fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "node_modules/fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", @@ -21910,6 +21915,7 @@ "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", + "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", @@ -26287,6 +26293,7 @@ "eslint-plugin-import": "^2.25.3", "eslint-plugin-prettier": "^4.0.0", "fast-check": "^2.14.0", + "fast-sha256": "*", "gh-pages": "^3.2.3", "ignore-loader": "^0.1.2", "isomorphic-fetch": "^3.0.0", @@ -29381,6 +29388,11 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", diff --git a/packages/core/package.json b/packages/core/package.json index 84b24025bf..20a4857216 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -86,7 +86,6 @@ "node": ">=16" }, "dependencies": { - "@waku/byte-utils": "*", "@chainsafe/libp2p-gossipsub": "^4.1.1", "@libp2p/interface-connection": "^3.0.3", "@libp2p/interface-peer-discovery": "^1.0.0", @@ -97,8 +96,10 @@ "@libp2p/interfaces": "^3.0.2", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", + "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index e90b55e3f4..273158f974 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -20,4 +20,9 @@ export * as waku_relay from "./lib/waku_relay"; export { WakuRelay } from "./lib/waku_relay"; export * as waku_store from "./lib/waku_store"; -export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store"; +export { + PageDirection, + WakuStore, + StoreCodec, + createCursor, +} from "./lib/waku_store"; diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 6bef492059..9b6990f521 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -1,8 +1,10 @@ import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; +import { utf8ToBytes } from "@waku/byte-utils"; import { DecodedMessage, Decoder } from "@waku/interfaces"; import debug from "debug"; +import sha256 from "fast-sha256"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -75,6 +77,10 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: proto.Index; } /** @@ -251,7 +257,8 @@ export class WakuStore { connection, protocol, queryOpts, - decodersAsMap + decodersAsMap, + options?.cursor )) { yield messages; } @@ -270,7 +277,8 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoders: Map> + decoders: Map>, + cursor?: proto.Index ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -281,7 +289,6 @@ async function* paginate( ); } - let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -370,3 +377,20 @@ async function* paginate( export function isDefined(msg: T | undefined): msg is T { return !!msg; } + +export async function createCursor( + message: string, + messageTimestamp: bigint, + contentTopic: string, + pubsubTopic: string = DefaultPubSubTopic +): Promise { + const contentTopicBytes = utf8ToBytes(contentTopic); + const messageBytes = utf8ToBytes(message); + const digest = sha256(Buffer.concat([contentTopicBytes, messageBytes])); + + return { + digest, + pubsubTopic, + senderTime: messageTimestamp, + }; +} diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 5551483137..0728bf795d 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -16,6 +16,12 @@ export interface PointToPointProtocol { libp2p: Libp2p; peers: () => Promise; } +export interface Index { + digest?: Uint8Array; + receivedTime?: bigint; + senderTime?: bigint; + pubsubTopic?: string; +} export type ProtocolOptions = { pubSubTopic?: string; @@ -73,6 +79,10 @@ export type StoreQueryOptions = { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: Index; } & ProtocolOptions; export interface Store extends PointToPointProtocol { diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index ec84603f72..7964e0d9e3 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -62,6 +62,8 @@ export class MessageV1 extends MessageV0 implements DecodedMessage { } } +export { sha256 } from "./crypto"; + export class AsymEncoder implements Encoder { constructor( public contentTopic: string, diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index da6deeece3..e01b686a9a 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -1,5 +1,5 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; -import { PageDirection } from "@waku/core"; +import { createCursor, PageDirection } from "@waku/core"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; @@ -40,8 +40,78 @@ describe("Waku Store", () => { !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Generator", async function () { - this.timeout(15_000); + // it("Generator", async function () { + // this.timeout(1000_000); + // const totalMsgs = 20; + + // for (let i = 0; i < totalMsgs; i++) { + // expect( + // await nwaku.sendMessage( + // Nwaku.toMessageRpcQuery({ + // payload: utf8ToBytes(`Message ${i}`), + // contentTopic: TestContentTopic, + // }) + // ) + // ).to.be.true; + // } + + // waku = await createFullNode({ + // staticNoiseKey: NOISE_KEY_1, + // }); + // await waku.start(); + // await waku.dial(await nwaku.getMultiaddrWithId()); + // await waitForRemotePeer(waku, [Protocols.Store]); + + // const messages: Message[] = []; + // let promises: Promise[] = []; + // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + // const _promises = msgPromises.map(async (promise) => { + // const msg = await promise; + // if (msg) { + // messages.push(msg); + // } + // }); + + // promises = promises.concat(_promises); + // } + // await Promise.all(promises); + + // expect(messages?.length).eq(totalMsgs); + // const result = messages?.findIndex((msg) => { + // return bytesToUtf8(msg.payload!) === "Message 0"; + // }); + // expect(result).to.not.eq(-1); + // }); + + // it("Generator, no message returned", async function () { + // this.timeout(15_000); + + // waku = await createFullNode({ + // staticNoiseKey: NOISE_KEY_1, + // }); + // await waku.start(); + // await waku.dial(await nwaku.getMultiaddrWithId()); + // await waitForRemotePeer(waku, [Protocols.Store]); + + // const messages: Message[] = []; + // let promises: Promise[] = []; + // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + // const _promises = msgPromises.map(async (promise) => { + // const msg = await promise; + // if (msg) { + // messages.push(msg); + // } + // }); + + // promises = promises.concat(_promises); + // } + // await Promise.all(promises); + + // expect(messages?.length).eq(0); + // }); + + it("Passing a cursor", async function () { + this.timeout(4_000); const totalMsgs = 20; for (let i = 0; i < totalMsgs; i++) { @@ -63,51 +133,45 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); - promises = promises.concat(_promises); + const query = waku.store.queryGenerator([TestDecoder]); + + for await (const page of query) { + for await (const msg of page) { + messages.push(msg as Message); + } } - await Promise.all(promises); - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); + const cursorIndex = 2; + const cursorMessage = messages[cursorIndex]; - it("Generator, no message returned", async function () { - this.timeout(15_000); + const cursor = await createCursor( + bytesToUtf8(cursorMessage.payload!), + BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000), + TestContentTopic + ); - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); + const val = await waku.store + .queryGenerator([TestDecoder], { cursor }) + .next(); + //realIndexOfTest = (cursor-pageSize+test+len)%len + // the last message received on this page + const testMessage = await val.value[10 - 1]; - const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); + // for (const msg of val.value) { + // const _msg = await msg; + // console.log({ + // msg: bytesToUtf8(_msg.payload!), + // }); + // } + // console.log({ + // cursorMessage: bytesToUtf8(cursorMessage.payload!), + // testMessage: bytesToUtf8(testMessage.payload!), + // }); - promises = promises.concat(_promises); - } - await Promise.all(promises); + expect(messages?.length).be.eq(totalMsgs); - expect(messages?.length).eq(0); + expect(testMessage).to.be.eq(messages[cursorIndex + 1]); }); it("Callback on promise", async function () { @@ -496,68 +560,68 @@ describe("Waku Store", () => { }); }); -describe("Waku Store, custom pubsub topic", () => { - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - let waku: WakuFull; - let nwaku: Nwaku; +// describe("Waku Store, custom pubsub topic", () => { +// const customPubSubTopic = "/waku/2/custom-dapp/proto"; +// let waku: WakuFull; +// let nwaku: Nwaku; - beforeEach(async function () { - this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ - persistMessages: true, - store: true, - topics: customPubSubTopic, - }); - }); +// beforeEach(async function () { +// this.timeout(15_000); +// nwaku = new Nwaku(makeLogFileName(this)); +// await nwaku.start({ +// persistMessages: true, +// store: true, +// topics: customPubSubTopic, +// }); +// }); - afterEach(async function () { - !!nwaku && nwaku.stop(); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); +// afterEach(async function () { +// !!nwaku && nwaku.stop(); +// !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); +// }); - it("Generator, custom pubsub topic", async function () { - this.timeout(15_000); +// it("Generator, custom pubsub topic", async function () { +// this.timeout(15_000); - const totalMsgs = 20; - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic, - }), - customPubSubTopic - ) - ).to.be.true; - } +// const totalMsgs = 20; +// for (let i = 0; i < totalMsgs; i++) { +// expect( +// await nwaku.sendMessage( +// Nwaku.toMessageRpcQuery({ +// payload: utf8ToBytes(`Message ${i}`), +// contentTopic: TestContentTopic, +// }), +// customPubSubTopic +// ) +// ).to.be.true; +// } - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - pubSubTopic: customPubSubTopic, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); +// waku = await createFullNode({ +// staticNoiseKey: NOISE_KEY_1, +// pubSubTopic: customPubSubTopic, +// }); +// await waku.start(); +// await waku.dial(await nwaku.getMultiaddrWithId()); +// await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); +// const messages: Message[] = []; +// let promises: Promise[] = []; +// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { +// const _promises = msgPromises.map(async (promise) => { +// const msg = await promise; +// if (msg) { +// messages.push(msg); +// } +// }); - promises = promises.concat(_promises); - } - await Promise.all(promises); +// promises = promises.concat(_promises); +// } +// await Promise.all(promises); - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); -}); +// expect(messages?.length).eq(totalMsgs); +// const result = messages?.findIndex((msg) => { +// return bytesToUtf8(msg.payload!) === "Message 0"; +// }); +// expect(result).to.not.eq(-1); +// }); +// }); diff --git a/packages/tests/tsconfig.dev.json b/packages/tests/tsconfig.dev.json index d64199f6ad..0cec24c5cc 100644 --- a/packages/tests/tsconfig.dev.json +++ b/packages/tests/tsconfig.dev.json @@ -1,7 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { - "module": "esnext", + "module": "es2020", "noEmit": true }, "include": ["src", "tests"]