From 854755f9bc49157e17a1aff6719fc3fd50869242 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 29 Jul 2024 16:33:27 +0530 Subject: [PATCH] chore: remove `subscribe()` dependency https://github.com/waku-org/js-waku/issues/1979#issuecomment-2090743620 --- packages/interfaces/src/receiver.ts | 12 +-- packages/sdk/src/protocols/filter.ts | 75 +------------------ .../getUniquePubsubTopicsFromDecoders.ts | 17 +++++ packages/utils/src/common/index.ts | 1 + .../utils/src/common/to_async_iterator.ts | 30 +++++--- 5 files changed, 40 insertions(+), 95 deletions(-) create mode 100644 packages/utils/src/common/getUniquePubsubTopicsFromDecoders.ts diff --git a/packages/interfaces/src/receiver.ts b/packages/interfaces/src/receiver.ts index f329460b4e..b2571d3ec6 100644 --- a/packages/interfaces/src/receiver.ts +++ b/packages/interfaces/src/receiver.ts @@ -1,11 +1,5 @@ import type { IDecodedMessage, IDecoder } from "./message.js"; -import type { - ContentTopic, - IAsyncIterator, - PubsubTopic, - Unsubscribe -} from "./misc.js"; -import type { Callback } from "./protocols.js"; +import type { ContentTopic, IAsyncIterator, PubsubTopic } from "./misc.js"; export type ActiveSubscriptions = Map; @@ -13,8 +7,4 @@ export interface IReceiver { toSubscriptionIterator: ( decoders: IDecoder | IDecoder[] ) => Promise>; - subscribe: ( - decoders: IDecoder | IDecoder[], - callback: Callback - ) => Unsubscribe | Promise; } diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index c40661a728..c5e67261a1 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -20,8 +20,7 @@ import { type PubsubTopic, type SDKProtocolResult, type ShardingParams, - type SubscribeOptions, - type Unsubscribe + type SubscribeOptions } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; @@ -516,83 +515,11 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { }; } - //TODO: remove this dependency on IReceiver - /** - * This method is used to satisfy the `IReceiver` interface. - * - * @hidden - * - * @param decoders The decoders to use for the subscription. - * @param callback The callback function to use for the subscription. - * @param opts Optional protocol options for the subscription. - * - * @returns A Promise that resolves to a function that unsubscribes from the subscription. - * - * @remarks - * This method should not be used directly. - * Instead, use `createSubscription` to create a new subscription. - */ - public async subscribe( - decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS - ): Promise { - const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); - - if (uniquePubsubTopics.length === 0) { - throw Error( - "Failed to subscribe: no pubsubTopic found on decoders provided." - ); - } - - if (uniquePubsubTopics.length > 1) { - throw Error( - "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." - ); - } - - const { subscription, error } = await this.createSubscription( - uniquePubsubTopics[0] - ); - - if (error) { - throw Error(`Failed to create subscription: ${error}`); - } - - await subscription.subscribe(decoders, callback, options); - - const contentTopics = Array.from( - groupByContentTopic( - Array.isArray(decoders) ? decoders : [decoders] - ).keys() - ); - - return async () => { - await subscription.unsubscribe(contentTopics); - }; - } - public toSubscriptionIterator( decoders: IDecoder | IDecoder[] ): Promise> { return toAsyncIterator(this, decoders); } - - private getUniquePubsubTopics( - decoders: IDecoder | IDecoder[] - ): string[] { - if (!Array.isArray(decoders)) { - return [decoders.pubsubTopic]; - } - - if (decoders.length === 0) { - return []; - } - - const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); - - return [...pubsubTopics]; - } } export function wakuFilter( diff --git a/packages/utils/src/common/getUniquePubsubTopicsFromDecoders.ts b/packages/utils/src/common/getUniquePubsubTopicsFromDecoders.ts new file mode 100644 index 0000000000..6daa21bf28 --- /dev/null +++ b/packages/utils/src/common/getUniquePubsubTopicsFromDecoders.ts @@ -0,0 +1,17 @@ +import { IDecodedMessage, IDecoder } from "@waku/interfaces"; + +export function getUniquePubsubTopicsFromDecoders( + decoders: IDecoder | IDecoder[] +): string[] { + if (!Array.isArray(decoders)) { + return [decoders.pubsubTopic]; + } + + if (decoders.length === 0) { + return []; + } + + const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); + + return [...pubsubTopics]; +} diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index 025a06f03c..f6fd13a7e1 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -8,6 +8,7 @@ export * from "./sharding.js"; export * from "./push_or_init_map.js"; export * from "./relay_shard_codec.js"; export * from "./delay.js"; +export * from "./getUniquePubsubTopicsFromDecoders.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); diff --git a/packages/utils/src/common/to_async_iterator.ts b/packages/utils/src/common/to_async_iterator.ts index 489ef234ef..9c3e1e2d33 100644 --- a/packages/utils/src/common/to_async_iterator.ts +++ b/packages/utils/src/common/to_async_iterator.ts @@ -2,10 +2,13 @@ import type { IAsyncIterator, IDecodedMessage, IDecoder, - IReceiver, + IFilterSDK, Unsubscribe } from "@waku/interfaces"; +import { delay } from "./delay.js"; +import { getUniquePubsubTopicsFromDecoders } from "./getUniquePubsubTopicsFromDecoders.js"; + /** * Options for configuring the behavior of an iterator. * @@ -28,7 +31,7 @@ const FRAME_RATE = 60; * @returns iterator and stop function to terminate it. */ export async function toAsyncIterator( - receiver: IReceiver, + filter: IFilterSDK, decoder: IDecoder | IDecoder[], iteratorOptions?: IteratorOptions ): Promise> { @@ -36,11 +39,24 @@ export async function toAsyncIterator( const messages: T[] = []; + const uniquePubsubTopics = getUniquePubsubTopicsFromDecoders(decoder); + const { error, subscription } = await filter.createSubscription( + uniquePubsubTopics[0] + ); + + if (error) { + throw new Error(`Error creating subscription: ${error}`); + } + let unsubscribe: undefined | Unsubscribe; - unsubscribe = await receiver.subscribe(decoder, (message: T) => { + const { failures } = await subscription.subscribe(decoder, (message: T) => { messages.push(message); }); + if (failures.length > 0) { + throw new Error(`Error subscribing to topics: ${failures}`); + } + const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs); const timeoutMs = iteratorOptions?.timeoutMs ?? 0; const startTime = Date.now(); @@ -51,7 +67,7 @@ export async function toAsyncIterator( return; } - await wait(iteratorDelay); + await delay(iteratorDelay); const message = messages.shift() as T; @@ -77,9 +93,3 @@ export async function toAsyncIterator( } }; } - -function wait(ms: number): Promise { - return new Promise((resolve) => { - setTimeout(resolve, ms); - }); -}