diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 58e61919b0..6b26b1c18f 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -48,7 +48,12 @@ describe("Util: toAsyncIterator", () => { const messageText = "hey, what's up?"; const sent = { payload: utf8ToBytes(messageText) }; - const { iterator } = await toAsyncIterator(waku.filter, TestDecoder); + const { iterator } = await toAsyncIterator( + waku.filter, + TestDecoder, + {}, + { timeoutMs: 1000 } + ); await waku.lightPush.send(TestEncoder, sent); const { value } = await iterator.next(); @@ -60,7 +65,12 @@ describe("Util: toAsyncIterator", () => { it("handles multiple messages", async function () { this.timeout(10000); - const { iterator } = await toAsyncIterator(waku.filter, TestDecoder); + const { iterator } = await toAsyncIterator( + waku.filter, + TestDecoder, + {}, + { timeoutMs: 1000 } + ); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("Filtering works!"), @@ -78,7 +88,12 @@ describe("Util: toAsyncIterator", () => { it("unsubscribes", async function () { this.timeout(10000); - const { iterator, stop } = await toAsyncIterator(waku.filter, TestDecoder); + const { iterator, stop } = await toAsyncIterator( + waku.filter, + TestDecoder, + {}, + { timeoutMs: 1000 } + ); await waku.lightPush.send(TestEncoder, { payload: utf8ToBytes("This should be received"), diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts index 74d0218911..657050bbe0 100644 --- a/packages/utils/src/common/to_async_iterator.ts +++ b/packages/utils/src/common/to_async_iterator.ts @@ -7,11 +7,29 @@ import type { Unsubscribe, } from "@waku/interfaces"; +type IteratorOptions = { + timeoutMs?: number; + iteratorDelay?: number; +}; + +const FRAME_RATE = 60; + +/** + * Function that transforms IReceiver subscription to iterable stream of data. + * @param receiver - object that allows to be subscribed to; + * @param decoder - parameter to be passed to receiver for subscription; + * @param options - options for receiver for subscription; + * @param iteratorOptions - optional configuration for iterator; + * @returns iterator and stop function to terminate it. + */ export async function toAsyncIterator( receiver: IReceiver, decoder: IDecoder | IDecoder[], - options?: ProtocolOptions + options?: ProtocolOptions, + iteratorOptions?: IteratorOptions ): Promise> { + const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE; + const messages: T[] = []; let unsubscribe: undefined | Unsubscribe; @@ -23,8 +41,18 @@ export async function toAsyncIterator( options ); + const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs); + const timeoutMs = iteratorOptions?.timeoutMs ?? 0; + const startTime = Date.now(); + async function* iterator(): AsyncIterator { while (true) { + if (isWithTimeout && Date.now() - startTime >= timeoutMs) { + return; + } + + await wait(iteratorDelay); + const message = messages.shift() as T; if (!unsubscribe && messages.length === 0) { @@ -49,3 +77,9 @@ export async function toAsyncIterator( }, }; } + +function wait(ms: number): Promise { + return new Promise((resolve) => { + setTimeout(resolve, ms); + }); +}