diff --git a/CHANGELOG.md b/CHANGELOG.md index 1414e46d6a..4627a55cba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Correct options type for `createFullNode` & `createPrivacy` to enable passing gossipsub options. +- `WakuStore` now provides several APIs: `queryGenerator`, `queryCallbackOnPromise`, `queryOrderedCallback`; + each provides different guarantees and performance. ## [0.27.0] - 2022-09-13 diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index 13bde3627a..9c34fbce44 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -1,13 +1,14 @@ -import { Peer } from "@libp2p/interface-peer-store"; -import { Libp2p } from "libp2p"; +import type { PeerId } from "@libp2p/interface-peer-id"; +import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import debug from "debug"; + +const log = debug("waku:select-peer"); /** * Returns a pseudo-random peer that supports the given protocol. * Useful for protocols such as store and light push */ -export async function selectRandomPeer( - peers: Peer[] -): Promise { +export function selectRandomPeer(peers: Peer[]): Peer | undefined { if (peers.length === 0) return; const index = Math.round(Math.random() * (peers.length - 1)); @@ -18,11 +19,11 @@ export async function selectRandomPeer( * Returns the list of peers that supports the given protocol. */ export async function getPeersForProtocol( - libp2p: Libp2p, + peerStore: PeerStore, protocols: string[] ): Promise { const peers: Peer[] = []; - await libp2p.peerStore.forEach((peer) => { + await peerStore.forEach((peer) => { for (let i = 0; i < protocols.length; i++) { if (peer.protocols.includes(protocols[i])) { peers.push(peer); @@ -32,3 +33,45 @@ export async function getPeersForProtocol( }); return peers; } + +export async function selectPeerForProtocol( + peerStore: PeerStore, + protocols: string[], + peerId?: PeerId +): Promise<{ peer: Peer; protocol: string } | undefined> { + let peer; + if (peerId) { + peer = await peerStore.get(peerId); + if (!peer) { + log( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` + ); + return; + } + } else { + const peers = await getPeersForProtocol(peerStore, protocols); + peer = selectRandomPeer(peers); + if (!peer) { + log("Failed to find known peer that registers protocols", protocols); + return; + } + } + + let protocol; + for (const codec of protocols) { + if (peer.protocols.includes(codec)) { + protocol = codec; + // Do not break as we want to keep the last value + } + } + log(`Using codec ${protocol}`); + if (!protocol) { + log( + `Peer does not register required protocols: ${peer.id.toString()}`, + protocols + ); + return; + } + + return { peer, protocol }; +} diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index bd3bc8e8c9..e2fa36ca06 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -11,7 +11,11 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -228,23 +232,15 @@ export class WakuFilter { } private async getPeer(peerId?: PeerId): Promise { - let peer; - if (peerId) { - peer = await this.libp2p.peerStore.get(peerId); - if (!peer) { - throw new Error( - `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` - ); - } - } else { - peer = await this.randomPeer(); - if (!peer) { - throw new Error( - "Failed to find known peer that registers waku filter protocol" - ); - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [FilterCodec], + peerId + ); + if (!res) { + throw new Error(`Failed to select peer for ${FilterCodec}`); } - return peer; + return res.peer; } /** @@ -272,7 +268,7 @@ export class WakuFilter { } async peers(): Promise { - return getPeersForProtocol(this.libp2p, [FilterCodec]); + return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]); } async randomPeer(): Promise { diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index a5a36bcce7..0c3db3bf20 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -10,7 +10,11 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { WakuMessage } from "../waku_message"; import { PushRPC } from "./push_rpc"; @@ -51,16 +55,16 @@ export class WakuLightPush { message: WakuMessage, opts?: PushOptions ): Promise { - let peer; - if (opts?.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) throw "Peer is unknown"; - } else { - peer = await this.randomPeer(); + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [LightPushCodec], + opts?.peerId + ); + + if (!res) { + throw new Error("Failed to get a peer"); } - if (!peer) throw "No peer available"; - if (!peer.protocols.includes(LightPushCodec)) - throw "Peer does not register waku light push protocol"; + const { peer } = res; const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); @@ -109,7 +113,7 @@ export class WakuLightPush { * peers. */ async peers(): Promise { - return getPeersForProtocol(this.libp2p, [LightPushCodec]); + return getPeersForProtocol(this.libp2p.peerStore, [LightPushCodec]); } /** diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 7b4195285a..07db951213 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -29,18 +29,22 @@ describe("Waku Store", () => { let waku: WakuFull; let nwaku: Nwaku; + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ persistMessages: true, store: true, lightpush: true }); + }); + afterEach(async function () { !!nwaku && nwaku.stop(); !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history", async function () { + it("Generator", async function () { this.timeout(15_000); + const totalMsgs = 20; - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 2; i++) { + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -57,20 +61,95 @@ describe("Waku Store", () => { await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([]); - expect(messages?.length).eq(2); + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { return msg.payloadAsUtf8 === "Message 0"; }); expect(result).to.not.eq(-1); }); - it("Retrieves history using callback", async function () { + it("Generator, no message returned", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(0); + }); + + it("Callback on promise", async function () { + this.timeout(15_000); + + const totalMsgs = 15; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }) + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise([], async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } + }); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); + + it("Callback on promise, aborts when callback returns true", async function () { + this.timeout(15_000); const totalMsgs = 20; @@ -92,68 +171,28 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - let messages: WakuMessage[] = []; - - await waku.store.queryHistory([], { - callback: (_msgs) => { - messages = messages.concat(_msgs); - }, - }); - - expect(messages?.length).eq(totalMsgs); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); - }); - - it("Retrieval aborts when callback returns true", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - const availMsgs = 20; - - for (let i = 0; i < availMsgs; i++) { - expect( - await nwaku.sendMessage( - Nwaku.toMessageRpcQuery({ - payload: utf8ToBytes(`Message ${i}`), - contentTopic: TestContentTopic, - }) - ) - ).to.be.true; - } - - waku = await createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Store]); - - let messages: WakuMessage[] = []; const desiredMsgs = 14; - - await waku.store.queryHistory([], { - pageSize: 7, - callback: (_msgs) => { - messages = messages.concat(_msgs); + const messages: WakuMessage[] = []; + await waku.store.queryCallbackOnPromise( + [], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } return messages.length >= desiredMsgs; }, - }); + { pageSize: 7 } + ); expect(messages?.length).eq(desiredMsgs); }); - it("Retrieves all historical elements in chronological order through paging", async function () { + it("Ordered Callback - Forward", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); - - for (let i = 0; i < 15; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ @@ -171,12 +210,19 @@ describe("Waku Store", () => { await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const messages = await waku.store.queryHistory([], { - pageDirection: PageDirection.FORWARD, - }); + const messages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( + [], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: PageDirection.FORWARD, + } + ); - expect(messages?.length).eq(15); - for (let index = 0; index < 2; index++) { + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { expect( messages?.findIndex((msg) => { return msg.payloadAsUtf8 === `Message ${index}`; @@ -185,147 +231,54 @@ describe("Waku Store", () => { } }); - it("Retrieves history using custom pubsub topic", async function () { + it("Ordered Callback - Backward", async function () { this.timeout(15_000); - const customPubSubTopic = "/waku/2/custom-dapp/proto"; - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ - persistMessages: true, - store: true, - topics: customPubSubTopic, - }); - - for (let i = 0; i < 2; i++) { + const totalMsgs = 18; + for (let i = 0; i < totalMsgs; i++) { expect( await nwaku.sendMessage( Nwaku.toMessageRpcQuery({ payload: utf8ToBytes(`Message ${i}`), contentTopic: TestContentTopic, - }), - customPubSubTopic + }) ) ).to.be.true; } waku = await createFullNode({ - pubSubTopic: customPubSubTopic, staticNoiseKey: NOISE_KEY_1, }); await waku.start(); await waku.dial(await nwaku.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); - const nimPeerId = await nwaku.getPeerId(); + let messages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( + [], + async (msg) => { + messages.push(msg); + }, + { + pageDirection: PageDirection.BACKWARD, + } + ); - const messages = await waku.store.queryHistory([], { - peerId: nimPeerId, - }); + messages = messages.reverse(); - expect(messages?.length).eq(2); - const result = messages?.findIndex((msg) => { - return msg.payloadAsUtf8 === "Message 0"; - }); - expect(result).to.not.eq(-1); + expect(messages?.length).eq(totalMsgs); + for (let index = 0; index < totalMsgs; index++) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === `Message ${index}`; + }) + ).to.eq(index); + } }); - it("Retrieves history with asymmetric & symmetric encrypted messages", async function () { + it("Generator, with asymmetric & symmetric encrypted messages", async function () { this.timeout(15_000); - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - - const encryptedAsymmetricMessageText = "asymmetric encryption"; - const encryptedSymmetricMessageText = "symmetric encryption"; - const clearMessageText = - "This is a clear text message for everyone to read"; - const otherEncMessageText = - "This message is not for and I must not be able to read it"; - - const privateKey = generatePrivateKey(); - const symKey = generateSymmetricKey(); - const publicKey = getPublicKey(privateKey); - - const [ - encryptedAsymmetricMessage, - encryptedSymmetricMessage, - clearMessage, - otherEncMessage, - ] = await Promise.all([ - WakuMessage.fromUtf8String( - encryptedAsymmetricMessageText, - TestContentTopic, - { - encPublicKey: publicKey, - } - ), - WakuMessage.fromUtf8String( - encryptedSymmetricMessageText, - TestContentTopic, - { - symKey: symKey, - } - ), - WakuMessage.fromUtf8String(clearMessageText, TestContentTopic), - WakuMessage.fromUtf8String(otherEncMessageText, TestContentTopic, { - encPublicKey: getPublicKey(generatePrivateKey()), - }), - ]); - - log("Messages have been encrypted"); - - const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([ - createFullNode({ - staticNoiseKey: NOISE_KEY_1, - }).then((waku) => waku.start().then(() => waku)), - createFullNode({ - staticNoiseKey: NOISE_KEY_2, - }).then((waku) => waku.start().then(() => waku)), - nwaku.getMultiaddrWithId(), - ]); - - log("Waku nodes created"); - - await Promise.all([ - waku1.dial(nimWakuMultiaddr), - waku2.dial(nimWakuMultiaddr), - ]); - - log("Waku nodes connected to nwaku"); - - await waitForRemotePeer(waku1, [Protocols.LightPush]); - - log("Sending messages using light push"); - await Promise.all([ - waku1.lightPush.push(encryptedAsymmetricMessage), - waku1.lightPush.push(encryptedSymmetricMessage), - waku1.lightPush.push(otherEncMessage), - waku1.lightPush.push(clearMessage), - ]); - - await waitForRemotePeer(waku2, [Protocols.Store]); - - waku2.store.addDecryptionKey(symKey); - - log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { - decryptionParams: [{ key: privateKey }], - }); - - expect(messages[0]?.payloadAsUtf8).to.eq(clearMessageText); - expect(messages[1]?.payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2]?.payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); - - !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); - !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); - }); - - it("Retrieves history with asymmetric & symmetric encrypted messages on different content topics", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true, lightpush: true }); - const encryptedAsymmetricMessageText = "This message is encrypted for me using asymmetric"; const encryptedAsymmetricContentTopic = "/test/1/asymmetric/proto"; @@ -341,6 +294,7 @@ describe("Waku Store", () => { const symKey = generateSymmetricKey(); const publicKey = getPublicKey(privateKey); + const timestamp = new Date(); const [ encryptedAsymmetricMessage, encryptedSymmetricMessage, @@ -352,6 +306,7 @@ describe("Waku Store", () => { encryptedAsymmetricContentTopic, { encPublicKey: publicKey, + timestamp, } ), WakuMessage.fromUtf8String( @@ -359,17 +314,20 @@ describe("Waku Store", () => { encryptedSymmetricContentTopic, { symKey: symKey, + timestamp: new Date(timestamp.valueOf() + 1), } ), WakuMessage.fromUtf8String( clearMessageText, - encryptedAsymmetricContentTopic + encryptedAsymmetricContentTopic, + { timestamp: new Date(timestamp.valueOf() + 2) } ), WakuMessage.fromUtf8String( otherEncMessageText, encryptedSymmetricContentTopic, { encPublicKey: getPublicKey(generatePrivateKey()), + timestamp: new Date(timestamp.valueOf() + 3), } ), ]); @@ -412,26 +370,45 @@ describe("Waku Store", () => { method: DecryptionMethod.Symmetric, }); + const messages: WakuMessage[] = []; log("Retrieve messages from store"); - const messages = await waku2.store.queryHistory([], { + + for await (const msgPromises of waku2.store.queryGenerator([], { decryptionParams: [{ key: privateKey }], - }); + })) { + for (const promise of msgPromises) { + const msg = await promise; + if (msg) { + messages.push(msg); + } + } + } expect(messages?.length).eq(3); if (!messages) throw "Length was tested"; - expect(messages[0].payloadAsUtf8).to.eq(clearMessageText); + // Messages are ordered from oldest to latest within a page (1 page query) + expect(messages[0].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); expect(messages[1].payloadAsUtf8).to.eq(encryptedSymmetricMessageText); - expect(messages[2].payloadAsUtf8).to.eq(encryptedAsymmetricMessageText); + expect(messages[2].payloadAsUtf8).to.eq(clearMessageText); + + for (const text of [ + encryptedAsymmetricMessageText, + encryptedSymmetricMessageText, + clearMessageText, + ]) { + expect( + messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === text; + }) + ).to.not.eq(-1); + } !!waku1 && waku1.stop().catch((e) => console.log("Waku failed to stop", e)); !!waku2 && waku2.stop().catch((e) => console.log("Waku failed to stop", e)); }); - it("Retrieves history using start and end time", async function () { - this.timeout(15_000); - - nwaku = new Nwaku(makeLogFileName(this)); - await nwaku.start({ persistMessages: true, store: true }); + it("Ordered callback, using start and end time", async function () { + this.timeout(20000); const now = new Date(); @@ -473,23 +450,105 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); - const firstMessage = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { startTime, endTime: message1Timestamp }, - }); - - const bothMessages = await waku.store.queryHistory([], { - peerId: nwakuPeerId, - timeFilter: { - startTime, - endTime, + const firstMessages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( + [], + (msg) => { + if (msg) { + firstMessages.push(msg); + } }, - }); + { + peerId: nwakuPeerId, + timeFilter: { startTime, endTime: message1Timestamp }, + } + ); - expect(firstMessage?.length).eq(1); + const bothMessages: WakuMessage[] = []; + await waku.store.queryOrderedCallback( + [], + async (msg) => { + bothMessages.push(msg); + }, + { + peerId: nwakuPeerId, + timeFilter: { + startTime, + endTime, + }, + } + ); - expect(firstMessage[0]?.payloadAsUtf8).eq("Message 0"); + expect(firstMessages?.length).eq(1); + + expect(firstMessages[0]?.payloadAsUtf8).eq("Message 0"); expect(bothMessages?.length).eq(2); }); }); + +describe("Waku Store, custom pubsub topic", () => { + const customPubSubTopic = "/waku/2/custom-dapp/proto"; + let waku: WakuFull; + let nwaku: Nwaku; + + beforeEach(async function () { + this.timeout(15_000); + nwaku = new Nwaku(makeLogFileName(this)); + await nwaku.start({ + persistMessages: true, + store: true, + topics: customPubSubTopic, + }); + }); + + afterEach(async function () { + !!nwaku && nwaku.stop(); + !!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e)); + }); + + it("Generator, custom pubsub topic", async function () { + this.timeout(15_000); + + const totalMsgs = 20; + for (let i = 0; i < totalMsgs; i++) { + expect( + await nwaku.sendMessage( + Nwaku.toMessageRpcQuery({ + payload: utf8ToBytes(`Message ${i}`), + contentTopic: TestContentTopic, + }), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await createFullNode({ + staticNoiseKey: NOISE_KEY_1, + pubSubTopic: customPubSubTopic, + }); + await waku.start(); + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + const messages: WakuMessage[] = []; + let promises: Promise[] = []; + for await (const msgPromises of waku.store.queryGenerator([])) { + const _promises = msgPromises.map(async (promise) => { + const msg = await promise; + if (msg) { + messages.push(msg); + } + }); + + promises = promises.concat(_promises); + } + await Promise.all(promises); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === "Message 0"; + }); + expect(result).to.not.eq(-1); + }); +}); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 2970fe0111..bf1dada03e 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -1,3 +1,4 @@ +import type { Connection } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import { Peer } from "@libp2p/interface-peer-store"; import debug from "debug"; @@ -11,7 +12,7 @@ import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, @@ -19,9 +20,9 @@ import { WakuMessage, } from "../waku_message"; -import { HistoryRPC, PageDirection } from "./history_rpc"; +import { HistoryRPC, PageDirection, Params } from "./history_rpc"; -import Error = HistoryResponse.HistoryError; +import HistoryError = HistoryResponse.HistoryError; const log = debug("waku:store"); @@ -77,18 +78,6 @@ export interface QueryOptions { * Retrieve messages with a timestamp within the provided values. */ timeFilter?: TimeFilter; - /** - * Callback called on pages of stored messages as they are retrieved. - * - * Allows for a faster access to the results as it is called as soon as a page - * is received. Traversal of the pages is done automatically so this function - * will invoked for each retrieved page. - * - * If the call on a page returns `true`, then traversal of the pages is aborted. - * For example, this can be used for the caller to stop the query after a - * specific message is found. - */ - callback?: (messages: WakuMessage[]) => void | boolean; /** * Keys that will be used to decrypt messages. * @@ -119,17 +108,95 @@ export class WakuStore { /** * Do a query to a Waku Store to retrieve historical/missed messages. * - * @param contentTopics The content topics to pass to the query, leave empty to - * retrieve all messages. - * @param options Optional parameters. + * The callback function takes a `WakuMessage` in input, + * messages are processed in order: + * - oldest to latest if `options.pageDirection` == { @link PageDirection.FORWARD } + * - latest to oldest if `options.pageDirection` == { @link PageDirection.BACKWARD } + * + * The ordering may affect performance. * * @throws If not able to reach a Waku Store peer to query * or if an error is encountered when processing the reply. */ - async queryHistory( + async queryOrderedCallback( + contentTopics: string[], + callback: ( + message: WakuMessage + ) => Promise | boolean | void, + options?: QueryOptions + ): Promise { + const abort = false; + for await (const promises of this.queryGenerator(contentTopics, options)) { + if (abort) break; + let messages = await Promise.all(promises); + + messages = messages.filter(isWakuMessageDefined); + + // Messages in pages are ordered from oldest (first) to most recent (last). + // https://github.com/vacp2p/rfc/issues/533 + if ( + typeof options?.pageDirection === "undefined" || + options?.pageDirection === PageDirection.BACKWARD + ) { + messages = messages.reverse(); + } + + await Promise.all( + messages.map((msg) => { + if (!abort) { + if (msg) return callback(msg); + } + }) + ); + } + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * The callback function takes a `Promise` in input, + * useful if messages needs to be decrypted and performance matters. + * **Order of messages is not guaranteed**. + * + * @returns the promises of the callback call. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async queryCallbackOnPromise( + contentTopics: string[], + callback: ( + message: Promise + ) => Promise | boolean | void, + options?: QueryOptions + ): Promise>> { + let abort = false; + let promises: Promise[] = []; + for await (const page of this.queryGenerator(contentTopics, options)) { + const _promises = page.map(async (msg) => { + if (!abort) { + abort = Boolean(await callback(msg)); + } + }); + + promises = promises.concat(_promises); + } + return promises; + } + + /** + * Do a query to a Waku Store to retrieve historical/missed messages. + * + * This is a generator, useful if you want most control on how messages + * are processed. + * + * @throws If not able to reach a Waku Store peer to query + * or if an error is encountered when processing the reply. + */ + async *queryGenerator( contentTopics: string[], options?: QueryOptions - ): Promise { + ): AsyncGenerator[]> { let startTime, endTime; if (options?.timeFilter) { @@ -137,7 +204,7 @@ export class WakuStore { endTime = options.timeFilter.endTime; } - const opts = Object.assign( + const queryOpts = Object.assign( { pubSubTopic: this.pubSubTopic, pageDirection: PageDirection.BACKWARD, @@ -152,29 +219,17 @@ export class WakuStore { ...options, }); - let peer; - if (opts.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) - throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toString()}`; - } else { - peer = await this.randomPeer(); - if (!peer) - throw "Failed to find known peer that registers waku store protocol"; - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + Object.values(StoreCodecs), + options?.peerId + ); - let storeCodec = ""; - for (const codec of Object.values(StoreCodecs)) { - if (peer.protocols.includes(codec)) { - storeCodec = codec; - // Do not break as we want to keep the last value - } + if (!res) { + throw new Error("Failed to get a peer"); } - log(`Use store codec ${storeCodec}`); - if (!storeCodec) - throw `Peer does not register waku store protocol: ${peer.id.toString()}`; + const { peer, protocol } = res; - Object.assign(opts, { storeCodec }); const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); @@ -192,94 +247,23 @@ export class WakuStore { // Add the decryption keys passed to this function against the // content topics also passed to this function. - if (opts.decryptionParams) { - decryptionParams = decryptionParams.concat(opts.decryptionParams); + if (options?.decryptionParams) { + decryptionParams = decryptionParams.concat(options.decryptionParams); } - const messages: WakuMessage[] = []; - let cursor = undefined; - while (true) { - const stream = await connection.newStream(storeCodec); - const queryOpts = Object.assign(opts, { cursor }); - const historyRpcQuery = HistoryRPC.createQuery(queryOpts); - log("Querying store peer", connections[0].remoteAddr.toString()); - - const res = await pipe( - [historyRpcQuery.encode()], - lp.encode(), - stream, - lp.decode(), - async (source) => await all(source) - ); - const bytes = new Uint8ArrayList(); - res.forEach((chunk) => { - bytes.append(chunk); - }); - - const reply = historyRpcQuery.decode(bytes); - - if (!reply.response) { - log("No message returned from store: `response` field missing"); - return messages; - } - - const response = reply.response as protoV2Beta4.HistoryResponse; - - if (response.error && response.error !== Error.ERROR_NONE_UNSPECIFIED) { - throw "History response contains an Error: " + response.error; - } - - if (!response.messages || !response.messages.length) { - // No messages left (or stored) - log("No message returned from store: `messages` array empty"); - return messages; - } - - log( - `${response.messages.length} messages retrieved for (${opts.pubSubTopic})`, - contentTopics - ); - - const pageMessages: WakuMessage[] = []; - await Promise.all( - response.messages.map(async (protoMsg) => { - const msg = await WakuMessage.decodeProto(protoMsg, decryptionParams); - - if (msg) { - messages.push(msg); - pageMessages.push(msg); - } - }) - ); - - let abort = false; - if (opts.callback) { - abort = Boolean(opts.callback(pageMessages)); - } - - const responsePageSize = response.pagingInfo?.pageSize; - const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; - if ( - abort || - // Response page size smaller than query, meaning this is the last page - (responsePageSize && queryPageSize && responsePageSize < queryPageSize) - ) { - return messages; - } - - cursor = response.pagingInfo?.cursor; - if (cursor === undefined) { - // If the server does not return cursor then there is an issue, - // Need to abort, or we end up in an infinite loop - log("Store response does not contain a cursor, stopping pagination"); - return messages; - } + for await (const messages of paginate( + connection, + protocol, + queryOpts, + decryptionParams + )) { + yield messages; } } /** * Register a decryption key to attempt decryption of messages received in any - * subsequent { @link queryHistory } call. This can either be a private key for + * subsequent query call. This can either be a private key for * asymmetric encryption or a symmetric key. { @link WakuStore } will attempt to * decrypt messages using both methods. * @@ -294,7 +278,7 @@ export class WakuStore { /**cursorV2Beta4 * Delete a decryption key that was used to attempt decryption of messages - * received in subsequent { @link queryHistory } calls. + * received in subsequent query calls. * * Strings must be in hex format. */ @@ -312,15 +296,97 @@ export class WakuStore { codecs.push(codec); } - return getPeersForProtocol(this.libp2p, codecs); - } - - /** - * Returns a random peer that supports store protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); + return getPeersForProtocol(this.libp2p.peerStore, codecs); } } + +async function* paginate( + connection: Connection, + protocol: string, + queryOpts: Params, + decryptionParams: DecryptionParams[] +): AsyncGenerator[]> { + let cursor = undefined; + while (true) { + queryOpts = Object.assign(queryOpts, { cursor }); + + const stream = await connection.newStream(protocol); + const historyRpcQuery = HistoryRPC.createQuery(queryOpts); + + log( + "Querying store peer", + connection.remoteAddr.toString(), + `for (${queryOpts.pubSubTopic})`, + queryOpts.contentTopics + ); + + const res = await pipe( + [historyRpcQuery.encode()], + lp.encode(), + stream, + lp.decode(), + async (source) => await all(source) + ); + + const bytes = new Uint8ArrayList(); + res.forEach((chunk) => { + bytes.append(chunk); + }); + + const reply = historyRpcQuery.decode(bytes); + + if (!reply.response) { + log("Stopping pagination due to store `response` field missing"); + break; + } + + const response = reply.response as protoV2Beta4.HistoryResponse; + + if ( + response.error && + response.error !== HistoryError.ERROR_NONE_UNSPECIFIED + ) { + throw "History response contains an Error: " + response.error; + } + + if (!response.messages || !response.messages.length) { + log( + "Stopping pagination due to store `response.messages` field missing or empty" + ); + break; + } + + log(`${response.messages.length} messages retrieved from store`); + + yield response.messages.map((protoMsg) => + WakuMessage.decodeProto(protoMsg, decryptionParams) + ); + + cursor = response.pagingInfo?.cursor; + if (typeof cursor === "undefined") { + // If the server does not return cursor then there is an issue, + // Need to abort, or we end up in an infinite loop + log( + "Stopping pagination due to `response.pagingInfo.cursor` missing from store response" + ); + break; + } + + const responsePageSize = response.pagingInfo?.pageSize; + const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; + if ( + // Response page size smaller than query, meaning this is the last page + responsePageSize && + queryPageSize && + responsePageSize < queryPageSize + ) { + break; + } + } +} + +export const isWakuMessageDefined = ( + msg: WakuMessage | undefined +): msg is WakuMessage => { + return !!msg; +};