diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 7583caac72..05068c1358 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -86,6 +86,54 @@ class Store extends BaseProtocol implements IStore { this.options = options ?? {}; } + /** + * Processes messages based on the provided callback and options. + * @private + */ + private async processMessages( + messages: Promise[], + callback: (message: T) => Promise | boolean | void, + options?: QueryOptions + ): Promise { + let abort = false; + const messagesOrUndef: Array = await Promise.all(messages); + let processedMessages: Array = messagesOrUndef.filter(isDefined); + + if (this.shouldReverseOrder(options)) { + processedMessages = processedMessages.reverse(); + } + + await Promise.all( + processedMessages.map(async (msg) => { + if (msg && !abort) { + abort = Boolean(await callback(msg)); + } + }) + ); + + return abort; + } + + /** + * Determines whether to reverse the order of messages based on the provided options. + * + * Messages in pages are ordered from oldest (first) to most recent (last). + * https://github.com/vacp2p/rfc/issues/533 + * + * @private + */ + private shouldReverseOrder(options?: QueryOptions): boolean { + return ( + typeof options?.pageDirection === "undefined" || + options?.pageDirection === PageDirection.BACKWARD + ); + } + + /** + * @deprecated Use `queryWithOrderedCallback` instead + **/ + queryOrderedCallback = this.queryWithOrderedCallback; + /** * Do a query to a Waku Store to retrieve historical/missed messages. * @@ -103,42 +151,20 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryOrderedCallback( + async queryWithOrderedCallback( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, options?: QueryOptions ): Promise { - let abort = false; for await (const promises of this.queryGenerator(decoders, options)) { - if (abort) break; - const messagesOrUndef: Array = await Promise.all(promises); - - let messages: Array = messagesOrUndef.filter(isDefined); - - // Messages in pages are ordered from oldest (first) to most recent (last). - // https://github.com/vacp2p/rfc/issues/533 - if ( - typeof options?.pageDirection === "undefined" || - options?.pageDirection === PageDirection.BACKWARD - ) { - messages = messages.reverse(); - } - - await Promise.all( - messages.map(async (msg) => { - if (msg && !abort) { - abort = Boolean(await callback(msg)); - } - }) - ); + if (await this.processMessages(promises, callback, options)) break; } } /** * Do a query to a Waku Store to retrieve historical/missed messages. - * * The callback function takes a `Promise` in input, - * useful if messages needs to be decrypted and performance matters. + * useful if messages need to be decrypted and performance matters. * * The order of the messages passed to the callback is as follows: * - within a page, messages are expected to be ordered from oldest to most recent @@ -152,7 +178,7 @@ class Store extends BaseProtocol implements IStore { * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. */ - async queryCallbackOnPromise( + async queryWithPromiseCallback( decoders: IDecoder[], callback: ( message: Promise @@ -160,17 +186,15 @@ class Store extends BaseProtocol implements IStore { options?: QueryOptions ): Promise { let abort = false; - let promises: Promise[] = []; for await (const page of this.queryGenerator(decoders, options)) { - const _promises = page.map(async (msg) => { - if (!abort) { - abort = Boolean(await callback(msg)); - } + const _promises = page.map(async (msgPromise) => { + if (abort) return; + abort = Boolean(await callback(msgPromise)); }); - promises = promises.concat(_promises); + await Promise.all(_promises); + if (abort) break; } - await Promise.all(promises); } /** @@ -183,9 +207,6 @@ class Store extends BaseProtocol implements IStore { * as follows: * - within a page, messages SHOULD be ordered from oldest to most recent * - pages direction depends on { @link QueryOptions.pageDirection } - * - * However, there is no way to guarantee the behavior of the remote node. - * * @throws If not able to reach a Waku Store peer to query, * or if an error is encountered when processing the reply, * or if two decoders with the same content topic are passed. diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index e3ad74c74d..aea825b3b2 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -46,12 +46,12 @@ export type StoreQueryOptions = { } & ProtocolOptions; export interface IStore extends IBaseProtocol { - queryOrderedCallback: ( + queryWithOrderedCallback: ( decoders: IDecoder[], callback: (message: T) => Promise | boolean | void, options?: StoreQueryOptions ) => Promise; - queryCallbackOnPromise: ( + queryWithPromiseCallback: ( decoders: IDecoder[], callback: ( message: Promise diff --git a/packages/tests/tests/store.node.spec.ts b/packages/tests/tests/store.node.spec.ts index f75d1b5935..cde93d098c 100644 --- a/packages/tests/tests/store.node.spec.ts +++ b/packages/tests/tests/store.node.spec.ts @@ -204,7 +204,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: IMessage[] = []; - await waku.store.queryCallbackOnPromise( + await waku.store.queryWithPromiseCallback( [TestDecoder], async (msgPromise) => { const msg = await msgPromise; @@ -246,7 +246,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: IMessage[] = []; - await waku.store.queryCallbackOnPromise( + await waku.store.queryWithPromiseCallback( [TestDecoder], async (msgPromise) => { const msg = await msgPromise; @@ -285,7 +285,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); const messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); @@ -324,7 +324,7 @@ describe("Waku Store", () => { await waitForRemotePeer(waku, [Protocols.Store]); let messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg); @@ -491,7 +491,7 @@ describe("Waku Store", () => { const nwakuPeerId = await nwaku.getPeerId(); const firstMessages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], (msg) => { if (msg) { @@ -505,7 +505,7 @@ describe("Waku Store", () => { ); const bothMessages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { bothMessages.push(msg); @@ -552,7 +552,7 @@ describe("Waku Store", () => { const desiredMsgs = 14; const messages: IMessage[] = []; - await waku.store.queryOrderedCallback( + await waku.store.queryWithOrderedCallback( [TestDecoder], async (msg) => { messages.push(msg);