From 0169a0ccb163189dcb2ebe7f4bdc3f3a9b6a75ab Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 05:17:24 +0530 Subject: [PATCH 01/12] functionality works! test wip --- package-lock.json | 12 + packages/core/package.json | 3 +- packages/core/src/index.ts | 7 +- packages/core/src/lib/waku_store/index.ts | 30 ++- packages/interfaces/src/index.ts | 10 + packages/message-encryption/src/index.ts | 2 + packages/tests/tests/store.node.spec.ts | 258 ++++++++++++++-------- packages/tests/tsconfig.dev.json | 2 +- 8 files changed, 221 insertions(+), 103 deletions(-) diff --git a/package-lock.json b/package-lock.json index 354fe1f34a..cffaed34f1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -8628,6 +8628,11 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "node_modules/fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "node_modules/fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", @@ -21910,6 +21915,7 @@ "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", + "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", @@ -26287,6 +26293,7 @@ "eslint-plugin-import": "^2.25.3", "eslint-plugin-prettier": "^4.0.0", "fast-check": "^2.14.0", + "fast-sha256": "*", "gh-pages": "^3.2.3", "ignore-loader": "^0.1.2", "isomorphic-fetch": "^3.0.0", @@ -29381,6 +29388,11 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, + "fast-sha256": { + "version": "1.3.0", + "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", + "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" + }, "fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", diff --git a/packages/core/package.json b/packages/core/package.json index 84b24025bf..20a4857216 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -86,7 +86,6 @@ "node": ">=16" }, "dependencies": { - "@waku/byte-utils": "*", "@chainsafe/libp2p-gossipsub": "^4.1.1", "@libp2p/interface-connection": "^3.0.3", "@libp2p/interface-peer-discovery": "^1.0.0", @@ -97,8 +96,10 @@ "@libp2p/interfaces": "^3.0.2", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", + "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index e90b55e3f4..273158f974 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -20,4 +20,9 @@ export * as waku_relay from "./lib/waku_relay"; export { WakuRelay } from "./lib/waku_relay"; export * as waku_store from "./lib/waku_store"; -export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store"; +export { + PageDirection, + WakuStore, + StoreCodec, + createCursor, +} from "./lib/waku_store"; diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 6bef492059..9b6990f521 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -1,8 +1,10 @@ import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; +import { utf8ToBytes } from "@waku/byte-utils"; import { DecodedMessage, Decoder } from "@waku/interfaces"; import debug from "debug"; +import sha256 from "fast-sha256"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -75,6 +77,10 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: proto.Index; } /** @@ -251,7 +257,8 @@ export class WakuStore { connection, protocol, queryOpts, - decodersAsMap + decodersAsMap, + options?.cursor )) { yield messages; } @@ -270,7 +277,8 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoders: Map> + decoders: Map>, + cursor?: proto.Index ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -281,7 +289,6 @@ async function* paginate( ); } - let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -370,3 +377,20 @@ async function* paginate( export function isDefined(msg: T | undefined): msg is T { return !!msg; } + +export async function createCursor( + message: string, + messageTimestamp: bigint, + contentTopic: string, + pubsubTopic: string = DefaultPubSubTopic +): Promise { + const contentTopicBytes = utf8ToBytes(contentTopic); + const messageBytes = utf8ToBytes(message); + const digest = sha256(Buffer.concat([contentTopicBytes, messageBytes])); + + return { + digest, + pubsubTopic, + senderTime: messageTimestamp, + }; +} diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 5551483137..0728bf795d 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -16,6 +16,12 @@ export interface PointToPointProtocol { libp2p: Libp2p; peers: () => Promise; } +export interface Index { + digest?: Uint8Array; + receivedTime?: bigint; + senderTime?: bigint; + pubsubTopic?: string; +} export type ProtocolOptions = { pubSubTopic?: string; @@ -73,6 +79,10 @@ export type StoreQueryOptions = { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; + /** + * Cursor as an index to start a query from. + */ + cursor?: Index; } & ProtocolOptions; export interface Store extends PointToPointProtocol { diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index ec84603f72..7964e0d9e3 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -62,6 +62,8 @@ export class MessageV1 extends MessageV0 implements DecodedMessage { } } +export { sha256 } from "./crypto"; + export class AsymEncoder implements Encoder { constructor( public contentTopic: string, diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index da6deeece3..e01b686a9a 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -1,5 +1,5 @@ import { bytesToUtf8, utf8ToBytes } from "@waku/byte-utils"; -import { PageDirection } from "@waku/core"; +import { createCursor, PageDirection } from "@waku/core"; import { waitForRemotePeer } from "@waku/core/lib/wait_for_remote_peer"; import { DecoderV0, EncoderV0 } from "@waku/core/lib/waku_message/version_0"; import { createFullNode } from "@waku/create"; @@ -40,8 +40,78 @@ describe("Waku Store", () => { !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Generator", async function () { - this.timeout(15_000); + // it("Generator", async function () { + // this.timeout(1000_000); + // const totalMsgs = 20; + + // for (let i = 0; i < totalMsgs; i++) { + // expect( + // await nwaku.sendMessage( + // Nwaku.toMessageRpcQuery({ + // payload: utf8ToBytes(`Message ${i}`), + // contentTopic: TestContentTopic, + // }) + // ) + // ).to.be.true; + // } + + // waku = await createFullNode({ + // staticNoiseKey: NOISE_KEY_1, + // }); + // await waku.start(); + // await waku.dial(await nwaku.getMultiaddrWithId()); + // await waitForRemotePeer(waku, [Protocols.Store]); + + // const messages: Message[] = []; + // let promises: Promise[] = []; + // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + // const _promises = msgPromises.map(async (promise) => { + // const msg = await promise; + // if (msg) { + // messages.push(msg); + // } + // }); + + // promises = promises.concat(_promises); + // } + // await Promise.all(promises); + + // expect(messages?.length).eq(totalMsgs); + // const result = messages?.findIndex((msg) => { + // return bytesToUtf8(msg.payload!) === "Message 0"; + // }); + // expect(result).to.not.eq(-1); + // }); + + // it("Generator, no message returned", async function () { + // this.timeout(15_000); + + // waku = await createFullNode({ + // staticNoiseKey: NOISE_KEY_1, + // }); + // await waku.start(); + // await waku.dial(await nwaku.getMultiaddrWithId()); + // await waitForRemotePeer(waku, [Protocols.Store]); + + // const messages: Message[] = []; + // let promises: Promise[] = []; + // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + // const _promises = msgPromises.map(async (promise) => { + // const msg = await promise; + // if (msg) { + // messages.push(msg); + // } + // }); + + // promises = promises.concat(_promises); + // } + // await Promise.all(promises); + + // expect(messages?.length).eq(0); + // }); + + it("Passing a cursor", async function () { + this.timeout(4_000); const totalMsgs = 20; for (let i = 0; i < totalMsgs; i++) { @@ -63,51 +133,45 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); - promises = promises.concat(_promises); + const query = waku.store.queryGenerator([TestDecoder]); + + for await (const page of query) { + for await (const msg of page) { + messages.push(msg as Message); + } } - await Promise.all(promises); - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); + const cursorIndex = 2; + const cursorMessage = messages[cursorIndex]; - it("Generator, no message returned", async function () { - this.timeout(15_000); + const cursor = await createCursor( + bytesToUtf8(cursorMessage.payload!), + BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000), + TestContentTopic + ); - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); + const val = await waku.store + .queryGenerator([TestDecoder], { cursor }) + .next(); + //realIndexOfTest = (cursor-pageSize+test+len)%len + // the last message received on this page + const testMessage = await val.value[10 - 1]; - const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); + // for (const msg of val.value) { + // const _msg = await msg; + // console.log({ + // msg: bytesToUtf8(_msg.payload!), + // }); + // } + // console.log({ + // cursorMessage: bytesToUtf8(cursorMessage.payload!), + // testMessage: bytesToUtf8(testMessage.payload!), + // }); - promises = promises.concat(_promises); - } - await Promise.all(promises); + expect(messages?.length).be.eq(totalMsgs); - expect(messages?.length).eq(0); + expect(testMessage).to.be.eq(messages[cursorIndex + 1]); }); it("Callback on promise", async function () { @@ -496,68 +560,68 @@ describe("Waku Store", () => { }); }); -describe("Waku Store, custom pubsub topic", () => { - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - let waku: WakuFull; - let nwaku: Nwaku; +// describe("Waku Store, custom pubsub topic", () => { +// const customPubSubTopic = "/waku/2/custom-dapp/proto"; +// let waku: WakuFull; +// let nwaku: Nwaku; - beforeEach(async function () { - this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ - persistMessages: true, - store: true, - topics: customPubSubTopic, - }); - }); +// beforeEach(async function () { +// this.timeout(15_000); +// nwaku = new Nwaku(makeLogFileName(this)); +// await nwaku.start({ +// persistMessages: true, +// store: true, +// topics: customPubSubTopic, +// }); +// }); - afterEach(async function () { - !!nwaku && nwaku.stop(); - !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); - }); +// afterEach(async function () { +// !!nwaku && nwaku.stop(); +// !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); +// }); - it("Generator, custom pubsub topic", async function () { - this.timeout(15_000); +// it("Generator, custom pubsub topic", async function () { +// this.timeout(15_000); - const totalMsgs = 20; - for (let i = 0; i < totalMsgs; i++) { - expect( - await nwaku.sendMessage( - Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic, - }), - customPubSubTopic - ) - ).to.be.true; - } +// const totalMsgs = 20; +// for (let i = 0; i < totalMsgs; i++) { +// expect( +// await nwaku.sendMessage( +// Nwaku.toMessageRpcQuery({ +// payload: utf8ToBytes(`Message ${i}`), +// contentTopic: TestContentTopic, +// }), +// customPubSubTopic +// ) +// ).to.be.true; +// } - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - pubSubTopic: customPubSubTopic, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); +// waku = await createFullNode({ +// staticNoiseKey: NOISE_KEY_1, +// pubSubTopic: customPubSubTopic, +// }); +// await waku.start(); +// await waku.dial(await nwaku.getMultiaddrWithId()); +// await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; - let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - const _promises = msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }); +// const messages: Message[] = []; +// let promises: Promise[] = []; +// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { +// const _promises = msgPromises.map(async (promise) => { +// const msg = await promise; +// if (msg) { +// messages.push(msg); +// } +// }); - promises = promises.concat(_promises); - } - await Promise.all(promises); +// promises = promises.concat(_promises); +// } +// await Promise.all(promises); - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return bytesToUtf8(msg.payload!) === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); -}); +// expect(messages?.length).eq(totalMsgs); +// const result = messages?.findIndex((msg) => { +// return bytesToUtf8(msg.payload!) === "Message 0"; +// }); +// expect(result).to.not.eq(-1); +// }); +// }); diff --git a/packages/tests/tsconfig.dev.json b/packages/tests/tsconfig.dev.json index d64199f6ad..0cec24c5cc 100644 --- a/packages/tests/tsconfig.dev.json +++ b/packages/tests/tsconfig.dev.json @@ -1,7 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { - "module": "esnext", + "module": "es2020", "noEmit": true }, "include": ["src", "tests"] From 69b64af5489dfb912837f8bc6d392b6e1d6d4ca9 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 17:30:35 +0530 Subject: [PATCH 02/12] add: tests --- packages/tests/tests/store.node.spec.ts | 271 ++++++++++++------------ 1 file changed, 134 insertions(+), 137 deletions(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index e01b686a9a..9689245c51 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -40,75 +40,75 @@ describe("Waku Store", () => { !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - // it("Generator", async function () { - // this.timeout(1000_000); - // const totalMsgs = 20; + it("Generator", async function () { + this.timeout(1000_000); + const totalMsgs = 20; - // for (let i = 0; i < totalMsgs; i++) { - // expect( - // await nwaku.sendMessage( - // Nwaku.toMessageRpcQuery({ - // payload: utf8ToBytes(`Message ${i}`), - // contentTopic: TestContentTopic, - // }) - // ) - // ).to.be.true; - // } + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } - // waku = await createFullNode({ - // staticNoiseKey: NOISE_KEY_1, - // }); - // await waku.start(); - // await waku.dial(await nwaku.getMultiaddrWithId()); - // await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); - // const messages: Message[] = []; - // let promises: Promise[] = []; - // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - // const _promises = msgPromises.map(async (promise) => { - // const msg = await promise; - // if (msg) { - // messages.push(msg); - // } - // }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); - // promises = promises.concat(_promises); - // } - // await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); - // expect(messages?.length).eq(totalMsgs); - // const result = messages?.findIndex((msg) => { - // return bytesToUtf8(msg.payload!) === "Message 0"; - // }); - // expect(result).to.not.eq(-1); - // }); + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return bytesToUtf8(msg.payload!) === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); - // it("Generator, no message returned", async function () { - // this.timeout(15_000); + it("Generator, no message returned", async function () { + this.timeout(15_000); - // waku = await createFullNode({ - // staticNoiseKey: NOISE_KEY_1, - // }); - // await waku.start(); - // await waku.dial(await nwaku.getMultiaddrWithId()); - // await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); - // const messages: Message[] = []; - // let promises: Promise[] = []; - // for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { - // const _promises = msgPromises.map(async (promise) => { - // const msg = await promise; - // if (msg) { - // messages.push(msg); - // } - // }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); - // promises = promises.concat(_promises); - // } - // await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); - // expect(messages?.length).eq(0); - // }); + expect(messages?.length).eq(0); + }); it("Passing a cursor", async function () { this.timeout(4_000); @@ -132,46 +132,43 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages: Message[] = []; - const query = waku.store.queryGenerator([TestDecoder]); + // messages in reversed order (first message at last index) + const messages: Message[] = []; for await (const page of query) { - for await (const msg of page) { + for await (const msg of page.reverse()) { messages.push(msg as Message); } } + // index 2 would mean the third last message sent const cursorIndex = 2; const cursorMessage = messages[cursorIndex]; + // create cursor to extract messages after the 3rd index const cursor = await createCursor( bytesToUtf8(cursorMessage.payload!), BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000), TestContentTopic ); - const val = await waku.store - .queryGenerator([TestDecoder], { cursor }) - .next(); - //realIndexOfTest = (cursor-pageSize+test+len)%len - // the last message received on this page - const testMessage = await val.value[10 - 1]; + const messagesAfterCursor: Message[] = []; + for await (const page of waku.store.queryGenerator([TestDecoder], { + cursor, + })) { + for await (const msg of page.reverse()) { + messagesAfterCursor.push(msg as Message); + } + } - // for (const msg of val.value) { - // const _msg = await msg; - // console.log({ - // msg: bytesToUtf8(_msg.payload!), - // }); - // } - // console.log({ - // cursorMessage: bytesToUtf8(cursorMessage.payload!), - // testMessage: bytesToUtf8(testMessage.payload!), - // }); + const testMessage = messagesAfterCursor[0]; - expect(messages?.length).be.eq(totalMsgs); + expect(messages.length).be.eq(totalMsgs); - expect(testMessage).to.be.eq(messages[cursorIndex + 1]); + expect(bytesToUtf8(testMessage.payload!)).to.be.eq( + bytesToUtf8(messages[cursorIndex + 1].payload!) + ); }); it("Callback on promise", async function () { @@ -560,68 +557,68 @@ describe("Waku Store", () => { }); }); -// describe("Waku Store, custom pubsub topic", () => { -// const customPubSubTopic = "/waku/2/custom-dapp/proto"; -// let waku: WakuFull; -// let nwaku: Nwaku; +describe("Waku Store, custom pubsub topic", () => { + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + let waku: WakuFull; + let nwaku: Nwaku; -// beforeEach(async function () { -// this.timeout(15_000); -// nwaku = new Nwaku(makeLogFileName(this)); -// await nwaku.start({ -// persistMessages: true, -// store: true, -// topics: customPubSubTopic, -// }); -// }); + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ + persistMessages: true, + store: true, + topics: customPubSubTopic, + }); + }); -// afterEach(async function () { -// !!nwaku && nwaku.stop(); -// !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); -// }); + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); -// it("Generator, custom pubsub topic", async function () { -// this.timeout(15_000); + it("Generator, custom pubsub topic", async function () { + this.timeout(15_000); -// const totalMsgs = 20; -// for (let i = 0; i < totalMsgs; i++) { -// expect( -// await nwaku.sendMessage( -// Nwaku.toMessageRpcQuery({ -// payload: utf8ToBytes(`Message ${i}`), -// contentTopic: TestContentTopic, -// }), -// customPubSubTopic -// ) -// ).to.be.true; -// } + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }), + customPubSubTopic + ) + ).to.be.true; + } -// waku = await createFullNode({ -// staticNoiseKey: NOISE_KEY_1, -// pubSubTopic: customPubSubTopic, -// }); -// await waku.start(); -// await waku.dial(await nwaku.getMultiaddrWithId()); -// await waitForRemotePeer(waku, [Protocols.Store]); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopic: customPubSubTopic, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); -// const messages: Message[] = []; -// let promises: Promise[] = []; -// for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { -// const _promises = msgPromises.map(async (promise) => { -// const msg = await promise; -// if (msg) { -// messages.push(msg); -// } -// }); + const messages: Message[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); -// promises = promises.concat(_promises); -// } -// await Promise.all(promises); + promises = promises.concat(_promises); + } + await Promise.all(promises); -// expect(messages?.length).eq(totalMsgs); -// const result = messages?.findIndex((msg) => { -// return bytesToUtf8(msg.payload!) === "Message 0"; -// }); -// expect(result).to.not.eq(-1); -// }); -// }); + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return bytesToUtf8(msg.payload!) === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); +}); From f3635f1fe8b0c4886fd815560901b98b209ed0c0 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 18:06:33 +0530 Subject: [PATCH 03/12] address comments --- packages/tests/tsconfig.dev.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tsconfig.dev.json b/packages/tests/tsconfig.dev.json index 0cec24c5cc..0e5599eb80 100644 --- a/packages/tests/tsconfig.dev.json +++ b/packages/tests/tsconfig.dev.json @@ -1,7 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { - "module": "es2020", + "module": "ESNext", "noEmit": true }, "include": ["src", "tests"] From 1a5a57f7563f54f273bb8920cb6f84c84a98a96a Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 18:06:59 +0530 Subject: [PATCH 04/12] fix: git diff --- packages/tests/tsconfig.dev.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tsconfig.dev.json b/packages/tests/tsconfig.dev.json index 0e5599eb80..d64199f6ad 100644 --- a/packages/tests/tsconfig.dev.json +++ b/packages/tests/tsconfig.dev.json @@ -1,7 +1,7 @@ { "extends": "./tsconfig", "compilerOptions": { - "module": "ESNext", + "module": "esnext", "noEmit": true }, "include": ["src", "tests"] From 8f243eb325634fabe46ba697efd2127cd0654f36 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 18:07:29 +0530 Subject: [PATCH 05/12] fix: git diff --- packages/tests/tests/store.node.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 9689245c51..507cef029d 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -41,7 +41,7 @@ describe("Waku Store", () => { }); it("Generator", async function () { - this.timeout(1000_000); + this.timeout(15_000); const totalMsgs = 20; for (let i = 0; i < totalMsgs; i++) { From c012748bb7b1e604bdd05076f1c281ea6cdb3f83 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 15 Nov 2022 18:08:21 +0530 Subject: [PATCH 06/12] address comments --- package-lock.json | 30 ++++++++++++++--------- packages/core/package.json | 2 +- packages/core/src/lib/waku_store/index.ts | 6 ++--- 3 files changed, 22 insertions(+), 16 deletions(-) diff --git a/package-lock.json b/package-lock.json index cffaed34f1..84b02f0ccc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -3225,6 +3225,17 @@ } ] }, + "node_modules/@noble/hashes": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz", + "integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A==", + "funding": [ + { + "type": "individual", + "url": "https://paulmillr.com/funding/" + } + ] + }, "node_modules/@noble/secp256k1": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", @@ -8628,11 +8639,6 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, - "node_modules/fast-sha256": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", - "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" - }, "node_modules/fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", @@ -21912,10 +21918,10 @@ "@libp2p/interfaces": "^3.0.2", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "^1.1.3", "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", - "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", @@ -24850,6 +24856,11 @@ "resolved": "https://registry.npmjs.org/@noble/ed25519/-/ed25519-1.7.1.tgz", "integrity": "sha512-Rk4SkJFaXZiznFyC/t77Q0NKS4FL7TLJJsVG2V2oiEq3kJVeTdxysEe/yRWSpnWMe808XRDJ+VFh5pt/FN5plw==" }, + "@noble/hashes": { + "version": "1.1.3", + "resolved": "https://registry.npmjs.org/@noble/hashes/-/hashes-1.1.3.tgz", + "integrity": "sha512-CE0FCR57H2acVI5UOzIGSSIYxZ6v/HOhDR0Ro9VLyhnzLwx0o8W1mmgaqlEUx4049qJDlIBRztv5k+MM8vbO3A==" + }, "@noble/secp256k1": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@noble/secp256k1/-/secp256k1-1.7.0.tgz", @@ -26268,6 +26279,7 @@ "@libp2p/interfaces": "^3.0.2", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "*", "@rollup/plugin-commonjs": "^22.0.0", "@rollup/plugin-json": "^4.1.0", "@rollup/plugin-node-resolve": "^13.3.0", @@ -26293,7 +26305,6 @@ "eslint-plugin-import": "^2.25.3", "eslint-plugin-prettier": "^4.0.0", "fast-check": "^2.14.0", - "fast-sha256": "*", "gh-pages": "^3.2.3", "ignore-loader": "^0.1.2", "isomorphic-fetch": "^3.0.0", @@ -29388,11 +29399,6 @@ "integrity": "sha512-DCXu6Ifhqcks7TZKY3Hxp3y6qphY5SJZmrWMDrKcERSOXWQdMhU9Ig/PYrzyw/ul9jOIyh0N4M0tbC5hodg8dw==", "dev": true }, - "fast-sha256": { - "version": "1.3.0", - "resolved": "https://registry.npmjs.org/fast-sha256/-/fast-sha256-1.3.0.tgz", - "integrity": "sha512-n11RGP/lrWEFI/bWdygLxhI+pVeo1ZYIVwvvPkW7azl/rOy+F3HYRZ2K5zeE9mmkhQppyv9sQFx0JM9UabnpPQ==" - }, "fastq": { "version": "1.13.0", "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.13.0.tgz", diff --git a/packages/core/package.json b/packages/core/package.json index 20a4857216..917e0b4e05 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -96,10 +96,10 @@ "@libp2p/interfaces": "^3.0.2", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@noble/hashes": "^1.1.3", "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", - "fast-sha256": "^1.3.0", "it-all": "^1.0.6", "it-length-prefixed": "^8.0.2", "it-pipe": "^2.0.4", diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 9b6990f521..89c9dc417b 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -1,10 +1,10 @@ import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; -import { utf8ToBytes } from "@waku/byte-utils"; +import { sha256 } from "@noble/hashes/sha256"; +import { concat, utf8ToBytes } from "@waku/byte-utils"; import { DecodedMessage, Decoder } from "@waku/interfaces"; import debug from "debug"; -import sha256 from "fast-sha256"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -386,7 +386,7 @@ export async function createCursor( ): Promise { const contentTopicBytes = utf8ToBytes(contentTopic); const messageBytes = utf8ToBytes(message); - const digest = sha256(Buffer.concat([contentTopicBytes, messageBytes])); + const digest = sha256(concat([contentTopicBytes, messageBytes])); return { digest, From 5c4118041e9cd3f036045aee544da0172ae98811 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 16 Nov 2022 18:57:46 +0530 Subject: [PATCH 08/12] fix: cursor --- packages/core/src/lib/waku_store/index.ts | 24 ++++++++++++++++------- packages/tests/tests/store.node.spec.ts | 17 ++++++---------- 2 files changed, 23 insertions(+), 18 deletions(-) diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index e0762d63f2..3f0ddf2636 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -379,18 +379,28 @@ export function isDefined(msg: T | undefined): msg is T { } export async function createCursor( - message: string, - messageTimestamp: bigint, - contentTopic: string, + message: DecodedMessage, pubsubTopic: string = DefaultPubSubTopic ): Promise { - const contentTopicBytes = utf8ToBytes(contentTopic); - const messageBytes = utf8ToBytes(message); - const digest = sha256(concat([contentTopicBytes, messageBytes])); + if ( + !message || + !message.timestamp || + !message.payload || + !message.contentTopic + ) { + throw new Error("Message is missing timestamp or payload"); + } + + const contentTopicBytes = utf8ToBytes(message.contentTopic); + + const digest = sha256(concat([contentTopicBytes, message.payload])); + + const messageTime = BigInt(message.timestamp.getTime()) * BigInt(1000000); return { digest, pubsubTopic, - senderTime: messageTimestamp, + senderTime: messageTime, + receivedTime: messageTime, }; } diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 7ad37fa5e1..2bd38cdc32 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -30,7 +30,7 @@ describe("Waku Store", () => { let nwaku: Nwaku; beforeEach(async function () { - this.timeout(15_000); + this.timeout(35_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ store: true, lightpush: true }); }); @@ -135,30 +135,25 @@ describe("Waku Store", () => { const query = waku.store.queryGenerator([TestDecoder]); // messages in reversed order (first message at last index) - const messages: Message[] = []; + const messages: DecodedMessage[] = []; for await (const page of query) { for await (const msg of page.reverse()) { - messages.push(msg as Message); + messages.push(msg as DecodedMessage); } } // index 2 would mean the third last message sent const cursorIndex = 2; - const cursorMessage = messages[cursorIndex]; // create cursor to extract messages after the 3rd index - const cursor = await createCursor( - bytesToUtf8(cursorMessage.payload!), - BigInt(cursorMessage.timestamp!.getTime()) * BigInt(1000000), - TestContentTopic - ); + const cursor = await createCursor(messages[cursorIndex]); - const messagesAfterCursor: Message[] = []; + const messagesAfterCursor: DecodedMessage[] = []; for await (const page of waku.store.queryGenerator([TestDecoder], { cursor, })) { for await (const msg of page.reverse()) { - messagesAfterCursor.push(msg as Message); + messagesAfterCursor.push(msg as DecodedMessage); } } From 0f73c0f331bc29a3a0a084310eec865c3d8b4532 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 16 Nov 2022 19:00:09 +0530 Subject: [PATCH 09/12] address comments --- packages/core/src/lib/waku_store/index.ts | 8 ++++---- packages/message-encryption/src/index.ts | 2 -- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 3f0ddf2636..2d30632b5d 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -3,7 +3,7 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { concat, utf8ToBytes } from "@waku/byte-utils"; -import { DecodedMessage, Decoder } from "@waku/interfaces"; +import { DecodedMessage, Decoder, Index } from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; @@ -80,7 +80,7 @@ export interface QueryOptions { /** * Cursor as an index to start a query from. */ - cursor?: proto.Index; + cursor?: Index; } /** @@ -278,7 +278,7 @@ async function* paginate( protocol: string, queryOpts: Params, decoders: Map>, - cursor?: proto.Index + cursor?: Index ): AsyncGenerator[]> { if ( queryOpts.contentTopics.toString() !== @@ -381,7 +381,7 @@ export function isDefined(msg: T | undefined): msg is T { export async function createCursor( message: DecodedMessage, pubsubTopic: string = DefaultPubSubTopic -): Promise { +): Promise { if ( !message || !message.timestamp || diff --git a/packages/message-encryption/src/index.ts b/packages/message-encryption/src/index.ts index 8bbb373b5d..fc60c60a99 100644 --- a/packages/message-encryption/src/index.ts +++ b/packages/message-encryption/src/index.ts @@ -62,8 +62,6 @@ export class MessageV1 extends MessageV0 implements DecodedMessage { } } -export { sha256 } from "./crypto"; - export class AsymEncoder implements Encoder { constructor( public contentTopic: string, From 0627a803ea73dd2852e890fe01a916cbd6a4c03a Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 16 Nov 2022 19:06:04 +0530 Subject: [PATCH 10/12] verbose error message --- packages/core/src/lib/waku_store/index.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index 2d30632b5d..addac8e37d 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -388,7 +388,7 @@ export async function createCursor( !message.payload || !message.contentTopic ) { - throw new Error("Message is missing timestamp or payload"); + throw new Error("Message is missing required fields"); } const contentTopicBytes = utf8ToBytes(message.contentTopic); From 0e0660f5b19bd8df608bcafb50ee6d6421d8485a Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 16 Nov 2022 19:06:32 +0530 Subject: [PATCH 11/12] revert: timeout --- packages/tests/tests/store.node.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index 2bd38cdc32..e173508de8 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -30,7 +30,7 @@ describe("Waku Store", () => { let nwaku: Nwaku; beforeEach(async function () { - this.timeout(35_000); + this.timeout(15_000); nwaku = new Nwaku(makeLogFileName(this)); await nwaku.start({ store: true, lightpush: true }); }); From 9f0493221e3036f7999333b2388d029b67172890 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 17 Nov 2022 13:03:36 +0530 Subject: [PATCH 12/12] commit package-lock --- package-lock.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/package-lock.json b/package-lock.json index 3463b035af..aba730c2b7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26088,7 +26088,7 @@ "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", - "@noble/hashes": "*", + "@noble/hashes": "^1.1.3", "@rollup/plugin-commonjs": "^22.0.0", "@rollup/plugin-json": "^4.1.0", "@rollup/plugin-node-resolve": "^13.3.0",