mirror of https://github.com/waku-org/js-waku.git
Merge pull request #981 from waku-org/fix/abort-callback
This commit is contained in:
commit
d6b7aee989
|
@ -19,6 +19,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
### Fixed
|
### Fixed
|
||||||
|
|
||||||
- Incorrect cursor encoding in Store queries.
|
- Incorrect cursor encoding in Store queries.
|
||||||
|
- `WakuStore.queryOrderedCallback` not stopping when callback returns true.
|
||||||
|
|
||||||
### Removed
|
### Removed
|
||||||
|
|
||||||
|
|
|
@ -465,6 +465,43 @@ describe("Waku Store", () => {
|
||||||
|
|
||||||
expect(bothMessages?.length).eq(2);
|
expect(bothMessages?.length).eq(2);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("Ordered callback, aborts when callback returns true", async function () {
|
||||||
|
this.timeout(15_000);
|
||||||
|
|
||||||
|
const totalMsgs = 20;
|
||||||
|
|
||||||
|
for (let i = 0; i < totalMsgs; i++) {
|
||||||
|
expect(
|
||||||
|
await nwaku.sendMessage(
|
||||||
|
Nwaku.toMessageRpcQuery({
|
||||||
|
payload: utf8ToBytes(`Message ${i}`),
|
||||||
|
contentTopic: TestContentTopic,
|
||||||
|
})
|
||||||
|
)
|
||||||
|
).to.be.true;
|
||||||
|
}
|
||||||
|
|
||||||
|
waku = await createFullNode({
|
||||||
|
staticNoiseKey: NOISE_KEY_1,
|
||||||
|
});
|
||||||
|
await waku.start();
|
||||||
|
await waku.dial(await nwaku.getMultiaddrWithId());
|
||||||
|
await waitForRemotePeer(waku, [Protocols.Store]);
|
||||||
|
|
||||||
|
const desiredMsgs = 14;
|
||||||
|
const messages: Message[] = [];
|
||||||
|
await waku.store.queryOrderedCallback(
|
||||||
|
[TestDecoder],
|
||||||
|
async (msg) => {
|
||||||
|
messages.push(msg);
|
||||||
|
return messages.length >= desiredMsgs;
|
||||||
|
},
|
||||||
|
{ pageSize: 7 }
|
||||||
|
);
|
||||||
|
|
||||||
|
expect(messages?.length).eq(desiredMsgs);
|
||||||
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
describe("Waku Store, custom pubsub topic", () => {
|
describe("Waku Store, custom pubsub topic", () => {
|
||||||
|
|
|
@ -112,7 +112,7 @@ export class WakuStore {
|
||||||
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
callback: (message: T) => Promise<void | boolean> | boolean | void,
|
||||||
options?: QueryOptions
|
options?: QueryOptions
|
||||||
): Promise<void> {
|
): Promise<void> {
|
||||||
const abort = false;
|
let abort = false;
|
||||||
for await (const promises of this.queryGenerator(decoders, options)) {
|
for await (const promises of this.queryGenerator(decoders, options)) {
|
||||||
if (abort) break;
|
if (abort) break;
|
||||||
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);
|
const messagesOrUndef: Array<T | undefined> = await Promise.all(promises);
|
||||||
|
@ -129,9 +129,9 @@ export class WakuStore {
|
||||||
}
|
}
|
||||||
|
|
||||||
await Promise.all(
|
await Promise.all(
|
||||||
messages.map((msg) => {
|
messages.map(async (msg) => {
|
||||||
if (!abort) {
|
if (msg && !abort) {
|
||||||
if (msg) return callback(msg);
|
abort = Boolean(await callback(msg));
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
);
|
);
|
||||||
|
|
Loading…
Reference in New Issue