fix: add stop methods to protocols to prevent event listener leaks

This commit is contained in:
Levente Kiss 2025-10-31 18:36:29 +13:00
parent 5334a7fcc9
commit 519b335529
5 changed files with 46 additions and 10 deletions

View File

@ -61,6 +61,7 @@ export class FilterCore {
}
public async stop(): Promise<void> {
this.streamManager.stop();
try {
await this.libp2p.unhandle(FilterCodecs.PUSH);
} catch (e) {

View File

@ -33,6 +33,11 @@ export class LightPushCore {
this.streamManager = new StreamManager(CODECS.v3, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
this.streamManagerV2.stop();
}
public async send(
encoder: IEncoder,
message: IMessage,

View File

@ -35,6 +35,10 @@ export class StoreCore {
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
}
public get maxTimeLimit(): number {
return MAX_TIME_RANGE;
}

View File

@ -23,6 +23,15 @@ export class StreamManager {
);
}
public stop(): void {
this.libp2p.events.removeEventListener(
"peer:update",
this.handlePeerUpdateStreamPool
);
this.streamPool.clear();
this.ongoingCreation.clear();
}
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
try {
const peerIdStr = peerId.toString();

View File

@ -67,6 +67,10 @@ export class Relay implements IRelay {
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
private messageEventHandlers: Map<
PubsubTopic,
(event: CustomEvent<GossipsubMessage>) => void
> = new Map();
public constructor(params: RelayConstructorParams) {
if (!this.isRelayPubsub(params.libp2p.services.pubsub)) {
@ -105,6 +109,19 @@ export class Relay implements IRelay {
this.subscribeToAllTopics();
}
public async stop(): Promise<void> {
for (const pubsubTopic of this.pubsubTopics) {
const handler = this.messageEventHandlers.get(pubsubTopic);
if (handler) {
this.gossipSub.removeEventListener("gossipsub:message", handler);
}
this.gossipSub.topicValidators.delete(pubsubTopic);
this.gossipSub.unsubscribe(pubsubTopic);
}
this.messageEventHandlers.clear();
this.observers.clear();
}
/**
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
* mesh for all pubsubTopics.
@ -299,17 +316,17 @@ export class Relay implements IRelay {
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic !== pubsubTopic) return;
const handler = (event: CustomEvent<GossipsubMessage>): void => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
};
this.messageEventHandlers.set(pubsubTopic, handler);
this.gossipSub.addEventListener("gossipsub:message", handler);
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);