From 3b05bfe988bb616beb2b106c7f5c70c8251fd3c5 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Sun, 11 Sep 2022 09:49:17 +1000 Subject: [PATCH 1/6] refact: getPeersForProtocol only needs the peer store --- src/lib/select_peer.ts | 7 +++---- src/lib/waku_filter/index.ts | 2 +- src/lib/waku_light_push/index.ts | 2 +- src/lib/waku_store/index.ts | 2 +- 4 files changed, 6 insertions(+), 7 deletions(-) diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index 13bde3627a..e712412815 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -1,5 +1,4 @@ -import { Peer } from "@libp2p/interface-peer-store"; -import { Libp2p } from "libp2p"; +import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -18,11 +17,11 @@ export async function selectRandomPeer( * Returns the list of peers that supports the given protocol. */ export async function getPeersForProtocol( - libp2p: Libp2p, + peerStore: PeerStore, protocols: string[] ): Promise { const peers: Peer[] = []; - await libp2p.peerStore.forEach((peer) => { + await peerStore.forEach((peer) => { for (let i = 0; i < protocols.length; i++) { if (peer.protocols.includes(protocols[i])) { peers.push(peer); diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index bd3bc8e8c9..fdcf1abed5 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -272,7 +272,7 @@ export class WakuFilter { } async peers(): Promise { - return getPeersForProtocol(this.libp2p, [FilterCodec]); + return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]); } async randomPeer(): Promise { diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index a5a36bcce7..48e5b5a111 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -109,7 +109,7 @@ export class WakuLightPush { * peers. */ async peers(): Promise { - return getPeersForProtocol(this.libp2p, [LightPushCodec]); + return getPeersForProtocol(this.libp2p.peerStore, [LightPushCodec]); } /** diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 2970fe0111..24fea200c6 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -312,7 +312,7 @@ export class WakuStore { codecs.push(codec); } - return getPeersForProtocol(this.libp2p, codecs); + return getPeersForProtocol(this.libp2p.peerStore, codecs); } /** From 056aed59fbdc7f159e5d0a5990b38581ded95c66 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Sun, 11 Sep 2022 09:54:10 +1000 Subject: [PATCH 2/6] refactor: selectRandomPeer doesn't need to be async --- src/lib/select_peer.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index e712412815..3ee0bfd2c5 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -4,9 +4,7 @@ import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; * Returns a pseudo-random peer that supports the given protocol. * Useful for protocols such as store and light push */ -export async function selectRandomPeer( - peers: Peer[] -): Promise { +export function selectRandomPeer(peers: Peer[]): Peer | undefined { if (peers.length === 0) return; const index = Math.round(Math.random() * (peers.length - 1)); From bdf1c9b7e371764b15540d82b6d76fad4eb2dd6f Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Sun, 11 Sep 2022 10:02:12 +1000 Subject: [PATCH 3/6] fix: rename to avoid conflict with eponymous TypeScript type --- src/lib/waku_store/index.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 24fea200c6..0304496cbb 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -21,7 +21,7 @@ import { import { HistoryRPC, PageDirection } from "./history_rpc"; -import Error = HistoryResponse.HistoryError; +import HistoryError = HistoryResponse.HistoryError; const log = debug("waku:store"); @@ -225,7 +225,10 @@ export class WakuStore { const response = reply.response as protoV2Beta4.HistoryResponse; - if (response.error && response.error !== Error.ERROR_NONE_UNSPECIFIED) { + if ( + response.error && + response.error !== HistoryError.ERROR_NONE_UNSPECIFIED + ) { throw "History response contains an Error: " + response.error; } From 930c7beaef889762541306079664bf7c0c7c8265 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Sun, 11 Sep 2022 10:02:58 +1000 Subject: [PATCH 4/6] refactor: extract peer selection logic --- src/lib/select_peer.ts | 46 ++++++++++++++++++++++++++++++++ src/lib/waku_filter/index.ts | 30 +++++++++------------ src/lib/waku_light_push/index.ts | 24 ++++++++++------- src/lib/waku_store/index.ts | 41 +++++++--------------------- 4 files changed, 83 insertions(+), 58 deletions(-) diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index 3ee0bfd2c5..9c34fbce44 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -1,4 +1,8 @@ +import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import debug from "debug"; + +const log = debug("waku:select-peer"); /** * Returns a pseudo-random peer that supports the given protocol. @@ -29,3 +33,45 @@ export async function getPeersForProtocol( }); return peers; } + +export async function selectPeerForProtocol( + peerStore: PeerStore, + protocols: string[], + peerId?: PeerId +): Promise<{ peer: Peer; protocol: string } | undefined> { + let peer; + if (peerId) { + peer = await peerStore.get(peerId); + if (!peer) { + log( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` + ); + return; + } + } else { + const peers = await getPeersForProtocol(peerStore, protocols); + peer = selectRandomPeer(peers); + if (!peer) { + log("Failed to find known peer that registers protocols", protocols); + return; + } + } + + let protocol; + for (const codec of protocols) { + if (peer.protocols.includes(codec)) { + protocol = codec; + // Do not break as we want to keep the last value + } + } + log(`Using codec ${protocol}`); + if (!protocol) { + log( + `Peer does not register required protocols: ${peer.id.toString()}`, + protocols + ); + return; + } + + return { peer, protocol }; +} diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index fdcf1abed5..e2fa36ca06 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -11,7 +11,11 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -228,23 +232,15 @@ export class WakuFilter { } private async getPeer(peerId?: PeerId): Promise { - let peer; - if (peerId) { - peer = await this.libp2p.peerStore.get(peerId); - if (!peer) { - throw new Error( - `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` - ); - } - } else { - peer = await this.randomPeer(); - if (!peer) { - throw new Error( - "Failed to find known peer that registers waku filter protocol" - ); - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [FilterCodec], + peerId + ); + if (!res) { + throw new Error(`Failed to select peer for ${FilterCodec}`); } - return peer; + return res.peer; } /** diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 48e5b5a111..0c3db3bf20 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -10,7 +10,11 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { WakuMessage } from "../waku_message"; import { PushRPC } from "./push_rpc"; @@ -51,16 +55,16 @@ export class WakuLightPush { message: WakuMessage, opts?: PushOptions ): Promise { - let peer; - if (opts?.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) throw "Peer is unknown"; - } else { - peer = await this.randomPeer(); + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [LightPushCodec], + opts?.peerId + ); + + if (!res) { + throw new Error("Failed to get a peer"); } - if (!peer) throw "No peer available"; - if (!peer.protocols.includes(LightPushCodec)) - throw "Peer does not register waku light push protocol"; + const { peer } = res; const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 0304496cbb..a2c89b6f38 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -11,7 +11,7 @@ import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, @@ -152,29 +152,17 @@ export class WakuStore { ...options, }); - let peer; - if (opts.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) - throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toString()}`; - } else { - peer = await this.randomPeer(); - if (!peer) - throw "Failed to find known peer that registers waku store protocol"; - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + Object.values(StoreCodecs), + opts?.peerId + ); - let storeCodec = ""; - for (const codec of Object.values(StoreCodecs)) { - if (peer.protocols.includes(codec)) { - storeCodec = codec; - // Do not break as we want to keep the last value - } + if (!res) { + throw new Error("Failed to get a peer"); } - log(`Use store codec ${storeCodec}`); - if (!storeCodec) - throw `Peer does not register waku store protocol: ${peer.id.toString()}`; + const { peer, protocol } = res; - Object.assign(opts, { storeCodec }); const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); @@ -199,7 +187,7 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; while (true) { - const stream = await connection.newStream(storeCodec); + const stream = await connection.newStream(protocol); const queryOpts = Object.assign(opts, { cursor }); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); log("Querying store peer", connections[0].remoteAddr.toString()); @@ -317,13 +305,4 @@ export class WakuStore { return getPeersForProtocol(this.libp2p.peerStore, codecs); } - - /** - * Returns a random peer that supports store protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } } From 65511a5888afdeba8123c4088e6629ee1954f46d Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 12 Sep 2022 11:35:24 +1000 Subject: [PATCH 5/6] feat: store callback takes promises This enables the consumer to decide between: 1. Waiting for all promises, less efficient but maintain order; 2. Process promises as they resolve, faster to get messages through but disrupt message order. --- src/lib/waku_store/index.node.spec.ts | 196 ++++++++++++++++++------ src/lib/waku_store/index.ts | 205 ++++++++++++++------------ 2 files changed, 259 insertions(+), 142 deletions(-) diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 7b4195285a..86a33f1efd 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -25,6 +25,12 @@ const log = debug("waku:test:store"); const TestContentTopic = "/test/1/waku-store/utf8"; +const isWakuMessageDefined = ( + msg: WakuMessage | undefined +): msg is WakuMessage => { + return !!msg; +}; + describe("Waku Store", () => { let waku: WakuFull; let nwaku: Nwaku; @@ -57,7 +63,18 @@ describe("Waku Store", () => { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([]); + + const messages: WakuMessage[] = []; + await waku.store.queryHistory([], async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); + }); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -92,12 +109,16 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; - - await waku.store.queryHistory([], { - callback: (_msgs) => { - messages = messages.concat(_msgs); - }, + const messages: WakuMessage[] = []; + await waku.store.queryHistory([], async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); }); expect(messages?.length).eq(totalMsgs); @@ -136,13 +157,16 @@ describe("Waku Store", () => { let messages: WakuMessage[] = []; const desiredMsgs = 14; - await waku.store.queryHistory([], { - pageSize: 7, - callback: (_msgs) => { - messages = messages.concat(_msgs); + await waku.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); return messages.length >= desiredMsgs; }, - }); + { pageSize: 7 } + ); expect(messages?.length).eq(desiredMsgs); }); @@ -171,9 +195,21 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([], { - pageDirection: PageDirection.FORWARD, - }); + let messages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + // Note: messages within a page are ordered from oldest to most recent + // so the `concat` can only preserve order when `PageDirection` + // is forward + messages = messages.concat(msgs); + }, + { + pageDirection: PageDirection.FORWARD, + } + ); expect(messages?.length).eq(15); for (let index = 0; index < 2; index++) { @@ -218,9 +254,23 @@ describe("Waku Store", () => { const nimPeerId = await nwaku.getPeerId(); - const messages = await waku.store.queryHistory([], { - peerId: nimPeerId, - }); + const messages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }) + ); + }, + { + peerId: nimPeerId, + } + ); expect(messages?.length).eq(2); const result = messages?.findIndex((msg) => { @@ -246,6 +296,7 @@ describe("Waku Store", () => { const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); + const timestamp = new Date(); const [ encryptedAsymmetricMessage, encryptedSymmetricMessage, @@ -257,6 +308,7 @@ describe("Waku Store", () => { TestContentTopic, { encPublicKey: publicKey, + timestamp, } ), WakuMessage.fromUtf8String( @@ -264,16 +316,19 @@ describe("Waku Store", () => { TestContentTopic, { symKey: symKey, + timestamp: new Date(timestamp.valueOf() + 1), } ), - WakuMessage.fromUtf8String(clearMessageText, TestContentTopic), + WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, { + timestamp: new Date(timestamp.valueOf() + 2), + }), WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, { encPublicKey: getPublicKey(generatePrivateKey()), + timestamp: new Date(timestamp.valueOf() + 3), }), ]); log("Messages have been encrypted"); - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ createFullNode({ staticNoiseKey: NOISE_KEY_1, @@ -308,13 +363,23 @@ describe("Waku Store", () => { waku2.store.addDecryptionKey(symKey); log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { - decryptionParams: [{ key: privateKey }], - }); + let messages: WakuMessage[] = []; + await waku2.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); + }, + { + decryptionParams: [{ key: privateKey }], + } + ); - expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText); + // Messages are ordered from oldest to latest within a page (1 page query) + expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -341,6 +406,7 @@ describe("Waku Store", () => { const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); + const timestamp = new Date(); const [ encryptedAsymmetricMessage, encryptedSymmetricMessage, @@ -352,6 +418,7 @@ describe("Waku Store", () => { encryptedAsymmetricContentTopic, { encPublicKey: publicKey, + timestamp, } ), WakuMessage.fromUtf8String( @@ -359,17 +426,20 @@ describe("Waku Store", () => { encryptedSymmetricContentTopic, { symKey: symKey, + timestamp: new Date(timestamp.valueOf() + 1), } ), WakuMessage.fromUtf8String( clearMessageText, - encryptedAsymmetricContentTopic + encryptedAsymmetricContentTopic, + { timestamp: new Date(timestamp.valueOf() + 2) } ), WakuMessage.fromUtf8String( otherEncMessageText, encryptedSymmetricContentTopic, { encPublicKey: getPublicKey(generatePrivateKey()), + timestamp: new Date(timestamp.valueOf() + 3), } ), ]); @@ -412,16 +482,26 @@ describe("Waku Store", () => { method: DecryptionMethod.Symmetric, }); + let messages: WakuMessage[] = []; log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { - decryptionParams: [{ key: privateKey }], - }); + await waku2.store.queryHistory( + [], + async (msgPromises) => { + const msgsOrUndefined = await Promise.all(msgPromises); + const msgs = msgsOrUndefined.filter(isWakuMessageDefined); + messages = messages.concat(msgs); + }, + { + decryptionParams: [{ key: privateKey }], + } + ); expect(messages?.length).eq(3); if (!messages) throw "Length was tested"; - expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); + // Messages are ordered from oldest to latest within a page (1 page query) + expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); @@ -473,22 +553,50 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); - const firstMessage = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { startTime, endTime: message1Timestamp }, - }); - - const bothMessages = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { - startTime, - endTime, + const firstMessages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + firstMessages.push(msg); + } + }) + ); }, - }); + { + peerId: nwakuPeerId, + timeFilter: { startTime, endTime: message1Timestamp }, + } + ); - expect(firstMessage?.length).eq(1); + const bothMessages: WakuMessage[] = []; + await waku.store.queryHistory( + [], + async (msgPromises) => { + await Promise.all( + msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + bothMessages.push(msg); + } + }) + ); + }, + { + peerId: nwakuPeerId, + timeFilter: { + startTime, + endTime, + }, + } + ); - expect(firstMessage[0]?.payloadAsUtf8).eq("Message 0"); + expect(firstMessages?.length).eq(1); + + expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0"); expect(bothMessages?.length).eq(2); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index a2c89b6f38..ef879fa56c 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -1,3 +1,4 @@ +import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; import debug from "debug"; @@ -19,7 +20,7 @@ import { WakuMessage, } from "../waku_message"; -import { HistoryRPC, PageDirection } from "./history_rpc"; +import { HistoryRPC, PageDirection, Params } from "./history_rpc"; import HistoryError = HistoryResponse.HistoryError; @@ -77,18 +78,6 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; - /** - * Callback called on pages of stored messages as they are retrieved. - * - * Allows for a faster access to the results as it is called as soon as a page - * is received. Traversal of the pages is done automatically so this function - * will invoked for each retrieved page. - * - * If the call on a page returns `true`, then traversal of the pages is aborted. - * For example, this can be used for the caller to stop the query after a - * specific message is found. - */ - callback?: (messages: WakuMessage[]) => void | boolean; /** * Keys that will be used to decrypt messages. * @@ -121,6 +110,8 @@ export class WakuStore { * * @param contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. + * @param callback called on a page of retrieved messages. If the callback returns `true` + * then pagination is stopped. * @param options Optional parameters. * * @throws If not able to reach a Waku Store peer to query @@ -128,8 +119,11 @@ export class WakuStore { */ async queryHistory( contentTopics: string[], + callback: ( + messages: Array> + ) => Promise | boolean | void, options?: QueryOptions - ): Promise { + ): Promise { let startTime, endTime; if (options?.timeFilter) { @@ -137,7 +131,7 @@ export class WakuStore { endTime = options.timeFilter.endTime; } - const opts = Object.assign( + const queryOpts = Object.assign( { pubSubTopic: this.pubSubTopic, pageDirection: PageDirection.BACKWARD, @@ -155,7 +149,7 @@ export class WakuStore { const res = await selectPeerForProtocol( this.libp2p.peerStore, Object.values(StoreCodecs), - opts?.peerId + options?.peerId ); if (!res) { @@ -180,90 +174,20 @@ export class WakuStore { // Add the decryption keys passed to this function against the // content topics also passed to this function. - if (opts.decryptionParams) { - decryptionParams = decryptionParams.concat(opts.decryptionParams); + if (options?.decryptionParams) { + decryptionParams = decryptionParams.concat(options.decryptionParams); } - const messages: WakuMessage[] = []; - let cursor = undefined; - while (true) { - const stream = await connection.newStream(protocol); - const queryOpts = Object.assign(opts, { cursor }); - const historyRpcQuery = HistoryRPC.createQuery(queryOpts); - log("Querying store peer", connections[0].remoteAddr.toString()); - - const res = await pipe( - [historyRpcQuery.encode()], - lp.encode(), - stream, - lp.decode(), - async (source) => await all(source) - ); - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); - - const reply = historyRpcQuery.decode(bytes); - - if (!reply.response) { - log("No message returned from store: `response` field missing"); - return messages; - } - - const response = reply.response as protoV2Beta4.HistoryResponse; - - if ( - response.error && - response.error !== HistoryError.ERROR_NONE_UNSPECIFIED - ) { - throw "History response contains an Error: " + response.error; - } - - if (!response.messages || !response.messages.length) { - // No messages left (or stored) - log("No message returned from store: `messages` array empty"); - return messages; - } - - log( - `${response.messages.length} messages retrieved for (${opts.pubSubTopic})`, - contentTopics - ); - - const pageMessages: WakuMessage[] = []; - await Promise.all( - response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams); - - if (msg) { - messages.push(msg); - pageMessages.push(msg); - } - }) - ); - - let abort = false; - if (opts.callback) { - abort = Boolean(opts.callback(pageMessages)); - } - - const responsePageSize = response.pagingInfo?.pageSize; - const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; - if ( - abort || - // Response page size smaller than query, meaning this is the last page - (responsePageSize && queryPageSize && responsePageSize < queryPageSize) - ) { - return messages; - } - - cursor = response.pagingInfo?.cursor; - if (cursor === undefined) { - // If the server does not return cursor then there is an issue, - // Need to abort, or we end up in an infinite loop - log("Store response does not contain a cursor, stopping pagination"); - return messages; + for await (const messagePromises of paginate( + connection, + protocol, + queryOpts, + decryptionParams + )) { + const abort = Boolean(await callback(messagePromises)); + if (abort) { + // TODO: Also abort underlying generator + break; } } } @@ -306,3 +230,88 @@ export class WakuStore { return getPeersForProtocol(this.libp2p.peerStore, codecs); } } + +async function* paginate( + connection: Connection, + protocol: string, + queryOpts: Params, + decryptionParams: DecryptionParams[] +): AsyncGenerator[]> { + let cursor = undefined; + while (true) { + queryOpts = Object.assign(queryOpts, { cursor }); + + const stream = await connection.newStream(protocol); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); + + log( + "Querying store peer", + connection.remoteAddr.toString(), + `for (${queryOpts.pubSubTopic})`, + queryOpts.contentTopics + ); + + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); + + const reply = historyRpcQuery.decode(bytes); + + if (!reply.response) { + log("Stopping pagination due to store `response` field missing"); + break; + } + + const response = reply.response as protoV2Beta4.HistoryResponse; + + if ( + response.error && + response.error !== HistoryError.ERROR_NONE_UNSPECIFIED + ) { + throw "History response contains an Error: " + response.error; + } + + if (!response.messages) { + log( + "Stopping pagination due to store `response.messages` field missing or empty" + ); + break; + } + + log(`${response.messages.length} messages retrieved from store`); + + yield response.messages.map((protoMsg) => + WakuMessage.decodeProto(protoMsg, decryptionParams) + ); + + cursor = response.pagingInfo?.cursor; + if (typeof cursor === "undefined") { + // If the server does not return cursor then there is an issue, + // Need to abort, or we end up in an infinite loop + log( + "Stopping pagination due to `response.pagingInfo.cursor` missing from store response" + ); + break; + } + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + // Response page size smaller than query, meaning this is the last page + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + break; + } + } +} From 5a529c1cd796d627c9836673a2fa754fb49fcbc8 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Wed, 14 Sep 2022 13:44:00 +1000 Subject: [PATCH 6/6] feat: provide several API for store queries --- CHANGELOG.md | 2 + src/lib/waku_store/index.node.spec.ts | 487 ++++++++++++-------------- src/lib/waku_store/index.ts | 107 +++++- 3 files changed, 312 insertions(+), 284 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1414e46d6a..4627a55cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options. +- `WakuStore` now provides several APIs: `queryGenerator`, `queryCallbackOnPromise`, `queryOrderedCallback`; + each provides different guarantees and performance. ## [0.27.0] - 2022-09-13 diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 86a33f1efd..07db951213 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -25,70 +25,23 @@ const log = debug("waku:test:store"); const TestContentTopic = "/test/1/waku-store/utf8"; -const isWakuMessageDefined = ( - msg: WakuMessage | undefined -): msg is WakuMessage => { - return !!msg; -}; - describe("Waku Store", () => { let waku: WakuFull; let nwaku: Nwaku; + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true, store: true, lightpush: true }); + }); + afterEach(async function () { !!nwaku && nwaku.stop(); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history", async function () { + it("Generator", async function () { this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 2; i++) { - expect( - await nwaku.sendMessage( - Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic, - }) - ) - ).to.be.true; - } - - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - const messages: WakuMessage[] = []; - await waku.store.queryHistory([], async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); - }); - - expect(messages?.length).eq(2); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); - - it("Retrieves history using callback", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - const totalMsgs = 20; for (let i = 0; i < totalMsgs; i++) { @@ -110,15 +63,82 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: WakuMessage[] = []; - await waku.store.queryHistory([], async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, no message returned", async function () { + this.timeout(15_000); + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(0); + }); + + it("Callback on promise", async function () { + this.timeout(15_000); + + const totalMsgs = 15; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise([], async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } }); expect(messages?.length).eq(totalMsgs); @@ -128,15 +148,12 @@ describe("Waku Store", () => { expect(result).to.not.eq(-1); }); - it("Retrieval aborts when callback returns true", async function () { + it("Callback on promise, aborts when callback returns true", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + const totalMsgs = 20; - const availMsgs = 20; - - for (let i = 0; i < availMsgs; i++) { + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -154,15 +171,15 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; const desiredMsgs = 14; - - await waku.store.queryHistory( + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } return messages.length >= desiredMsgs; }, { pageSize: 7 } @@ -171,13 +188,11 @@ describe("Waku Store", () => { expect(messages?.length).eq(desiredMsgs); }); - it("Retrieves all historical elements in chronological order through paging", async function () { + it("Ordered Callback - Forward", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 15; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -195,24 +210,19 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; - await waku.store.queryHistory( + const messages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - // Note: messages within a page are ordered from oldest to most recent - // so the `concat` can only preserve order when `PageDirection` - // is forward - messages = messages.concat(msgs); + async (msg) => { + messages.push(msg); }, { pageDirection: PageDirection.FORWARD, } ); - expect(messages?.length).eq(15); - for (let index = 0; index < 2; index++) { + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { return msg.payloadAsUtf8 === `Message ${index}`; @@ -221,176 +231,54 @@ describe("Waku Store", () => { } }); - it("Retrieves history using custom pubsub topic", async function () { + it("Ordered Callback - Backward", async function () { this.timeout(15_000); - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ - persistMessages: true, - store: true, - topics: customPubSubTopic, - }); - - for (let i = 0; i < 2; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ payload: utf8ToBytes(`Message ${i}`), contentTopic: TestContentTopic, - }), - customPubSubTopic + }) ) ).to.be.true; } waku = await createFullNode({ - pubSubTopic: customPubSubTopic, staticNoiseKey: NOISE_KEY_1, }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const nimPeerId = await nwaku.getPeerId(); - - const messages: WakuMessage[] = []; - await waku.store.queryHistory( - [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - messages.push(msg); - } - }) - ); - }, - { - peerId: nimPeerId, - } - ); - - expect(messages?.length).eq(2); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); - - it("Retrieves history with asymmetric & symmetric encrypted messages", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - - const encryptedAsymmetricMessageText = "asymmetric encryption"; - const encryptedSymmetricMessageText = "symmetric encryption"; - const clearMessageText = - "This is a clear text message for everyone to read"; - const otherEncMessageText = - "This message is not for and I must not be able to read it"; - - const privateKey = generatePrivateKey(); - const symKey = generateSymmetricKey(); - const publicKey = getPublicKey(privateKey); - - const timestamp = new Date(); - const [ - encryptedAsymmetricMessage, - encryptedSymmetricMessage, - clearMessage, - otherEncMessage, - ] = await Promise.all([ - WakuMessage.fromUtf8String( - encryptedAsymmetricMessageText, - TestContentTopic, - { - encPublicKey: publicKey, - timestamp, - } - ), - WakuMessage.fromUtf8String( - encryptedSymmetricMessageText, - TestContentTopic, - { - symKey: symKey, - timestamp: new Date(timestamp.valueOf() + 1), - } - ), - WakuMessage.fromUtf8String(clearMessageText, TestContentTopic, { - timestamp: new Date(timestamp.valueOf() + 2), - }), - WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, { - encPublicKey: getPublicKey(generatePrivateKey()), - timestamp: new Date(timestamp.valueOf() + 3), - }), - ]); - - log("Messages have been encrypted"); - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ - createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }).then((waku) => waku.start().then(() => waku)), - createFullNode({ - staticNoiseKey: NOISE_KEY_2, - }).then((waku) => waku.start().then(() => waku)), - nwaku.getMultiaddrWithId(), - ]); - - log("Waku nodes created"); - - await Promise.all([ - waku1.dial(nimWakuMultiaddr), - waku2.dial(nimWakuMultiaddr), - ]); - - log("Waku nodes connected to nwaku"); - - await waitForRemotePeer(waku1, [Protocols.LightPush]); - - log("Sending messages using light push"); - await Promise.all([ - waku1.lightPush.push(encryptedAsymmetricMessage), - waku1.lightPush.push(encryptedSymmetricMessage), - waku1.lightPush.push(otherEncMessage), - waku1.lightPush.push(clearMessage), - ]); - - await waitForRemotePeer(waku2, [Protocols.Store]); - - waku2.store.addDecryptionKey(symKey); - - log("Retrieve messages from store"); let messages: WakuMessage[] = []; - await waku2.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); + async (msg) => { + messages.push(msg); }, { - decryptionParams: [{ key: privateKey }], + pageDirection: PageDirection.BACKWARD, } ); - // Messages are ordered from oldest to latest within a page (1 page query) - expect(messages[0]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2]?.payloadAsUtf8).to.eq(clearMessageText); + messages = messages.reverse(); - !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === `Message ${index}`; + }) + ).to.eq(index); + } }); - it("Retrieves history with asymmetric & symmetric encrypted messages on different content topics", async function () { + it("Generator, with asymmetric & symmetric encrypted messages", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - const encryptedAsymmetricMessageText = "This message is encrypted for me using asymmetric"; const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto"; @@ -482,19 +370,19 @@ describe("Waku Store", () => { method: DecryptionMethod.Symmetric, }); - let messages: WakuMessage[] = []; + const messages: WakuMessage[] = []; log("Retrieve messages from store"); - await waku2.store.queryHistory( - [], - async (msgPromises) => { - const msgsOrUndefined = await Promise.all(msgPromises); - const msgs = msgsOrUndefined.filter(isWakuMessageDefined); - messages = messages.concat(msgs); - }, - { - decryptionParams: [{ key: privateKey }], + + for await (const msgPromises of waku2.store.queryGenerator([], { + decryptionParams: [{ key: privateKey }], + })) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } } - ); + } expect(messages?.length).eq(3); if (!messages) throw "Length was tested"; @@ -503,15 +391,24 @@ describe("Waku Store", () => { expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); + for (const text of [ + encryptedAsymmetricMessageText, + encryptedSymmetricMessageText, + clearMessageText, + ]) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === text; + }) + ).to.not.eq(-1); + } + !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history using start and end time", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + it("Ordered callback, using start and end time", async function () { + this.timeout(20000); const now = new Date(); @@ -554,17 +451,12 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); const firstMessages: WakuMessage[] = []; - await waku.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - firstMessages.push(msg); - } - }) - ); + (msg) => { + if (msg) { + firstMessages.push(msg); + } }, { peerId: nwakuPeerId, @@ -573,17 +465,10 @@ describe("Waku Store", () => { ); const bothMessages: WakuMessage[] = []; - await waku.store.queryHistory( + await waku.store.queryOrderedCallback( [], - async (msgPromises) => { - await Promise.all( - msgPromises.map(async (promise) => { - const msg = await promise; - if (msg) { - bothMessages.push(msg); - } - }) - ); + async (msg) => { + bothMessages.push(msg); }, { peerId: nwakuPeerId, @@ -601,3 +486,69 @@ describe("Waku Store", () => { expect(bothMessages?.length).eq(2); }); }); + +describe("Waku Store, custom pubsub topic", () => { + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + let waku: WakuFull; + let nwaku: Nwaku; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ + persistMessages: true, + store: true, + topics: customPubSubTopic, + }); + }); + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("Generator, custom pubsub topic", async function () { + this.timeout(15_000); + + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopic: customPubSubTopic, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); +}); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index ef879fa56c..bf1dada03e 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -108,22 +108,95 @@ export class WakuStore { /** * Do a query to a Waku Store to retrieve historical/missed messages. * - * @param contentTopics The content topics to pass to the query, leave empty to - * retrieve all messages. - * @param callback called on a page of retrieved messages. If the callback returns `true` - * then pagination is stopped. - * @param options Optional parameters. + * The callback function takes a `WakuMessage` in input, + * messages are processed in order: + * - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD } + * - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD } + * + * The ordering may affect performance. * * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryHistory( + async queryOrderedCallback( contentTopics: string[], callback: ( - messages: Array> + message: WakuMessage ) => Promise | boolean | void, options?: QueryOptions ): Promise { + const abort = false; + for await (const promises of this.queryGenerator(contentTopics, options)) { + if (abort) break; + let messages = await Promise.all(promises); + + messages = messages.filter(isWakuMessageDefined); + + // Messages in pages are ordered from oldest (first) to most recent (last). + // https://github.com/vacp2p/rfc/issues/533 + if ( + typeof options?.pageDirection === "undefined" || + options?.pageDirection === PageDirection.BACKWARD + ) { + messages = messages.reverse(); + } + + await Promise.all( + messages.map((msg) => { + if (!abort) { + if (msg) return callback(msg); + } + }) + ); + } + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * The callback function takes a `Promise` in input, + * useful if messages needs to be decrypted and performance matters. + * **Order of messages is not guaranteed**. + * + * @returns the promises of the callback call. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async queryCallbackOnPromise( + contentTopics: string[], + callback: ( + message: Promise + ) => Promise | boolean | void, + options?: QueryOptions + ): Promise>> { + let abort = false; + let promises: Promise[] = []; + for await (const page of this.queryGenerator(contentTopics, options)) { + const _promises = page.map(async (msg) => { + if (!abort) { + abort = Boolean(await callback(msg)); + } + }); + + promises = promises.concat(_promises); + } + return promises; + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * This is a generator, useful if you want most control on how messages + * are processed. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async *queryGenerator( + contentTopics: string[], + options?: QueryOptions + ): AsyncGenerator[]> { let startTime, endTime; if (options?.timeFilter) { @@ -178,23 +251,19 @@ export class WakuStore { decryptionParams = decryptionParams.concat(options.decryptionParams); } - for await (const messagePromises of paginate( + for await (const messages of paginate( connection, protocol, queryOpts, decryptionParams )) { - const abort = Boolean(await callback(messagePromises)); - if (abort) { - // TODO: Also abort underlying generator - break; - } + yield messages; } } /** * Register a decryption key to attempt decryption of messages received in any - * subsequent { @link queryHistory } call. This can either be a private key for + * subsequent query call. This can either be a private key for * asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to * decrypt messages using both methods. * @@ -209,7 +278,7 @@ export class WakuStore { /**cursorV2Beta4 * Delete a decryption key that was used to attempt decryption of messages - * received in subsequent { @link queryHistory } calls. + * received in subsequent query calls. * * Strings must be in hex format. */ @@ -280,7 +349,7 @@ async function* paginate( throw "History response contains an Error: " + response.error; } - if (!response.messages) { + if (!response.messages || !response.messages.length) { log( "Stopping pagination due to store `response.messages` field missing or empty" ); @@ -315,3 +384,9 @@ async function* paginate( } } } + +export const isWakuMessageDefined = ( + msg: WakuMessage | undefined +): msg is WakuMessage => { + return !!msg; +};