diff --git a/CHANGELOG.md b/CHANGELOG.md index ea8e2166df..3adf660eea 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- If the `callback` function passed to`WakuStore.queryHistory` returns `true`, then no further pages are retrieved from the store. + ### Changed - **Breaking**: Renamed `WakuStore.QueryOptions`'s `direction` to `pageDirection` (and its type) as it only affects the page ordering, not the ordering of messages with the page. diff --git a/src/lib/waku_store/index.node.spec.ts b/src/lib/waku_store/index.node.spec.ts index 6fb739a0ac..7ec29842d0 100644 --- a/src/lib/waku_store/index.node.spec.ts +++ b/src/lib/waku_store/index.node.spec.ts @@ -68,6 +68,89 @@ describe('Waku Store', () => { expect(result).to.not.eq(-1); }); + it('Retrieves history using callback', async function () { + this.timeout(5_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true }); + + const totalMsgs = 20; + + for (let i = 0; i < totalMsgs; i++) { + expect( + await nimWaku.sendMessage( + await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) + ) + ).to.be.true; + } + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + let messages: WakuMessage[] = []; + + await waku.store.queryHistory([], { + callback: (_msgs) => { + messages = messages.concat(_msgs); + }, + }); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === 'Message 0'; + }); + expect(result).to.not.eq(-1); + }); + + it('Retrieval aborts when callback returns true', async function () { + this.timeout(5_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true }); + + const availMsgs = 20; + + for (let i = 0; i < availMsgs; i++) { + expect( + await nimWaku.sendMessage( + await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic) + ) + ).to.be.true; + } + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + let messages: WakuMessage[] = []; + const desiredMsgs = 14; + + await waku.store.queryHistory([], { + pageSize: 7, + callback: (_msgs) => { + messages = messages.concat(_msgs); + return messages.length >= desiredMsgs; + }, + }); + + expect(messages?.length).eq(desiredMsgs); + }); + it('Retrieves all historical elements in chronological order through paging', async function () { this.timeout(5_000); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 613be4d4c1..5e149e2d95 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -72,12 +72,16 @@ export interface QueryOptions { timeFilter?: TimeFilter; /** * Callback called on pages of stored messages as they are retrieved. + * * Allows for a faster access to the results as it is called as soon as a page - * is received. - * Traversal of the pages is done automatically so this function will invoked - * for each retrieved page. + * is received. Traversal of the pages is done automatically so this function + * will invoked for each retrieved page. + * + * If the call on a page returns `true`, then traversal of the pages is aborted. + * For example, this can be used for the caller to stop the query after a + * specific message is found. */ - callback?: (messages: WakuMessage[]) => void; + callback?: (messages: WakuMessage[]) => void | boolean; /** * Keys that will be used to decrypt messages. * @@ -211,20 +215,18 @@ export class WakuStore { }) ); + let abort = false; if (opts.callback) { - // TODO: Test the callback feature - // TODO: Change callback to take individual messages - opts.callback(pageMessages); + abort = Boolean(opts.callback(pageMessages)); } const responsePageSize = response.pagingInfo?.pageSize; const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; if ( - responsePageSize && - queryPageSize && - responsePageSize < queryPageSize - ) { + abort || // Response page size smaller than query, meaning this is the last page + (responsePageSize && queryPageSize && responsePageSize < queryPageSize) + ) { return messages; }