From 8679adcf80b78fd9b4ed03ed87ab65610c4dfb46 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Mon, 19 Sep 2022 16:33:07 +1000 Subject: [PATCH] feat: enable store queries with multiple content topics and decoders --- src/lib/waku_store/index.node.spec.ts | 53 +++++++++------------- src/lib/waku_store/index.ts | 65 ++++++++++++++++++++------- 2 files changed, 69 insertions(+), 49 deletions(-) diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 8156d4469c..ecbec36309 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -72,7 +72,7 @@ describe("Waku Store", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -103,7 +103,7 @@ describe("Waku Store", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { @@ -142,12 +142,15 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: Message[] = []; - await waku.store.queryCallbackOnPromise(TestDecoder, async (msgPromise) => { - const msg = await msgPromise; - if (msg) { - messages.push(msg); + await waku.store.queryCallbackOnPromise( + [TestDecoder], + async (msgPromise) => { + const msg = await msgPromise; + if (msg) { + messages.push(msg); + } } - }); + ); expect(messages?.length).eq(totalMsgs); const result = messages?.findIndex((msg) => { @@ -182,7 +185,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: Message[] = []; await waku.store.queryCallbackOnPromise( - TestDecoder, + [TestDecoder], async (msgPromise) => { const msg = await msgPromise; if (msg) { @@ -220,7 +223,7 @@ describe("Waku Store", () => { const messages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { messages.push(msg); }, @@ -263,7 +266,7 @@ describe("Waku Store", () => { let messages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { messages.push(msg); }, @@ -361,25 +364,11 @@ describe("Waku Store", () => { const messages: Message[] = []; log("Retrieve messages from store"); - for await (const msgPromises of waku2.store.queryGenerator(asymDecoder)) { - for (const promise of msgPromises) { - const msg = await promise; - if (msg) { - messages.push(msg); - } - } - } - - for await (const msgPromises of waku2.store.queryGenerator(symDecoder)) { - for (const promise of msgPromises) { - const msg = await promise; - if (msg) { - messages.push(msg); - } - } - } - - for await (const msgPromises of waku2.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku2.store.queryGenerator([ + asymDecoder, + symDecoder, + TestDecoder, + ])) { for (const promise of msgPromises) { const msg = await promise; if (msg) { @@ -443,7 +432,7 @@ describe("Waku Store", () => { const firstMessages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], (msg) => { if (msg) { firstMessages.push(msg); @@ -457,7 +446,7 @@ describe("Waku Store", () => { const bothMessages: Message[] = []; await waku.store.queryOrderedCallback( - TestDecoder, + [TestDecoder], async (msg) => { bothMessages.push(msg); }, @@ -524,7 +513,7 @@ describe("Waku Store, custom pubsub topic", () => { const messages: Message[] = []; let promises: Promise[] = []; - for await (const msgPromises of waku.store.queryGenerator(TestDecoder)) { + for await (const msgPromises of waku.store.queryGenerator([TestDecoder])) { const _promises = msgPromises.map(async (promise) => { const msg = await promise; if (msg) { diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 8874942c45..5f51f63e93 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -100,16 +100,17 @@ export class WakuStore { * If strong ordering is needed, you may need to handle this at application level * and set your own timestamps too (the WakuMessage timestamps are not certified). * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async queryOrderedCallback( - decoder: Decoder, + decoders: Decoder[], callback: (message: Message) => Promise | boolean | void, options?: QueryOptions ): Promise { const abort = false; - for await (const promises of this.queryGenerator(decoder, options)) { + for await (const promises of this.queryGenerator(decoders, options)) { if (abort) break; let messages = await Promise.all(promises); @@ -148,11 +149,12 @@ export class WakuStore { * break the order as it may rely on the browser decryption API, which in turn, * may have a different speed depending on the type of decryption. * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async queryCallbackOnPromise( - decoder: Decoder, + decoders: Decoder[], callback: ( message: Promise ) => Promise | boolean | void, @@ -160,7 +162,7 @@ export class WakuStore { ): Promise { let abort = false; let promises: Promise[] = []; - for await (const page of this.queryGenerator(decoder, options)) { + for await (const page of this.queryGenerator(decoders, options)) { const _promises = page.map(async (msg) => { if (!abort) { abort = Boolean(await callback(msg)); @@ -185,11 +187,12 @@ export class WakuStore { * * However, there is no way to guarantee the behavior of the remote node. * - * @throws If not able to reach a Waku Store peer to query - * or if an error is encountered when processing the reply. + * @throws If not able to reach a Waku Store peer to query, + * or if an error is encountered when processing the reply, + * or if two decoders with the same content topic are passed. */ async *queryGenerator( - decoder: Decoder, + decoders: Decoder[], options?: QueryOptions ): AsyncGenerator[]> { let startTime, endTime; @@ -199,7 +202,17 @@ export class WakuStore { endTime = options.timeFilter.endTime; } - const contentTopic = decoder.contentTopic; + const decodersAsMap = new Map(); + decoders.forEach((dec) => { + if (decodersAsMap.has(dec.contentTopic)) { + throw new Error( + "API does not support different decoder per content topic" + ); + } + decodersAsMap.set(dec.contentTopic, dec); + }); + + const contentTopics = decoders.map((dec) => dec.contentTopic); const queryOpts = Object.assign( { @@ -208,7 +221,7 @@ export class WakuStore { pageSize: DefaultPageSize, }, options, - { contentTopics: [contentTopic], startTime, endTime } + { contentTopics, startTime, endTime } ); log("Querying history with the following options", { @@ -236,7 +249,7 @@ export class WakuStore { connection, protocol, queryOpts, - decoder + decodersAsMap )) { yield messages; } @@ -260,8 +273,17 @@ async function* paginate( connection: Connection, protocol: string, queryOpts: Params, - decoder: Decoder -): AsyncGenerator[]> { + decoders: Map> +): AsyncGenerator[]> { + if ( + queryOpts.contentTopics.toString() !== + Array.from(decoders.keys()).toString() + ) { + throw new Error( + "Internal error, the decoders should match the query's content topics" + ); + } + let cursor = undefined; while (true) { queryOpts = Object.assign(queryOpts, { cursor }); @@ -314,7 +336,16 @@ async function* paginate( log(`${response.messages.length} messages retrieved from store`); - yield response.messages.map((protoMsg) => decoder.decode(protoMsg)); + yield response.messages.map((protoMsg) => { + const contentTopic = protoMsg.contentTopic; + if (typeof contentTopic !== "undefined") { + const decoder = decoders.get(contentTopic); + if (decoder) { + return decoder.decode(protoMsg); + } + } + return Promise.resolve(undefined); + }); cursor = response.pagingInfo?.cursor; if (typeof cursor === "undefined") {