chore: split FilterSDK into SubscriptionManager as a separete class

This commit is contained in:
Danish Arora 2024-09-10 14:33:22 +05:30
parent a2c5dba32d
commit 365c08403e
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
4 changed files with 325 additions and 316 deletions

View File

@ -15,7 +15,7 @@ export {
createLibp2pAndUpdateOptions
} from "./create/index.js";
export { wakuLightPush } from "./protocols/light_push.js";
export { wakuFilter } from "./protocols/filter.js";
export { wakuFilter } from "./protocols/filter/index.js";
export { wakuStore } from "./protocols/store.js";
export * as waku from "@waku/core";

View File

@ -0,0 +1,305 @@
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
type CreateSubscriptionResult,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type Libp2p,
NetworkConfig,
type ProtocolCreateOptions,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
} from "@waku/interfaces";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { BaseProtocolSDK } from "../base_protocol";
import { SubscriptionManager } from "./subscription_manager";
const log = new Logger("sdk:filter");
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;
private activeSubscriptions = new Map<string, SubscriptionManager>();
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
libp2p
),
connectionManager,
{ numPeersToUse: options?.numPeersToUse }
);
this.protocol = this.core as FilterCore;
this._connectionManager = connectionManager;
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}
log.info(
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(
pubsubTopic,
this.protocol,
this._connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);
return {
error: null,
subscription
};
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (uniquePubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}
if (uniquePubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}
export function wakuFilter(
connectionManager: ConnectionManager,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
}

View File

@ -1,46 +1,24 @@
import type { Peer } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager, FilterCore } from "@waku/core";
import {
type Callback,
type ContentTopic,
type CoreProtocolResult,
type CreateSubscriptionResult,
Callback,
ContentTopic,
CoreProtocolResult,
EConnectionStateEvents,
type IAsyncIterator,
type IDecodedMessage,
type IDecoder,
type IFilterSDK,
type IProtoMessage,
type ISubscriptionSDK,
type Libp2p,
NetworkConfig,
type PeerIdStr,
type ProtocolCreateOptions,
IDecodedMessage,
IDecoder,
IProtoMessage,
ISubscriptionSDK,
PeerIdStr,
ProtocolError,
type ProtocolUseOptions,
type PubsubTopic,
type SDKProtocolResult,
type SubscribeOptions,
SubscribeResult,
type Unsubscribe
PubsubTopic,
SDKProtocolResult,
SubscribeOptions,
SubscriptionCallback
} from "@waku/interfaces";
import { messageHashStr } from "@waku/message-hash";
import { WakuMessage } from "@waku/proto";
import {
ensurePubsubTopicIsConfigured,
groupByContentTopic,
Logger,
shardInfoToPubsubTopics,
toAsyncIterator
} from "@waku/utils";
import { BaseProtocolSDK } from "./base_protocol.js";
type SubscriptionCallback<T extends IDecodedMessage> = {
decoders: IDecoder<T>[];
callback: Callback<T>;
};
import { groupByContentTopic, Logger } from "@waku/utils";
type ReceivedMessageHashes = {
all: Set<string>;
@ -49,11 +27,11 @@ type ReceivedMessageHashes = {
};
};
const log = new Logger("sdk:filter");
export const DEFAULT_MAX_PINGS = 2;
export const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
export const DEFAULT_KEEP_ALIVE = 60 * 1000;
const DEFAULT_MAX_PINGS = 2;
const DEFAULT_MAX_MISSED_MESSAGES_THRESHOLD = 3;
const DEFAULT_KEEP_ALIVE = 60 * 1000;
const log = new Logger("sdk:filter:subscription_manager");
export class SubscriptionManager implements ISubscriptionSDK {
private subscriptionCallbacks: Map<
@ -460,280 +438,6 @@ export class SubscriptionManager implements ISubscriptionSDK {
}
}
class FilterSDK extends BaseProtocolSDK implements IFilterSDK {
public readonly protocol: FilterCore;
private readonly _connectionManager: ConnectionManager;
private activeSubscriptions = new Map<string, SubscriptionManager>();
public constructor(
connectionManager: ConnectionManager,
libp2p: Libp2p,
options?: ProtocolCreateOptions
) {
super(
new FilterCore(
async (pubsubTopic, wakuMessage, peerIdStr) => {
const subscription = this.getActiveSubscription(pubsubTopic);
if (!subscription) {
log.error(
`No subscription locally registered for topic ${pubsubTopic}`
);
return;
}
await subscription.processIncomingMessage(wakuMessage, peerIdStr);
},
connectionManager.configuredPubsubTopics,
libp2p
),
connectionManager,
{ numPeersToUse: options?.numPeersToUse }
);
this.protocol = this.core as FilterCore;
this._connectionManager = connectionManager;
}
/**
* Opens a subscription with the Filter protocol using the provided decoders and callback.
* This method combines the functionality of creating a subscription and subscribing to it.
*
* @param {IDecoder<T> | IDecoder<T>[]} decoders - A single decoder or an array of decoders to use for decoding messages.
* @param {Callback<T>} callback - The callback function to be invoked with decoded messages.
* @param {ProtocolUseOptions} [protocolUseOptions] - Optional settings for using the protocol.
* @param {SubscribeOptions} [subscribeOptions] - Options for the subscription.
*
* @returns {Promise<SubscribeResult>} A promise that resolves to an object containing:
* - subscription: The created subscription object if successful, or null if failed.
* - error: A ProtocolError if the subscription creation failed, or null if successful.
* - results: An object containing arrays of failures and successes from the subscription process.
* Only present if the subscription was created successfully.
*
* @throws {Error} If there's an unexpected error during the subscription process.
*
* @remarks
* This method attempts to create a subscription using the pubsub topic derived from the provided decoders,
* then tries to subscribe using the created subscription. The return value should be interpreted as follows:
* - If `subscription` is null and `error` is non-null, a critical error occurred and the subscription failed completely.
* - If `subscription` is non-null and `error` is null, the subscription was created successfully.
* In this case, check the `results` field for detailed information about successes and failures during the subscription process.
* - Even if the subscription was created successfully, there might be some failures in the results.
*
* @example
* ```typescript
* const {subscription, error, results} = await waku.filter.subscribe(decoders, callback);
* if (!subscription || error) {
* console.error("Failed to create subscription:", error);
* }
* console.log("Subscription created successfully");
* if (results.failures.length > 0) {
* console.warn("Some errors occurred during subscription:", results.failures);
* }
* console.log("Successful subscriptions:", results.successes);
*
* ```
*/
public async subscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
protocolUseOptions?: ProtocolUseOptions,
subscribeOptions?: SubscribeOptions
): Promise<SubscribeResult> {
const uniquePubsubTopics = this.getUniquePubsubTopics(decoders);
if (uniquePubsubTopics.length !== 1) {
return {
subscription: null,
error: ProtocolError.INVALID_DECODER_TOPICS,
results: null
};
}
const pubsubTopic = uniquePubsubTopics[0];
const { subscription, error } = await this.createSubscription(
pubsubTopic,
protocolUseOptions
);
if (error) {
return {
subscription: null,
error: error,
results: null
};
}
const { failures, successes } = await subscription.subscribe(
decoders,
callback,
subscribeOptions
);
return {
subscription,
error: null,
results: {
failures: failures,
successes: successes
}
};
}
/**
* Creates a new subscription to the given pubsub topic.
* The subscription is made to multiple peers for decentralization.
* @param pubsubTopicShardInfo The pubsub topic to subscribe to.
* @returns The subscription object.
*/
private async createSubscription(
pubsubTopicShardInfo: NetworkConfig | PubsubTopic,
options?: ProtocolUseOptions
): Promise<CreateSubscriptionResult> {
options = {
autoRetry: true,
...options
} as ProtocolUseOptions;
const pubsubTopic =
typeof pubsubTopicShardInfo == "string"
? pubsubTopicShardInfo
: shardInfoToPubsubTopics(pubsubTopicShardInfo)?.[0];
ensurePubsubTopicIsConfigured(pubsubTopic, this.protocol.pubsubTopics);
const hasPeers = await this.hasPeers(options);
if (!hasPeers) {
return {
error: ProtocolError.NO_PEER_AVAILABLE,
subscription: null
};
}
log.info(
`Creating filter subscription with ${this.connectedPeers.length} peers: `,
this.connectedPeers.map((peer) => peer.id.toString())
);
const subscription =
this.getActiveSubscription(pubsubTopic) ??
this.setActiveSubscription(
pubsubTopic,
new SubscriptionManager(
pubsubTopic,
this.protocol,
this._connectionManager,
() => this.connectedPeers,
this.renewPeer.bind(this)
)
);
return {
error: null,
subscription
};
}
/**
* This method is used to satisfy the `IReceiver` interface.
*
* @hidden
*
* @param decoders The decoders to use for the subscription.
* @param callback The callback function to use for the subscription.
* @param opts Optional protocol options for the subscription.
*
* @returns A Promise that resolves to a function that unsubscribes from the subscription.
*
* @remarks
* This method should not be used directly.
* Instead, use `createSubscription` to create a new subscription.
*/
public async subscribeWithUnsubscribe<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[],
callback: Callback<T>,
options?: SubscribeOptions
): Promise<Unsubscribe> {
const uniquePubsubTopics = this.getUniquePubsubTopics<T>(decoders);
if (uniquePubsubTopics.length === 0) {
throw Error(
"Failed to subscribe: no pubsubTopic found on decoders provided."
);
}
if (uniquePubsubTopics.length > 1) {
throw Error(
"Failed to subscribe: all decoders should have the same pubsub topic. Use createSubscription to be more agile."
);
}
const { subscription, error } = await this.createSubscription(
uniquePubsubTopics[0]
);
if (error) {
throw Error(`Failed to create subscription: ${error}`);
}
await subscription.subscribe(decoders, callback, options);
const contentTopics = Array.from(
groupByContentTopic(
Array.isArray(decoders) ? decoders : [decoders]
).keys()
);
return async () => {
await subscription.unsubscribe(contentTopics);
};
}
public toSubscriptionIterator<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): Promise<IAsyncIterator<T>> {
return toAsyncIterator(this, decoders);
}
//TODO: move to SubscriptionManager
private getActiveSubscription(
pubsubTopic: PubsubTopic
): SubscriptionManager | undefined {
return this.activeSubscriptions.get(pubsubTopic);
}
private setActiveSubscription(
pubsubTopic: PubsubTopic,
subscription: SubscriptionManager
): SubscriptionManager {
this.activeSubscriptions.set(pubsubTopic, subscription);
return subscription;
}
private getUniquePubsubTopics<T extends IDecodedMessage>(
decoders: IDecoder<T> | IDecoder<T>[]
): string[] {
if (!Array.isArray(decoders)) {
return [decoders.pubsubTopic];
}
if (decoders.length === 0) {
return [];
}
const pubsubTopics = new Set(decoders.map((d) => d.pubsubTopic));
return [...pubsubTopics];
}
}
export function wakuFilter(
connectionManager: ConnectionManager,
init?: ProtocolCreateOptions
): (libp2p: Libp2p) => IFilterSDK {
return (libp2p: Libp2p) => new FilterSDK(connectionManager, libp2p, init);
}
async function pushMessage<T extends IDecodedMessage>(
subscriptionCallback: SubscriptionCallback<T>,
pubsubTopic: PubsubTopic,

View File

@ -16,7 +16,7 @@ import type {
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { wakuFilter } from "./protocols/filter.js";
import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/light_push.js";
import { wakuStore } from "./protocols/store.js";