diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 0b8a32b92e..4c6d7dcfcb 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -61,6 +61,7 @@ export class FilterCore { } public async stop(): Promise { + this.streamManager.stop(); try { await this.libp2p.unhandle(FilterCodecs.PUSH); } catch (e) { diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index eb3b517eeb..6b7f7b8a9f 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -33,6 +33,11 @@ export class LightPushCore { this.streamManager = new StreamManager(CODECS.v3, libp2p.components); } + public stop(): void { + this.streamManager.stop(); + this.streamManagerV2.stop(); + } + public async send( encoder: IEncoder, message: IMessage, diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index acf2398f9b..9597ecf1d2 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -35,6 +35,10 @@ export class StoreCore { this.streamManager = new StreamManager(StoreCodec, libp2p.components); } + public stop(): void { + this.streamManager.stop(); + } + public get maxTimeLimit(): number { return MAX_TIME_RANGE; } diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 63584c5086..d2fe90bd4c 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -23,6 +23,15 @@ export class StreamManager { ); } + public stop(): void { + this.libp2p.events.removeEventListener( + "peer:update", + this.handlePeerUpdateStreamPool + ); + this.streamPool.clear(); + this.ongoingCreation.clear(); + } + public async getStream(peerId: PeerId): Promise { try { const peerIdStr = peerId.toString(); diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index cd1336ef72..c802ec7f6c 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -67,6 +67,10 @@ export class Relay implements IRelay { * Observers under key `""` are always called. */ private observers: Map>>; + private messageEventHandlers: Map< + PubsubTopic, + (event: CustomEvent) => void + > = new Map(); public constructor(params: RelayConstructorParams) { if (!this.isRelayPubsub(params.libp2p.services.pubsub)) { @@ -105,6 +109,19 @@ export class Relay implements IRelay { this.subscribeToAllTopics(); } + public async stop(): Promise { + for (const pubsubTopic of this.pubsubTopics) { + const handler = this.messageEventHandlers.get(pubsubTopic); + if (handler) { + this.gossipSub.removeEventListener("gossipsub:message", handler); + } + this.gossipSub.topicValidators.delete(pubsubTopic); + this.gossipSub.unsubscribe(pubsubTopic); + } + this.messageEventHandlers.clear(); + this.observers.clear(); + } + /** * Wait for at least one peer with the given protocol to be connected and in the gossipsub * mesh for all pubsubTopics. @@ -299,17 +316,17 @@ export class Relay implements IRelay { * @override */ private gossipSubSubscribe(pubsubTopic: string): void { - this.gossipSub.addEventListener( - "gossipsub:message", - (event: CustomEvent) => { - if (event.detail.msg.topic !== pubsubTopic) return; + const handler = (event: CustomEvent): void => { + if (event.detail.msg.topic !== pubsubTopic) return; - this.processIncomingMessage( - event.detail.msg.topic, - event.detail.msg.data - ).catch((e) => log.error("Failed to process incoming message", e)); - } - ); + this.processIncomingMessage( + event.detail.msg.topic, + event.detail.msg.data + ).catch((e) => log.error("Failed to process incoming message", e)); + }; + + this.messageEventHandlers.set(pubsubTopic, handler); + this.gossipSub.addEventListener("gossipsub:message", handler); this.gossipSub.topicValidators.set(pubsubTopic, messageValidator); this.gossipSub.subscribe(pubsubTopic);