diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 7b4195285a..86a33f1efd 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -25,6 +25,12 @@ const log = debug("waku:test:store"); const TestContentTopic = "/test/1/waku-store/utf8"; +const isWakuMessageDefined = ( + msg: WakuMessage | undefined +): msg is WakuMessage => { + return !!msg; +}; + describe("Waku Store", () => { let waku: WakuFull; let nwaku: Nwaku; @@ -57,7 +63,18 @@ describe("Waku Store", () => { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([]); + + const messages: WakuMessage[] = []; + await waku.store.queryHistory([], async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); + }); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -92,12 +109,16 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; - - await waku.store.queryHistory([], { - callback: (_msgs) => { - messages = messages.concat(_msgs); - }, + const messages: WakuMessage[] = []; + await waku.store.queryHistory([], async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); }); expect(messages?.length).eq(totalMsgs); @@ -136,13 +157,16 @@ describe("Waku Store", () => { let messages: WakuMessage[] = []; const desiredMsgs = 14; - await waku.store.queryHistory([], { - pageSize: 7, - callback: (_msgs) => { - messages = messages.concat(_msgs); + await waku.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); return messages.length >= desiredMsgs; }, - }); + { pageSize: 7 } + ); expect(messages?.length).eq(desiredMsgs); }); @@ -171,9 +195,21 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([], { - pageDirection: PageDirection.FORWARD, - }); + let messages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + // Note: messages within a page are ordered from oldest to most recent + // so the `concat` can only preserve order when `PageDirection` + // is forward + messages = messages.concat(msgs); + }, + { + pageDirection: PageDirection.FORWARD, + } + ); expect(messages?.length).eq(15); for (let index = 0; index < 2; index++) { @@ -218,9 +254,23 @@ describe("Waku Store", () => { const nimPeerId = await nwaku.getPeerId(); - const messages = await waku.store.queryHistory([], { - peerId: nimPeerId, - }); + const messages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); + }, + { + peerId: nimPeerId, + } + ); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -246,6 +296,7 @@ describe("Waku Store", () => { const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); + const timestamp = new Date(); const [ encryptedAsymmetricMessage, encryptedSymmetricMessage, @@ -257,6 +308,7 @@ describe("Waku Store", () => { TestContentTopic, { encPublicKey: publicKey, + timestamp, } ), WakuMessage.fromUtf8String( @@ -264,16 +316,19 @@ describe("Waku Store", () => { TestContentTopic, { symKey: symKey, + timestamp: new Date(timestamp.valueOf() + 1), } ), - WakuMessage.fromUtf8String(clearMessageText, TestContentTopic), + WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, { + timestamp: new Date(timestamp.valueOf() + 2), + }), WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, { encPublicKey: getPublicKey(generatePrivateKey()), + timestamp: new Date(timestamp.valueOf() + 3), }), ]); log("Messages have been encrypted"); - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ createFullNode({ staticNoiseKey: NOISE_KEY_1, @@ -308,13 +363,23 @@ describe("Waku Store", () => { waku2.store.addDecryptionKey(symKey); log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { - decryptionParams: [{ key: privateKey }], - }); + let messages: WakuMessage[] = []; + await waku2.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); + }, + { + decryptionParams: [{ key: privateKey }], + } + ); - expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText); + // Messages are ordered from oldest to latest within a page (1 page query) + expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -341,6 +406,7 @@ describe("Waku Store", () => { const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); + const timestamp = new Date(); const [ encryptedAsymmetricMessage, encryptedSymmetricMessage, @@ -352,6 +418,7 @@ describe("Waku Store", () => { encryptedAsymmetricContentTopic, { encPublicKey: publicKey, + timestamp, } ), WakuMessage.fromUtf8String( @@ -359,17 +426,20 @@ describe("Waku Store", () => { encryptedSymmetricContentTopic, { symKey: symKey, + timestamp: new Date(timestamp.valueOf() + 1), } ), WakuMessage.fromUtf8String( clearMessageText, - encryptedAsymmetricContentTopic + encryptedAsymmetricContentTopic, + { timestamp: new Date(timestamp.valueOf() + 2) } ), WakuMessage.fromUtf8String( otherEncMessageText, encryptedSymmetricContentTopic, { encPublicKey: getPublicKey(generatePrivateKey()), + timestamp: new Date(timestamp.valueOf() + 3), } ), ]); @@ -412,16 +482,26 @@ describe("Waku Store", () => { method: DecryptionMethod.Symmetric, }); + let messages: WakuMessage[] = []; log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { - decryptionParams: [{ key: privateKey }], - }); + await waku2.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); + }, + { + decryptionParams: [{ key: privateKey }], + } + ); expect(messages?.length).eq(3); if (!messages) throw "Length was tested"; - expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); + // Messages are ordered from oldest to latest within a page (1 page query) + expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -473,22 +553,50 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); - const firstMessage = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { startTime, endTime: message1Timestamp }, - }); - - const bothMessages = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { - startTime, - endTime, + const firstMessages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + firstMessages.push(msg); + } + }) + ); }, - }); + { + peerId: nwakuPeerId, + timeFilter: { startTime, endTime: message1Timestamp }, + } + ); - expect(firstMessage?.length).eq(1); + const bothMessages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + bothMessages.push(msg); + } + }) + ); + }, + { + peerId: nwakuPeerId, + timeFilter: { + startTime, + endTime, + }, + } + ); - expect(firstMessage[0]?.payloadAsUtf8).eq("Message 0"); + expect(firstMessages?.length).eq(1); + + expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0"); expect(bothMessages?.length).eq(2); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index a2c89b6f38..ef879fa56c 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -1,3 +1,4 @@ +import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; import debug from "debug"; @@ -19,7 +20,7 @@ import { WakuMessage, } from "../waku_message"; -import { HistoryRPC, PageDirection } from "./history_rpc"; +import { HistoryRPC, PageDirection, Params } from "./history_rpc"; import HistoryError = HistoryResponse.HistoryError; @@ -77,18 +78,6 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; - /** - * Callback called on pages of stored messages as they are retrieved. - * - * Allows for a faster access to the results as it is called as soon as a page - * is received. Traversal of the pages is done automatically so this function - * will invoked for each retrieved page. - * - * If the call on a page returns `true`, then traversal of the pages is aborted. - * For example, this can be used for the caller to stop the query after a - * specific message is found. - */ - callback?: (messages: WakuMessage[]) => void | boolean; /** * Keys that will be used to decrypt messages. * @@ -121,6 +110,8 @@ export class WakuStore { * * @param contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. + * @param callback called on a page of retrieved messages. If the callback returns `true` + * then pagination is stopped. * @param options Optional parameters. * * @throws If not able to reach a Waku Store peer to query @@ -128,8 +119,11 @@ export class WakuStore { */ async queryHistory( contentTopics: string[], + callback: ( + messages: Array> + ) => Promise | boolean | void, options?: QueryOptions - ): Promise { + ): Promise { let startTime, endTime; if (options?.timeFilter) { @@ -137,7 +131,7 @@ export class WakuStore { endTime = options.timeFilter.endTime; } - const opts = Object.assign( + const queryOpts = Object.assign( { pubSubTopic: this.pubSubTopic, pageDirection: PageDirection.BACKWARD, @@ -155,7 +149,7 @@ export class WakuStore { const res = await selectPeerForProtocol( this.libp2p.peerStore, Object.values(StoreCodecs), - opts?.peerId + options?.peerId ); if (!res) { @@ -180,90 +174,20 @@ export class WakuStore { // Add the decryption keys passed to this function against the // content topics also passed to this function. - if (opts.decryptionParams) { - decryptionParams = decryptionParams.concat(opts.decryptionParams); + if (options?.decryptionParams) { + decryptionParams = decryptionParams.concat(options.decryptionParams); } - const messages: WakuMessage[] = []; - let cursor = undefined; - while (true) { - const stream = await connection.newStream(protocol); - const queryOpts = Object.assign(opts, { cursor }); - const historyRpcQuery = HistoryRPC.createQuery(queryOpts); - log("Querying store peer", connections[0].remoteAddr.toString()); - - const res = await pipe( - [historyRpcQuery.encode()], - lp.encode(), - stream, - lp.decode(), - async (source) => await all(source) - ); - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); - - const reply = historyRpcQuery.decode(bytes); - - if (!reply.response) { - log("No message returned from store: `response` field missing"); - return messages; - } - - const response = reply.response as protoV2Beta4.HistoryResponse; - - if ( - response.error && - response.error !== HistoryError.ERROR_NONE_UNSPECIFIED - ) { - throw "History response contains an Error: " + response.error; - } - - if (!response.messages || !response.messages.length) { - // No messages left (or stored) - log("No message returned from store: `messages` array empty"); - return messages; - } - - log( - `${response.messages.length} messages retrieved for (${opts.pubSubTopic})`, - contentTopics - ); - - const pageMessages: WakuMessage[] = []; - await Promise.all( - response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams); - - if (msg) { - messages.push(msg); - pageMessages.push(msg); - } - }) - ); - - let abort = false; - if (opts.callback) { - abort = Boolean(opts.callback(pageMessages)); - } - - const responsePageSize = response.pagingInfo?.pageSize; - const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; - if ( - abort || - // Response page size smaller than query, meaning this is the last page - (responsePageSize && queryPageSize && responsePageSize < queryPageSize) - ) { - return messages; - } - - cursor = response.pagingInfo?.cursor; - if (cursor === undefined) { - // If the server does not return cursor then there is an issue, - // Need to abort, or we end up in an infinite loop - log("Store response does not contain a cursor, stopping pagination"); - return messages; + for await (const messagePromises of paginate( + connection, + protocol, + queryOpts, + decryptionParams + )) { + const abort = Boolean(await callback(messagePromises)); + if (abort) { + // TODO: Also abort underlying generator + break; } } } @@ -306,3 +230,88 @@ export class WakuStore { return getPeersForProtocol(this.libp2p.peerStore, codecs); } } + +async function* paginate( + connection: Connection, + protocol: string, + queryOpts: Params, + decryptionParams: DecryptionParams[] +): AsyncGenerator[]> { + let cursor = undefined; + while (true) { + queryOpts = Object.assign(queryOpts, { cursor }); + + const stream = await connection.newStream(protocol); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); + + log( + "Querying store peer", + connection.remoteAddr.toString(), + `for (${queryOpts.pubSubTopic})`, + queryOpts.contentTopics + ); + + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); + + const reply = historyRpcQuery.decode(bytes); + + if (!reply.response) { + log("Stopping pagination due to store `response` field missing"); + break; + } + + const response = reply.response as protoV2Beta4.HistoryResponse; + + if ( + response.error && + response.error !== HistoryError.ERROR_NONE_UNSPECIFIED + ) { + throw "History response contains an Error: " + response.error; + } + + if (!response.messages) { + log( + "Stopping pagination due to store `response.messages` field missing or empty" + ); + break; + } + + log(`${response.messages.length} messages retrieved from store`); + + yield response.messages.map((protoMsg) => + WakuMessage.decodeProto(protoMsg, decryptionParams) + ); + + cursor = response.pagingInfo?.cursor; + if (typeof cursor === "undefined") { + // If the server does not return cursor then there is an issue, + // Need to abort, or we end up in an infinite loop + log( + "Stopping pagination due to `response.pagingInfo.cursor` missing from store response" + ); + break; + } + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + // Response page size smaller than query, meaning this is the last page + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + break; + } + } +}