Enable caller to abort store query

If the `callback` function passed to`WakuStore.queryHistory` returns
`true`, then no further pages are retrieved from the store.
This commit is contained in:
Franck Royer 2021-10-07 15:33:00 +11:00
parent ec8d7ba9dc
commit 8d469ff842
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 99 additions and 11 deletions

View File

@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased] ## [Unreleased]
### Added
- If the `callback` function passed to`WakuStore.queryHistory` returns `true`, then no further pages are retrieved from the store.
### Changed ### Changed
- **Breaking**: Renamed `WakuStore.QueryOptions`'s `direction` to `pageDirection` (and its type) as it only affects the page ordering, - **Breaking**: Renamed `WakuStore.QueryOptions`'s `direction` to `pageDirection` (and its type) as it only affects the page ordering,
not the ordering of messages with the page. not the ordering of messages with the page.

View File

@ -68,6 +68,89 @@ describe('Waku Store', () => {
expect(result).to.not.eq(-1); expect(result).to.not.eq(-1);
}); });
it('Retrieves history using callback', async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ persistMessages: true });
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nimWaku.sendMessage(
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic)
)
).to.be.true;
}
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: { modules: { transport: [TCP] } },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
let messages: WakuMessage[] = [];
await waku.store.queryHistory([], {
callback: (_msgs) => {
messages = messages.concat(_msgs);
},
});
expect(messages?.length).eq(totalMsgs);
const result = messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === 'Message 0';
});
expect(result).to.not.eq(-1);
});
it('Retrieval aborts when callback returns true', async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ persistMessages: true });
const availMsgs = 20;
for (let i = 0; i < availMsgs; i++) {
expect(
await nimWaku.sendMessage(
await WakuMessage.fromUtf8String(`Message ${i}`, TestContentTopic)
)
).to.be.true;
}
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
libp2p: { modules: { transport: [TCP] } },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
let messages: WakuMessage[] = [];
const desiredMsgs = 14;
await waku.store.queryHistory([], {
pageSize: 7,
callback: (_msgs) => {
messages = messages.concat(_msgs);
return messages.length >= desiredMsgs;
},
});
expect(messages?.length).eq(desiredMsgs);
});
it('Retrieves all historical elements in chronological order through paging', async function () { it('Retrieves all historical elements in chronological order through paging', async function () {
this.timeout(5_000); this.timeout(5_000);

View File

@ -72,12 +72,16 @@ export interface QueryOptions {
timeFilter?: TimeFilter; timeFilter?: TimeFilter;
/** /**
* Callback called on pages of stored messages as they are retrieved. * Callback called on pages of stored messages as they are retrieved.
*
* Allows for a faster access to the results as it is called as soon as a page * Allows for a faster access to the results as it is called as soon as a page
* is received. * is received. Traversal of the pages is done automatically so this function
* Traversal of the pages is done automatically so this function will invoked * will invoked for each retrieved page.
* for each retrieved page. *
* If the call on a page returns `true`, then traversal of the pages is aborted.
* For example, this can be used for the caller to stop the query after a
* specific message is found.
*/ */
callback?: (messages: WakuMessage[]) => void; callback?: (messages: WakuMessage[]) => void | boolean;
/** /**
* Keys that will be used to decrypt messages. * Keys that will be used to decrypt messages.
* *
@ -211,20 +215,18 @@ export class WakuStore {
}) })
); );
let abort = false;
if (opts.callback) { if (opts.callback) {
// TODO: Test the callback feature abort = Boolean(opts.callback(pageMessages));
// TODO: Change callback to take individual messages
opts.callback(pageMessages);
} }
const responsePageSize = response.pagingInfo?.pageSize; const responsePageSize = response.pagingInfo?.pageSize;
const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize; const queryPageSize = historyRpcQuery.query?.pagingInfo?.pageSize;
if ( if (
responsePageSize && abort ||
queryPageSize &&
responsePageSize < queryPageSize
) {
// Response page size smaller than query, meaning this is the last page // Response page size smaller than query, meaning this is the last page
(responsePageSize && queryPageSize && responsePageSize < queryPageSize)
) {
return messages; return messages;
} }