logos-messaging-js/packages/utils/src/common/to_async_iterator.ts
Danish Arora 81a52a8097
feat!: set peer-exchange with default bootstrap (#1469)
* set peer-exchange with default bootstrap

* only initialise protocols with bootstrap peers

* update package

* update package-lock

* refactor `getPeers` while setting up a protocol

* move codecs to `@waku/interfaces`

* lightpush: send messages to multiple peers

* only use multiple peers for LP and Filter

* fix: ts warnings

* lightpush: tests pass

* update breaking changes for new API

* move codecs back into protocol files

* refactor: `getPeers()`

* rm: log as an arg

* add tsdoc for getPeers

* add import

* add prettier rule to eslint

* add: peer exchange to sdk as a dep

* fix eslint error

* add try catch

* revert unecessary diff

* revert unecessary diff

* fix imports

* convert relaycodecs to array

* remove: peerId as an arg for protocol methods

* keep peerId as an arg for peer-exchange

* remove: peerId from getPeers()

* lightpush: extract hardcoded numPeers as a constant

* return all peers if numPeers is 0 and increase readability for random peers

* refactor considering more than 1 bootstrap peers can exist

* use `getPeers`

* change arg for `getPeers` to object

* address comments

* refactor tests for new API

* lightpush: make constant the class variable

* use `maxBootstrapPeers` instead of `includeBootstrap`

* refactor protocols for new API

* add tests for `getPeers`

* skip getPeers test

* rm: only from test

* move tests to `base_protocol.spec.ts`

* break down `getPeers` into a `filter` method

* return all bootstrap peers if arg is 0

* refactor test without stubbing

* address comments

* update test title

* move `filterPeers` to a separate file

* address comments & add more test

* make test title more verbose

* address comments

* remove ProtocolOptions

* chore: refactor tests for new API

* add defaults for getPeers

* address comments

* rm unneeded comment

* address comment: add diversity of node tags to test

* address comments

* fix: imports
2023-09-07 13:15:49 +05:30

86 lines
2.2 KiB
TypeScript

import type {
IAsyncIterator,
IDecodedMessage,
IDecoder,
IReceiver,
Unsubscribe
} from "@waku/interfaces";
/**
* Options for configuring the behavior of an iterator.
*
* @property timeoutMs - Optional timeout in milliseconds. If specified, the iterator will terminate after this time period.
* @property iteratorDelay - Optional delay in milliseconds between each iteration. Can be used to control the rate of iteration.
*/
export type IteratorOptions = {
timeoutMs?: number;
iteratorDelay?: number;
};
const FRAME_RATE = 60;
/**
* Function that transforms IReceiver subscription to iterable stream of data.
* @param receiver - object that allows to be subscribed to;
* @param decoder - parameter to be passed to receiver for subscription;
* @param options - options for receiver for subscription;
* @param iteratorOptions - optional configuration for iterator;
* @returns iterator and stop function to terminate it.
*/
export async function toAsyncIterator<T extends IDecodedMessage>(
receiver: IReceiver,
decoder: IDecoder<T> | IDecoder<T>[],
iteratorOptions?: IteratorOptions
): Promise<IAsyncIterator<T>> {
const iteratorDelay = iteratorOptions?.iteratorDelay ?? FRAME_RATE;
const messages: T[] = [];
let unsubscribe: undefined | Unsubscribe;
unsubscribe = await receiver.subscribe(decoder, (message: T) => {
messages.push(message);
});
const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
const timeoutMs = iteratorOptions?.timeoutMs ?? 0;
const startTime = Date.now();
async function* iterator(): AsyncIterator<T> {
while (true) {
if (isWithTimeout && Date.now() - startTime >= timeoutMs) {
return;
}
await wait(iteratorDelay);
const message = messages.shift() as T;
if (!unsubscribe && messages.length === 0) {
return message;
}
if (!message && unsubscribe) {
continue;
}
yield message;
}
}
return {
iterator: iterator(),
async stop() {
if (unsubscribe) {
await unsubscribe();
unsubscribe = undefined;
}
}
};
}
function wait(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}