diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 146fcd73c2..f76203fbfb 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -42,20 +42,30 @@ export class FilterCore { public constructor( private handleIncomingMessage: IncomingMessageHandler, - libp2p: Libp2p + private libp2p: Libp2p ) { this.streamManager = new StreamManager( FilterCodecs.SUBSCRIBE, libp2p.components ); + } - libp2p - .handle(FilterCodecs.PUSH, this.onRequest.bind(this), { + public async start(): Promise { + try { + await this.libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this), { maxInboundStreams: 100 - }) - .catch((e) => { - log.error("Failed to register ", FilterCodecs.PUSH, e); }); + } catch (e) { + log.error("Failed to register ", FilterCodecs.PUSH, e); + } + } + + public async stop(): Promise { + try { + await this.libp2p.unhandle(FilterCodecs.PUSH); + } catch (e) { + log.error("Failed to unregister ", FilterCodecs.PUSH, e); + } } public async subscribe( diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index 1eff38e21b..cf383fcf4e 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -4,6 +4,16 @@ import type { Callback } from "./protocols.js"; export type IFilter = { readonly multicodec: string; + /** + * Starts the filter protocol. + */ + start(): Promise; + + /** + * Stops the filter protocol. + */ + stop(): Promise; + /** * Subscribes to messages that match the filtering criteria defined in the specified decoders. * Executes a callback upon receiving each message. diff --git a/packages/sdk/src/filter/filter.ts b/packages/sdk/src/filter/filter.ts index 43895fab7c..4d12f8d32d 100644 --- a/packages/sdk/src/filter/filter.ts +++ b/packages/sdk/src/filter/filter.ts @@ -45,6 +45,14 @@ export class Filter implements IFilter { return this.protocol.multicodec; } + public async start(): Promise { + await this.protocol.start(); + } + + public async stop(): Promise { + await this.protocol.stop(); + } + public unsubscribeAll(): void { for (const subscription of this.subscriptions.values()) { subscription.stop(); diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 4f89080215..a568474b1a 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -216,6 +216,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; await this.libp2p.start(); + await this.filter?.start(); this.connectionManager.start(); this.peerManager.start(); this.healthIndicator.start(); @@ -231,6 +232,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; this.lightPush?.stop(); + await this.filter?.stop(); this.healthIndicator.stop(); this.peerManager.stop(); this.connectionManager.stop();