From 09127dfc9187bfc15dcd35330553de0de602e093 Mon Sep 17 00:00:00 2001 From: Sasha Date: Sun, 27 Oct 2024 18:13:26 +0100 Subject: [PATCH] move subscribe options to createLightNode Fitler protocol options --- packages/interfaces/src/filter.ts | 32 ++++++++++++++----- packages/interfaces/src/protocols.ts | 10 +++++- .../sdk/src/protocols/filter/constants.ts | 6 +--- packages/sdk/src/protocols/filter/index.ts | 27 ++++++++-------- .../protocols/filter/subscription_manager.ts | 26 ++++++--------- packages/sdk/src/protocols/filter/utils.ts | 15 +++++++++ packages/sdk/src/waku/waku.ts | 3 +- 7 files changed, 74 insertions(+), 45 deletions(-) create mode 100644 packages/sdk/src/protocols/filter/utils.ts diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index f992a497a7..7d5168e27b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -15,17 +15,34 @@ export type SubscriptionCallback = { callback: Callback; }; -export type SubscribeOptions = { - keepAlive?: number; - pingsBeforePeerRenewed?: number; - enableLightPushFilterCheck?: boolean; +export type FilterProtocolOptions = { + /** + * Interval with which Filter subscription will attempt to send ping requests to subscribed peers. + * + * @default 60_000 + */ + keepAliveIntervalMs: number; + + /** + * Number of failed pings allowed to make to a remote peer before attempting to subscribe to a new one. + * + * @default 3 + */ + pingsBeforePeerRenewed: number; + + /** + * Enables js-waku to send probe LightPush message over subscribed pubsubTopics on created subscription. + * In case message won't be received back through Filter - js-waku will attempt to subscribe to another peer. + * + * @default false + */ + enableLightPushFilterCheck: boolean; }; export interface ISubscription { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options?: SubscribeOptions + callback: Callback ): Promise; unsubscribe(contentTopics: ContentTopic[]): Promise; @@ -38,8 +55,7 @@ export interface ISubscription { export type IFilter = IReceiver & { protocol: IBaseProtocolCore } & { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions?: SubscribeOptions + callback: Callback ): Promise; }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 20cdcc77ee..b386e1d4fb 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -2,6 +2,7 @@ import type { Libp2p } from "@libp2p/interface"; import type { PeerId } from "@libp2p/interface"; import type { ConnectionManagerOptions } from "./connection_manager.js"; +import type { FilterProtocolOptions } from "./filter.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; import { ThisAndThat, ThisOrThat } from "./misc.js"; @@ -86,9 +87,16 @@ export type ProtocolCreateOptions = { bootstrapPeers?: string[]; /** - * Configuration for connection manager. If not specified - default values are applied. + * Configuration for connection manager. + * If not specified - default values are applied. */ connectionManager?: Partial; + + /** + * Configuration for Filter protocol. + * If not specified - default values are applied. + */ + filter?: Partial; }; export type Callback = ( diff --git a/packages/sdk/src/protocols/filter/constants.ts b/packages/sdk/src/protocols/filter/constants.ts index 9477a7e417..90ec3794b6 100644 --- a/packages/sdk/src/protocols/filter/constants.ts +++ b/packages/sdk/src/protocols/filter/constants.ts @@ -1,8 +1,4 @@ export const DEFAULT_KEEP_ALIVE = 60_000; +export const DEFAULT_MAX_PINGS = 3; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK = false; export const DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL = 10_000; - -export const DEFAULT_SUBSCRIBE_OPTIONS = { - keepAlive: DEFAULT_KEEP_ALIVE, - enableLightPushFilterCheck: DEFAULT_LIGHT_PUSH_FILTER_CHECK -}; diff --git a/packages/sdk/src/protocols/filter/index.ts b/packages/sdk/src/protocols/filter/index.ts index 1d4cc4e656..f78e4d8ae7 100644 --- a/packages/sdk/src/protocols/filter/index.ts +++ b/packages/sdk/src/protocols/filter/index.ts @@ -2,6 +2,7 @@ import { ConnectionManager, FilterCore } from "@waku/core"; import type { Callback, CreateSubscriptionResult, + FilterProtocolOptions, IAsyncIterator, IDecodedMessage, IDecoder, @@ -10,7 +11,6 @@ import type { IProtoMessage, Libp2p, PubsubTopic, - SubscribeOptions, SubscribeResult, Unsubscribe } from "@waku/interfaces"; @@ -25,15 +25,16 @@ import { import { PeerManager } from "../peer_manager.js"; -import { DEFAULT_SUBSCRIBE_OPTIONS } from "./constants.js"; import { MessageCache } from "./message_cache.js"; import { SubscriptionManager } from "./subscription_manager.js"; +import { buildConfig } from "./utils.js"; const log = new Logger("sdk:filter"); class Filter implements IFilter { public readonly protocol: FilterCore; + private readonly config: FilterProtocolOptions; private readonly messageCache: MessageCache; private activeSubscriptions = new Map(); @@ -41,8 +42,10 @@ class Filter implements IFilter { private connectionManager: ConnectionManager, private libp2p: Libp2p, private peerManager: PeerManager, - private lightPush?: ILightPush + private lightPush?: ILightPush, + config?: Partial ) { + this.config = buildConfig(config); this.messageCache = new MessageCache(libp2p); this.protocol = new FilterCore( @@ -79,7 +82,6 @@ class Filter implements IFilter { * * @param {IDecoder | IDecoder[]} decoders - A single decoder or an array of decoders to use for decoding messages. * @param {Callback} callback - The callback function to be invoked with decoded messages. - * @param {SubscribeOptions} [subscribeOptions=DEFAULT_SUBSCRIBE_OPTIONS] - Options for the subscription. * * @returns {Promise} A promise that resolves to an object containing: * - subscription: The created subscription object if successful, or null if failed. @@ -113,8 +115,7 @@ class Filter implements IFilter { */ public async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - subscribeOptions: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -140,8 +141,7 @@ class Filter implements IFilter { const { failures, successes } = await subscription.subscribe( decoders, - callback, - subscribeOptions + callback ); return { subscription, @@ -192,6 +192,7 @@ class Filter implements IFilter { this.connectionManager, this.peerManager, this.libp2p, + this.config, this.lightPush ) ); @@ -219,8 +220,7 @@ class Filter implements IFilter { */ public async subscribeWithUnsubscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { const uniquePubsubTopics = this.getUniquePubsubTopics(decoders); @@ -244,7 +244,7 @@ class Filter implements IFilter { throw Error(`Failed to create subscription: ${error}`); } - await subscription.subscribe(decoders, callback, options); + await subscription.subscribe(decoders, callback); const contentTopics = Array.from( groupByContentTopic( @@ -298,8 +298,9 @@ class Filter implements IFilter { export function wakuFilter( connectionManager: ConnectionManager, peerManager: PeerManager, - lightPush?: ILightPush + lightPush?: ILightPush, + config?: Partial ): (libp2p: Libp2p) => IFilter { return (libp2p: Libp2p) => - new Filter(connectionManager, libp2p, peerManager, lightPush); + new Filter(connectionManager, libp2p, peerManager, lightPush, config); } diff --git a/packages/sdk/src/protocols/filter/subscription_manager.ts b/packages/sdk/src/protocols/filter/subscription_manager.ts index 59af175121..21bc45a446 100644 --- a/packages/sdk/src/protocols/filter/subscription_manager.ts +++ b/packages/sdk/src/protocols/filter/subscription_manager.ts @@ -12,6 +12,7 @@ import { type ContentTopic, type CoreProtocolResult, EConnectionStateEvents, + FilterProtocolOptions, type IDecodedMessage, type IDecoder, type ILightPush, @@ -22,7 +23,6 @@ import { ProtocolError, type PubsubTopic, type SDKProtocolResult, - type SubscribeOptions, SubscriptionCallback } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; @@ -32,23 +32,17 @@ import { ReliabilityMonitorManager } from "../../reliability_monitor/index.js"; import { ReceiverReliabilityMonitor } from "../../reliability_monitor/receiver.js"; import { PeerManager } from "../peer_manager.js"; -import { - DEFAULT_KEEP_ALIVE, - DEFAULT_LIGHT_PUSH_FILTER_CHECK, - DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL, - DEFAULT_SUBSCRIBE_OPTIONS -} from "./constants.js"; +import { DEFAULT_LIGHT_PUSH_FILTER_CHECK_INTERVAL } from "./constants.js"; const log = new Logger("sdk:filter:subscription_manager"); export class SubscriptionManager implements ISubscription { private reliabilityMonitor: ReceiverReliabilityMonitor; - private keepAliveTimeout: number = DEFAULT_KEEP_ALIVE; + private keepAliveTimeout: number; + private enableLightPushFilterCheck: boolean; private keepAliveInterval: ReturnType | null = null; - private enableLightPushFilterCheck = DEFAULT_LIGHT_PUSH_FILTER_CHECK; - private subscriptionCallbacks: Map< ContentTopic, SubscriptionCallback @@ -60,6 +54,7 @@ export class SubscriptionManager implements ISubscription { private readonly connectionManager: ConnectionManager, private readonly peerManager: PeerManager, private readonly libp2p: Libp2p, + config: FilterProtocolOptions, private readonly lightPush?: ILightPush ) { this.pubsubTopic = pubsubTopic; @@ -72,18 +67,15 @@ export class SubscriptionManager implements ISubscription { this.protocol.subscribe.bind(this.protocol), this.sendLightPushCheckMessage.bind(this) ); + this.reliabilityMonitor.setMaxPingFailures(config.pingsBeforePeerRenewed); + this.keepAliveTimeout = config.keepAliveIntervalMs; + this.enableLightPushFilterCheck = config.enableLightPushFilterCheck; } public async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback, - options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS + callback: Callback ): Promise { - this.reliabilityMonitor.setMaxPingFailures(options.pingsBeforePeerRenewed); - this.keepAliveTimeout = options.keepAlive || DEFAULT_KEEP_ALIVE; - this.enableLightPushFilterCheck = - options?.enableLightPushFilterCheck || DEFAULT_LIGHT_PUSH_FILTER_CHECK; - const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; // check that all decoders are configured for the same pubsub topic as this subscription diff --git a/packages/sdk/src/protocols/filter/utils.ts b/packages/sdk/src/protocols/filter/utils.ts new file mode 100644 index 0000000000..9c926ae36c --- /dev/null +++ b/packages/sdk/src/protocols/filter/utils.ts @@ -0,0 +1,15 @@ +import { FilterProtocolOptions } from "@waku/interfaces"; + +import * as C from "./constants.js"; + +export const buildConfig = ( + config?: Partial +): FilterProtocolOptions => { + return { + keepAliveIntervalMs: config?.keepAliveIntervalMs || C.DEFAULT_KEEP_ALIVE, + pingsBeforePeerRenewed: + config?.pingsBeforePeerRenewed || C.DEFAULT_MAX_PINGS, + enableLightPushFilterCheck: + config?.enableLightPushFilterCheck || C.DEFAULT_LIGHT_PUSH_FILTER_CHECK + }; +}; diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 4700c372e9..c2f3a339e2 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -92,7 +92,8 @@ export class WakuNode implements IWaku { const filter = wakuFilter( this.connectionManager, this.peerManager, - this.lightPush + this.lightPush, + options.filter ); this.filter = filter(libp2p); }