From 1a6bc4f8ce5d3409b3e82b8b0685beb80f48269a Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Mon, 29 Apr 2024 23:31:09 +0200 Subject: [PATCH] feat: add keep alive to Filter (#1970) * fix: use pubsubTopic from current ones if not set * fix: improve type on dial method * enforce same pubusb on filter.subscribe, make content topic to pubsub mapping default for decoder / encoder * fix mapping problem * update tests * add error handling * fix typo * up lock * rm lock * up lock * remove only * feat: implement keep alive for filter subscription * remove * address comments --- packages/interfaces/src/filter.ts | 12 +++--- packages/sdk/src/protocols/filter.ts | 59 ++++++++++++++++++++++++++-- 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index b167493930..e953910721 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -8,14 +8,15 @@ import type { } from "./protocols.js"; import type { IReceiver } from "./receiver.js"; -export type ContentFilter = { - contentTopic: string; +export type SubscribeOptions = { + keepAlive?: number; }; export interface IFilterSubscription { subscribe( decoders: IDecoder | IDecoder[], - callback: Callback + callback: Callback, + options?: SubscribeOptions ): Promise; unsubscribe(contentTopics: ContentTopic[]): Promise; @@ -25,11 +26,10 @@ export interface IFilterSubscription { unsubscribeAll(): Promise; } -export type IFilter = IReceiver & IBaseProtocolCore; - export type IFilterSDK = IReceiver & IBaseProtocolSDK & { protocol: IBaseProtocolCore } & { createSubscription( - pubsubTopicShardInfo?: ShardingParams | PubsubTopic + pubsubTopicShardInfo?: ShardingParams | PubsubTopic, + options?: SubscribeOptions ): Promise; }; diff --git a/packages/sdk/src/protocols/filter.ts b/packages/sdk/src/protocols/filter.ts index 134b5083c6..e13af706ff 100644 --- a/packages/sdk/src/protocols/filter.ts +++ b/packages/sdk/src/protocols/filter.ts @@ -12,6 +12,7 @@ import type { ProtocolCreateOptions, PubsubTopic, ShardingParams, + SubscribeOptions, Unsubscribe } from "@waku/interfaces"; import { messageHashStr } from "@waku/message-hash"; @@ -33,10 +34,16 @@ type SubscriptionCallback = { const log = new Logger("sdk:filter"); +const MINUTE = 60 * 1000; +const DEFAULT_SUBSCRIBE_OPTIONS = { + keepAlive: MINUTE +}; + export class SubscriptionManager { private readonly pubsubTopic: PubsubTopic; readonly peers: Peer[]; readonly receivedMessagesHashStr: string[] = []; + private keepAliveTimer: number | null = null; private subscriptionCallbacks: Map< ContentTopic, @@ -55,7 +62,8 @@ export class SubscriptionManager { async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { const decodersArray = Array.isArray(decoders) ? decoders : [decoders]; @@ -94,6 +102,10 @@ export class SubscriptionManager { // purpose as the user may call `subscribe` to refresh the subscription this.subscriptionCallbacks.set(contentTopic, subscriptionCallback); }); + + if (options?.keepAlive) { + this.startKeepAlivePings(options.keepAlive); + } } async unsubscribe(contentTopics: ContentTopic[]): Promise { @@ -108,6 +120,10 @@ export class SubscriptionManager { const results = await Promise.allSettled(promises); this.handleErrors(results, "unsubscribe"); + + if (this.subscriptionCallbacks.size === 0 && this.keepAliveTimer) { + this.stopKeepAlivePings(); + } } async ping(): Promise { @@ -130,6 +146,10 @@ export class SubscriptionManager { this.subscriptionCallbacks.clear(); this.handleErrors(results, "unsubscribeAll"); + + if (this.keepAliveTimer) { + this.stopKeepAlivePings(); + } } async processIncomingMessage(message: WakuMessage): Promise { @@ -193,6 +213,38 @@ export class SubscriptionManager { log.info(`${type} successful for all peers`); } } + + private startKeepAlivePings(interval: number): void { + if (this.keepAliveTimer) { + log.info("Recurring pings already set up."); + return; + } + + this.keepAliveTimer = setInterval(() => { + const run = async (): Promise => { + try { + log.info("Recurring ping to peers."); + await this.ping(); + } catch (error) { + log.error("Stopping recurring pings due to failure", error); + this.stopKeepAlivePings(); + } + }; + + void run(); + }, interval) as unknown as number; + } + + private stopKeepAlivePings(): void { + if (!this.keepAliveTimer) { + log.info("Already stopped recurring pings."); + return; + } + + log.info("Stopping recurring pings."); + clearInterval(this.keepAliveTimer); + this.keepAliveTimer = null; + } } class FilterSDK extends BaseProtocolSDK implements IFilterSDK { @@ -291,7 +343,8 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { */ async subscribe( decoders: IDecoder | IDecoder[], - callback: Callback + callback: Callback, + options: SubscribeOptions = DEFAULT_SUBSCRIBE_OPTIONS ): Promise { const pubsubTopics = this.getPubsubTopics(decoders); @@ -309,7 +362,7 @@ class FilterSDK extends BaseProtocolSDK implements IFilterSDK { const subscription = await this.createSubscription(pubsubTopics[0]); - await subscription.subscribe(decoders, callback); + await subscription.subscribe(decoders, callback, options); const contentTopics = Array.from( groupByContentTopic(