diff --git a/CHANGELOG.md b/CHANGELOG.md index 1414e46d6a..4627a55cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options. +- `WakuStore` now provides several APIs: `queryGenerator`, `queryCallbackOnPromise`, `queryOrderedCallback`; + each provides different guarantees and performance. ## [0.27.0] - 2022-09-13 diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 86a33f1efd..07db951213 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -25,70 +25,23 @@ const log = debug("waku:test:store"); const TestContentTopic = "/test/1/waku-store/utf8"; -const isWakuMessageDefined = ( - msg: WakuMessage | undefined -): msg is WakuMessage => { - return !!msg; -}; - describe("Waku Store", () => { let waku: WakuFull; let nwaku: Nwaku; + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true, store: true, lightpush: true }); + }); + afterEach(async function () { !!nwaku && nwaku.stop(); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history", async function () { + it("Generator", async function () { this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 2; i++) { - expect( - await nwaku.sendMessage( - Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic, - }) - ) - ).to.be.true; - } - - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: WakuMessage[] = []; - await waku.store.queryHistory([], async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); - }); - - expect(messages?.length).eq(2); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); - - it("Retrieves history using callback", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - const totalMsgs = 20; for (let i = 0; i < totalMsgs; i++) { @@ -110,15 +63,82 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: WakuMessage[] = []; - await waku.store.queryHistory([], async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, no message returned", async function () { + this.timeout(15_000); + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(0); + }); + + it("Callback on promise", async function () { + this.timeout(15_000); + + const totalMsgs = 15; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise([], async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } }); expect(messages?.length).eq(totalMsgs); @@ -128,15 +148,12 @@ describe("Waku Store", () => { expect(result).to.not.eq(-1); }); - it("Retrieval aborts when callback returns true", async function () { + it("Callback on promise, aborts when callback returns true", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + const totalMsgs = 20; - const availMsgs = 20; - - for (let i = 0; i < availMsgs; i++) { + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -154,15 +171,15 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; const desiredMsgs = 14; - - await waku.store.queryHistory( + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } return messages.length >= desiredMsgs; }, { pageSize: 7 } @@ -171,13 +188,11 @@ describe("Waku Store", () => { expect(messages?.length).eq(desiredMsgs); }); - it("Retrieves all historical elements in chronological order through paging", async function () { + it("Ordered Callback - Forward", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 15; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -195,24 +210,19 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; - await waku.store.queryHistory( + const messages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - // Note: messages within a page are ordered from oldest to most recent - // so the `concat` can only preserve order when `PageDirection` - // is forward - messages = messages.concat(msgs); + async (msg) => { + messages.push(msg); }, { pageDirection: PageDirection.FORWARD, } ); - expect(messages?.length).eq(15); - for (let index = 0; index < 2; index++) { + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { return msg.payloadAsUtf8 === `Message ${index}`; @@ -221,176 +231,54 @@ describe("Waku Store", () => { } }); - it("Retrieves history using custom pubsub topic", async function () { + it("Ordered Callback - Backward", async function () { this.timeout(15_000); - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ - persistMessages: true, - store: true, - topics: customPubSubTopic, - }); - - for (let i = 0; i < 2; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ payload: utf8ToBytes(`Message ${i}`), contentTopic: TestContentTopic, - }), - customPubSubTopic + }) ) ).to.be.true; } waku = await createFullNode({ - pubSubTopic: customPubSubTopic, staticNoiseKey: NOISE_KEY_1, }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const nimPeerId = await nwaku.getPeerId(); - - const messages: WakuMessage[] = []; - await waku.store.queryHistory( - [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); - }, - { - peerId: nimPeerId, - } - ); - - expect(messages?.length).eq(2); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); - - it("Retrieves history with asymmetric & symmetric encrypted messages", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - - const encryptedAsymmetricMessageText = "asymmetric encryption"; - const encryptedSymmetricMessageText = "symmetric encryption"; - const clearMessageText = - "This is a clear text message for everyone to read"; - const otherEncMessageText = - "This message is not for and I must not be able to read it"; - - const privateKey = generatePrivateKey(); - const symKey = generateSymmetricKey(); - const publicKey = getPublicKey(privateKey); - - const timestamp = new Date(); - const [ - encryptedAsymmetricMessage, - encryptedSymmetricMessage, - clearMessage, - otherEncMessage, - ] = await Promise.all([ - WakuMessage.fromUtf8String( - encryptedAsymmetricMessageText, - TestContentTopic, - { - encPublicKey: publicKey, - timestamp, - } - ), - WakuMessage.fromUtf8String( - encryptedSymmetricMessageText, - TestContentTopic, - { - symKey: symKey, - timestamp: new Date(timestamp.valueOf() + 1), - } - ), - WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, { - timestamp: new Date(timestamp.valueOf() + 2), - }), - WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, { - encPublicKey: getPublicKey(generatePrivateKey()), - timestamp: new Date(timestamp.valueOf() + 3), - }), - ]); - - log("Messages have been encrypted"); - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ - createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }).then((waku) => waku.start().then(() => waku)), - createFullNode({ - staticNoiseKey: NOISE_KEY_2, - }).then((waku) => waku.start().then(() => waku)), - nwaku.getMultiaddrWithId(), - ]); - - log("Waku nodes created"); - - await Promise.all([ - waku1.dial(nimWakuMultiaddr), - waku2.dial(nimWakuMultiaddr), - ]); - - log("Waku nodes connected to nwaku"); - - await waitForRemotePeer(waku1, [Protocols.LightPush]); - - log("Sending messages using light push"); - await Promise.all([ - waku1.lightPush.push(encryptedAsymmetricMessage), - waku1.lightPush.push(encryptedSymmetricMessage), - waku1.lightPush.push(otherEncMessage), - waku1.lightPush.push(clearMessage), - ]); - - await waitForRemotePeer(waku2, [Protocols.Store]); - - waku2.store.addDecryptionKey(symKey); - - log("Retrieve messages from store"); let messages: WakuMessage[] = []; - await waku2.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); + async (msg) => { + messages.push(msg); }, { - decryptionParams: [{ key: privateKey }], + pageDirection: PageDirection.BACKWARD, } ); - // Messages are ordered from oldest to latest within a page (1 page query) - expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText); + messages = messages.reverse(); - !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === `Message ${index}`; + }) + ).to.eq(index); + } }); - it("Retrieves history with asymmetric & symmetric encrypted messages on different content topics", async function () { + it("Generator, with asymmetric & symmetric encrypted messages", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - const encryptedAsymmetricMessageText = "This message is encrypted for me using asymmetric"; const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto"; @@ -482,19 +370,19 @@ describe("Waku Store", () => { method: DecryptionMethod.Symmetric, }); - let messages: WakuMessage[] = []; + const messages: WakuMessage[] = []; log("Retrieve messages from store"); - await waku2.store.queryHistory( - [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); - }, - { - decryptionParams: [{ key: privateKey }], + + for await (const msgPromises of waku2.store.queryGenerator([], { + decryptionParams: [{ key: privateKey }], + })) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } } - ); + } expect(messages?.length).eq(3); if (!messages) throw "Length was tested"; @@ -503,15 +391,24 @@ describe("Waku Store", () => { expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); + for (const text of [ + encryptedAsymmetricMessageText, + encryptedSymmetricMessageText, + clearMessageText, + ]) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === text; + }) + ).to.not.eq(-1); + } + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history using start and end time", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + it("Ordered callback, using start and end time", async function () { + this.timeout(20000); const now = new Date(); @@ -554,17 +451,12 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); const firstMessages: WakuMessage[] = []; - await waku.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - firstMessages.push(msg); - } - }) - ); + (msg) => { + if (msg) { + firstMessages.push(msg); + } }, { peerId: nwakuPeerId, @@ -573,17 +465,10 @@ describe("Waku Store", () => { ); const bothMessages: WakuMessage[] = []; - await waku.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - bothMessages.push(msg); - } - }) - ); + async (msg) => { + bothMessages.push(msg); }, { peerId: nwakuPeerId, @@ -601,3 +486,69 @@ describe("Waku Store", () => { expect(bothMessages?.length).eq(2); }); }); + +describe("Waku Store, custom pubsub topic", () => { + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + let waku: WakuFull; + let nwaku: Nwaku; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ + persistMessages: true, + store: true, + topics: customPubSubTopic, + }); + }); + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("Generator, custom pubsub topic", async function () { + this.timeout(15_000); + + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopic: customPubSubTopic, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); +}); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index ef879fa56c..bf1dada03e 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -108,22 +108,95 @@ export class WakuStore { /** * Do a query to a Waku Store to retrieve historical/missed messages. * - * @param contentTopics The content topics to pass to the query, leave empty to - * retrieve all messages. - * @param callback called on a page of retrieved messages. If the callback returns `true` - * then pagination is stopped. - * @param options Optional parameters. + * The callback function takes a `WakuMessage` in input, + * messages are processed in order: + * - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD } + * - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD } + * + * The ordering may affect performance. * * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryHistory( + async queryOrderedCallback( contentTopics: string[], callback: ( - messages: Array> + message: WakuMessage ) => Promise | boolean | void, options?: QueryOptions ): Promise { + const abort = false; + for await (const promises of this.queryGenerator(contentTopics, options)) { + if (abort) break; + let messages = await Promise.all(promises); + + messages = messages.filter(isWakuMessageDefined); + + // Messages in pages are ordered from oldest (first) to most recent (last). + // https://github.com/vacp2p/rfc/issues/533 + if ( + typeof options?.pageDirection === "undefined" || + options?.pageDirection === PageDirection.BACKWARD + ) { + messages = messages.reverse(); + } + + await Promise.all( + messages.map((msg) => { + if (!abort) { + if (msg) return callback(msg); + } + }) + ); + } + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * The callback function takes a `Promise` in input, + * useful if messages needs to be decrypted and performance matters. + * **Order of messages is not guaranteed**. + * + * @returns the promises of the callback call. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async queryCallbackOnPromise( + contentTopics: string[], + callback: ( + message: Promise + ) => Promise | boolean | void, + options?: QueryOptions + ): Promise>> { + let abort = false; + let promises: Promise[] = []; + for await (const page of this.queryGenerator(contentTopics, options)) { + const _promises = page.map(async (msg) => { + if (!abort) { + abort = Boolean(await callback(msg)); + } + }); + + promises = promises.concat(_promises); + } + return promises; + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * This is a generator, useful if you want most control on how messages + * are processed. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async *queryGenerator( + contentTopics: string[], + options?: QueryOptions + ): AsyncGenerator[]> { let startTime, endTime; if (options?.timeFilter) { @@ -178,23 +251,19 @@ export class WakuStore { decryptionParams = decryptionParams.concat(options.decryptionParams); } - for await (const messagePromises of paginate( + for await (const messages of paginate( connection, protocol, queryOpts, decryptionParams )) { - const abort = Boolean(await callback(messagePromises)); - if (abort) { - // TODO: Also abort underlying generator - break; - } + yield messages; } } /** * Register a decryption key to attempt decryption of messages received in any - * subsequent { @link queryHistory } call. This can either be a private key for + * subsequent query call. This can either be a private key for * asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to * decrypt messages using both methods. * @@ -209,7 +278,7 @@ export class WakuStore { /**cursorV2Beta4 * Delete a decryption key that was used to attempt decryption of messages - * received in subsequent { @link queryHistory } calls. + * received in subsequent query calls. * * Strings must be in hex format. */ @@ -280,7 +349,7 @@ async function* paginate( throw "History response contains an Error: " + response.error; } - if (!response.messages) { + if (!response.messages || !response.messages.length) { log( "Stopping pagination due to store `response.messages` field missing or empty" ); @@ -315,3 +384,9 @@ async function* paginate( } } } + +export const isWakuMessageDefined = ( + msg: WakuMessage | undefined +): msg is WakuMessage => { + return !!msg; +};