From 634cdbebc49c41b381bd3a7db64b87144e2b4384 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Mon, 29 Jul 2024 15:44:11 +0530 Subject: [PATCH] chore: move SubscriptionManager to a separate file --- packages/sdk/src/index.ts | 2 +- packages/sdk/src/protocols/filter/index.ts | 222 ++++++++++++++++++ .../subscription_manager.ts} | 140 +---------- packages/sdk/src/waku.ts | 2 +- 4 files changed, 228 insertions(+), 138 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/index.ts rename packages/sdk/src/protocols/{filter.ts => filter/subscription_manager.ts} (77%) diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2bbdfc0d30..fdff4f4a8a 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -11,7 +11,7 @@ export * from "./waku.js"; export { createLightNode, defaultLibp2p } from "./create/index.js"; export { wakuLightPush } from "./protocols/light_push.js"; -export { wakuFilter } from "./protocols/filter.js"; +export { wakuFilter } from "./protocols/filter/index.js"; export { wakuStore } from "./protocols/store.js"; export * as waku from "@waku/core"; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts new file mode 100644 index 0000000000..c52a50197a --- /dev/null +++ b/packages/sdk/src/protocols/filter/index.ts @@ -0,0 +1,222 @@ +import { ConnectionManager, FilterCore } from "@waku/core"; +import { + type Callback, + type CreateSubscriptionResult, + type IAsyncIterator, + type IDecodedMessage, + type IDecoder, + type IFilterSDK, + type Libp2p, + type ProtocolCreateOptions, + ProtocolError, + type ProtocolUseOptions, + type PubsubTopic, + type ShardingParams, + type SubscribeOptions, + type Unsubscribe +} from "@waku/interfaces"; +import { + ensurePubsubTopicIsConfigured, + groupByContentTopic, + Logger, + shardInfoToPubsubTopics, + toAsyncIterator +} from "@waku/utils"; + +import { BaseProtocolSDK } from "../base_protocol.js"; + +import { + DEFAULT_SUBSCRIBE_OPTIONS, + SubscriptionManager +} from "./subscription_manager.js"; + +const log = new Logger("sdk:filter"); + +class FilterSDK extends BaseProtocolSDK implements IFilterSDK { + public readonly protocol: FilterCore; + + private activeSubscriptions = new Map(); + + public constructor( + connectionManager: ConnectionManager, + libp2p: Libp2p, + options?: ProtocolCreateOptions + ) { + super( + new FilterCore( + async (pubsubTopic, wakuMessage, peerIdStr) => { + const subscription = this.getActiveSubscription(pubsubTopic); + if (!subscription) { + log.error( + `No subscription locally registered for topic ${pubsubTopic}` + ); + return; + } + + await subscription.processIncomingMessage(wakuMessage, peerIdStr); + }, + libp2p, + options + ), + connectionManager, + { numPeersToUse: options?.numPeersToUse } + ); + + this.protocol = this.core as FilterCore; + + this.activeSubscriptions = new Map(); + } + + //TODO: move to SubscriptionManager + private getActiveSubscription( + pubsubTopic: PubsubTopic + ): SubscriptionManager | undefined { + return this.activeSubscriptions.get(pubsubTopic); + } + + private setActiveSubscription( + pubsubTopic: PubsubTopic, + subscription: SubscriptionManager + ): SubscriptionManager { + this.activeSubscriptions.set(pubsubTopic, subscription); + return subscription; + } + + /** + * Creates a new subscription to the given pubsub topic. + * The subscription is made to multiple peers for decentralization. + * @param pubsubTopicShardInfo The pubsub topic to subscribe to. + * @returns The subscription object. + */ + public async createSubscription( + pubsubTopicShardInfo: ShardingParams | PubsubTopic, + options?: ProtocolUseOptions + ): Promise { + options = { + autoRetry: true, + ...options + } as ProtocolUseOptions; + + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; + + ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); + + const hasPeers = await this.hasPeers(options); + if (!hasPeers) { + return { + error: ProtocolError.NO_PEER_AVAILABLE, + subscription: null + }; + } + + log.info( + `Creating filter subscription with ${this.connectedPeers.length} peers: `, + this.connectedPeers.map((peer) => peer.id.toString()) + ); + + const subscription = + this.getActiveSubscription(pubsubTopic) ?? + this.setActiveSubscription( + pubsubTopic, + new SubscriptionManager( + pubsubTopic, + this.protocol, + () => this.connectedPeers, + this.renewPeer.bind(this) + ) + ); + + return { + error: null, + subscription + }; + } + + //TODO: remove this dependency on IReceiver + /** + * This method is used to satisfy the `IReceiver` interface. + * + * @hidden + * + * @param decoders The decoders to use for the subscription. + * @param callback The callback function to use for the subscription. + * @param opts Optional protocol options for the subscription. + * + * @returns A Promise that resolves to a function that unsubscribes from the subscription. + * + * @remarks + * This method should not be used directly. + * Instead, use `createSubscription` to create a new subscription. + */ + public async subscribe( + decoders: IDecoder | IDecoder[], + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + ): Promise { + const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); + + if (uniquePubsubTopics.length === 0) { + throw Error( + "Failed to subscribe: no pubsubTopic found on decoders provided." + ); + } + + if (uniquePubsubTopics.length > 1) { + throw Error( + "Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile." + ); + } + + const { subscription, error } = await this.createSubscription( + uniquePubsubTopics[0] + ); + + if (error) { + throw Error(`Failed to create subscription: ${error}`); + } + + await subscription.subscribe(decoders, callback, options); + + const contentTopics = Array.from( + groupByContentTopic( + Array.isArray(decoders) ? decoders : [decoders] + ).keys() + ); + + return async () => { + await subscription.unsubscribe(contentTopics); + }; + } + + public toSubscriptionIterator( + decoders: IDecoder | IDecoder[] + ): Promise> { + return toAsyncIterator(this, decoders); + } + + private getUniquePubsubTopics( + decoders: IDecoder | IDecoder[] + ): string[] { + if (!Array.isArray(decoders)) { + return [decoders.pubsubTopic]; + } + + if (decoders.length === 0) { + return []; + } + + const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic)); + + return [...pubsubTopics]; + } +} + +export function wakuFilter( + connectionManager: ConnectionManager, + init?: ProtocolCreateOptions +): (libp2p: Libp2p) => IFilterSDK { + return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); +} diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts similarity index 77% rename from packages/sdk/src/protocols/filter.ts rename to packages/sdk/src/protocols/filter/subscription_manager.ts index c5e67261a1..1f79c22142 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -1,38 +1,23 @@ import type { Peer } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, FilterCore } from "@waku/core"; +import { FilterCore } from "@waku/core"; import { type Callback, type ContentTopic, type CoreProtocolResult, - type CreateSubscriptionResult, - type IAsyncIterator, type IDecodedMessage, type IDecoder, - type IFilterSDK, type IProtoMessage, type ISubscriptionSDK, - type Libp2p, type PeerIdStr, - type ProtocolCreateOptions, ProtocolError, - type ProtocolUseOptions, type PubsubTopic, type SDKProtocolResult, - type ShardingParams, type SubscribeOptions } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; import { WakuMessage } from "@waku/proto"; -import { - ensurePubsubTopicIsConfigured, - groupByContentTopic, - Logger, - shardInfoToPubsubTopics, - toAsyncIterator -} from "@waku/utils"; - -import { BaseProtocolSDK } from "./base_protocol.js"; +import { groupByContentTopic, Logger } from "@waku/utils"; type SubscriptionCallback = { decoders: IDecoder[]; @@ -46,13 +31,13 @@ type ReceivedMessageHashes = { }; }; -const log = new Logger("sdk:filter"); +const log = new Logger("sdk:filter:subscription_manager"); const DEFAULT_MAX_PINGS = 3; const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3; const DEFAULT_KEEP_ALIVE = 30 * 1000; -const DEFAULT_SUBSCRIBE_OPTIONS = { +export const DEFAULT_SUBSCRIBE_OPTIONS = { keepAlive: DEFAULT_KEEP_ALIVE }; export class SubscriptionManager implements ISubscriptionSDK { @@ -412,123 +397,6 @@ export class SubscriptionManager implements ISubscriptionSDK { } } -class FilterSDK extends BaseProtocolSDK implements IFilterSDK { - public readonly protocol: FilterCore; - - private activeSubscriptions = new Map(); - - public constructor( - connectionManager: ConnectionManager, - libp2p: Libp2p, - options?: ProtocolCreateOptions - ) { - super( - new FilterCore( - async (pubsubTopic, wakuMessage, peerIdStr) => { - const subscription = this.getActiveSubscription(pubsubTopic); - if (!subscription) { - log.error( - `No subscription locally registered for topic ${pubsubTopic}` - ); - return; - } - - await subscription.processIncomingMessage(wakuMessage, peerIdStr); - }, - libp2p, - options - ), - connectionManager, - { numPeersToUse: options?.numPeersToUse } - ); - - this.protocol = this.core as FilterCore; - - this.activeSubscriptions = new Map(); - } - - //TODO: move to SubscriptionManager - private getActiveSubscription( - pubsubTopic: PubsubTopic - ): SubscriptionManager | undefined { - return this.activeSubscriptions.get(pubsubTopic); - } - - private setActiveSubscription( - pubsubTopic: PubsubTopic, - subscription: SubscriptionManager - ): SubscriptionManager { - this.activeSubscriptions.set(pubsubTopic, subscription); - return subscription; - } - - /** - * Creates a new subscription to the given pubsub topic. - * The subscription is made to multiple peers for decentralization. - * @param pubsubTopicShardInfo The pubsub topic to subscribe to. - * @returns The subscription object. - */ - public async createSubscription( - pubsubTopicShardInfo: ShardingParams | PubsubTopic, - options?: ProtocolUseOptions - ): Promise { - options = { - autoRetry: true, - ...options - } as ProtocolUseOptions; - - const pubsubTopic = - typeof pubsubTopicShardInfo == "string" - ? pubsubTopicShardInfo - : shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0]; - - ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics); - - const hasPeers = await this.hasPeers(options); - if (!hasPeers) { - return { - error: ProtocolError.NO_PEER_AVAILABLE, - subscription: null - }; - } - - log.info( - `Creating filter subscription with ${this.connectedPeers.length} peers: `, - this.connectedPeers.map((peer) => peer.id.toString()) - ); - - const subscription = - this.getActiveSubscription(pubsubTopic) ?? - this.setActiveSubscription( - pubsubTopic, - new SubscriptionManager( - pubsubTopic, - this.protocol, - () => this.connectedPeers, - this.renewPeer.bind(this) - ) - ); - - return { - error: null, - subscription - }; - } - - public toSubscriptionIterator( - decoders: IDecoder | IDecoder[] - ): Promise> { - return toAsyncIterator(this, decoders); - } -} - -export function wakuFilter( - connectionManager: ConnectionManager, - init?: ProtocolCreateOptions -): (libp2p: Libp2p) => IFilterSDK { - return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init); -} - async function pushMessage( subscriptionCallback: SubscriptionCallback, pubsubTopic: PubsubTopic, diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index 95977419f4..f6a025490e 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -17,7 +17,7 @@ import { Protocols } from "@waku/interfaces"; import { wakuRelay } from "@waku/relay"; import { Logger } from "@waku/utils"; -import { wakuFilter } from "./protocols/filter.js"; +import { wakuFilter } from "./protocols/filter/index.js"; import { wakuLightPush } from "./protocols/light_push.js"; import { wakuStore } from "./protocols/store.js";