chore: remove subscribe() dependency

https://github.com/waku-org/js-waku/issues/1979#issuecomment-2090743620
This commit is contained in:
danisharora099 2024-07-29 16:33:27 +05:30
parent d464af3645
commit 854755f9bc
No known key found for this signature in database
GPG Key ID: FBD2BF500037F135
5 changed files with 40 additions and 95 deletions

View File

@ -1,11 +1,5 @@
import type { IDecodedMessage, IDecoder } from "./message.js";
import type {
ContentTopic,
IAsyncIterator,
PubsubTopic,
Unsubscribe
} from "./misc.js";
import type { Callback } from "./protocols.js";
import type { ContentTopic, IAsyncIterator, PubsubTopic } from "./misc.js";
export type ActiveSubscriptions = Map<PubsubTopic, ContentTopic[]>;
@ -13,8 +7,4 @@ export interface IReceiver {
toSubscriptionIterator: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<IAsyncIterator<T>>;
subscribe: <T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>
) => Unsubscribe | Promise<Unsubscribe>;
}

View File

@ -20,8 +20,7 @@ import {
type PubsubTopic,
type SDKProtocolResult,
type ShardingParams,
type SubscribeOptions,
type Unsubscribe
type SubscribeOptions
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
@ -516,83 +515,11 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
};
}
//TODO: remove this dependency on IReceiver
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (uniquePubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}
if (uniquePubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}
export function wakuFilter(

View File

@ -0,0 +1,17 @@
import { IDecodedMessage, IDecoder } from "@waku/interfaces";
export function getUniquePubsubTopicsFromDecoders<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}

View File

@ -8,6 +8,7 @@ export * from "./sharding.js";
export * from "./push_or_init_map.js";
export * from "./relay_shard_codec.js";
export * from "./delay.js";
export * from "./getUniquePubsubTopicsFromDecoders.js";
export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] {
const index = arr.indexOf(value);

View File

@ -2,10 +2,13 @@ import type {
IAsyncIterator,
IDecodedMessage,
IDecoder,
IReceiver,
IFilterSDK,
Unsubscribe
} from "@waku/interfaces";
import { delay } from "./delay.js";
import { getUniquePubsubTopicsFromDecoders } from "./getUniquePubsubTopicsFromDecoders.js";
/**
* Options for configuring the behavior of an iterator.
*
@ -28,7 +31,7 @@ const FRAME_RATE = 60;
* @returns iterator and stop function to terminate it.
*/
export async function toAsyncIterator<T extends IDecodedMessage>(
receiver: IReceiver,
filter: IFilterSDK,
decoder: IDecoder<T> | IDecoder<T>[],
iteratorOptions?: IteratorOptions
): Promise<IAsyncIterator<T>> {
@ -36,11 +39,24 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
const messages: T[] = [];
const uniquePubsubTopics = getUniquePubsubTopicsFromDecoders(decoder);
const { error, subscription } = await filter.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw new Error(`Error creating subscription: ${error}`);
}
let unsubscribe: undefined | Unsubscribe;
unsubscribe = await receiver.subscribe(decoder, (message: T) => {
const { failures } = await subscription.subscribe(decoder, (message: T) => {
messages.push(message);
});
if (failures.length > 0) {
throw new Error(`Error subscribing to topics: ${failures}`);
}
const isWithTimeout = Number.isInteger(iteratorOptions?.timeoutMs);
const timeoutMs = iteratorOptions?.timeoutMs ?? 0;
const startTime = Date.now();
@ -51,7 +67,7 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
return;
}
await wait(iteratorDelay);
await delay(iteratorDelay);
const message = messages.shift() as T;
@ -77,9 +93,3 @@ export async function toAsyncIterator<T extends IDecodedMessage>(
}
};
}
function wait(ms: number): Promise<void> {
return new Promise((resolve) => {
setTimeout(resolve, ms);
});
}