fix: `queryOrderedCallback` not stopping when callback returns true

Fixes #978.
This commit is contained in:
fryorcraken.eth 2022-10-28 09:28:16 +11:00
parent f3bd7a7ba5
commit 0cca9a73a1
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 42 additions and 4 deletions

View File

@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Fixed
- Incorrect cursor encoding in Store queries.
- `WakuStore.queryOrderedCallback` not stopping when callback returns true.
### Removed

View File

@ -465,6 +465,43 @@ describe("Waku Store", () => {
expect(bothMessages?.length).eq(2);
});
it("Ordered callback, aborts when callback returns true", async function () {
this.timeout(15_000);
const totalMsgs = 20;
for (let i = 0; i < totalMsgs; i++) {
expect(
await nwaku.sendMessage(
Nwaku.toMessageRpcQuery({
payload: utf8ToBytes(`Message ${i}`),
contentTopic: TestContentTopic,
})
)
).to.be.true;
}
waku = await createFullNode({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Store]);
const desiredMsgs = 14;
const messages: Message[] = [];
await waku.store.queryOrderedCallback(
[TestDecoder],
async (msg) => {
messages.push(msg);
return messages.length >= desiredMsgs;
},
{ pageSize: 7 }
);
expect(messages?.length).eq(desiredMsgs);
});
});
describe("Waku Store, custom pubsub topic", () => {

View File

@ -112,7 +112,7 @@ export class WakuStore {
callback: (message: T) => Promise<void | boolean> | boolean | void,
options?: QueryOptions
): Promise<void> {
const abort = false;
let abort = false;
for await (const promises of this.queryGenerator(decoders, options)) {
if (abort) break;
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);
@ -129,9 +129,9 @@ export class WakuStore {
}
await Promise.all(
messages.map((msg) => {
if (!abort) {
if (msg) return callback(msg);
messages.map(async (msg) => {
if (msg && !abort) {
abort = Boolean(await callback(msg));
}
})
);