From 84a6ea69cf8630dacea0cafd58dd8c605ee8dc48 Mon Sep 17 00:00:00 2001 From: fryorcraken <110212804+fryorcraken@users.noreply.github.com> Date: Thu, 13 Nov 2025 12:32:15 +1100 Subject: [PATCH] fix: cleanup routines on reliable channel and core protocols (#2733) * fix: add stop methods to protocols to prevent event listener leaks * fix: add abort signal support for graceful store query cancellation * fix: call protocol stop methods in WakuNode.stop() * fix: improve QueryOnConnect cleanup and abort signal handling * fix: improve MissingMessageRetriever cleanup with abort signal * fix: add stopAllRetries method to RetryManager for proper cleanup * fix: implement comprehensive ReliableChannel stop() with proper cleanup * fix: add active query tracking to QueryOnConnect and await its stop() * fix: add stop() to IRelayAPI and IStore interfaces, implement in SDK wrappers * align with usual naming (isStarted) * remove unnecessary `await` * test: `stop()` is now async * chore: use more concise syntax --------- Co-authored-by: Levente Kiss --- packages/core/src/lib/filter/filter.ts | 1 + .../core/src/lib/light_push/light_push.ts | 5 + packages/core/src/lib/store/store.ts | 37 +++++- .../src/lib/stream_manager/stream_manager.ts | 9 ++ packages/interfaces/src/relay.ts | 1 + packages/interfaces/src/store.ts | 7 + packages/relay/src/relay.ts | 37 ++++-- packages/sdk/src/light_push/light_push.ts | 1 + .../query_on_connect/query_on_connect.spec.ts | 4 +- .../src/query_on_connect/query_on_connect.ts | 83 +++++++++--- .../missing_message_retriever.ts | 57 +++++++-- .../src/reliable_channel/reliable_channel.ts | 120 +++++++++++++++--- .../sdk/src/reliable_channel/retry_manager.ts | 10 +- packages/sdk/src/store/store.ts | 4 + packages/sdk/src/waku/waku.ts | 2 + 15 files changed, 312 insertions(+), 66 deletions(-) diff --git a/packages/core/src/lib/filter/filter.ts b/packages/core/src/lib/filter/filter.ts index 0b8a32b92e..4c6d7dcfcb 100644 --- a/packages/core/src/lib/filter/filter.ts +++ b/packages/core/src/lib/filter/filter.ts @@ -61,6 +61,7 @@ export class FilterCore { } public async stop(): Promise { + this.streamManager.stop(); try { await this.libp2p.unhandle(FilterCodecs.PUSH); } catch (e) { diff --git a/packages/core/src/lib/light_push/light_push.ts b/packages/core/src/lib/light_push/light_push.ts index eb3b517eeb..6b7f7b8a9f 100644 --- a/packages/core/src/lib/light_push/light_push.ts +++ b/packages/core/src/lib/light_push/light_push.ts @@ -33,6 +33,11 @@ export class LightPushCore { this.streamManager = new StreamManager(CODECS.v3, libp2p.components); } + public stop(): void { + this.streamManager.stop(); + this.streamManagerV2.stop(); + } + public async send( encoder: IEncoder, message: IMessage, diff --git a/packages/core/src/lib/store/store.ts b/packages/core/src/lib/store/store.ts index acf2398f9b..5a25cfb4de 100644 --- a/packages/core/src/lib/store/store.ts +++ b/packages/core/src/lib/store/store.ts @@ -35,6 +35,10 @@ export class StoreCore { this.streamManager = new StreamManager(StoreCodec, libp2p.components); } + public stop(): void { + this.streamManager.stop(); + } + public get maxTimeLimit(): number { return MAX_TIME_RANGE; } @@ -68,6 +72,11 @@ export class StoreCore { let currentCursor = queryOpts.paginationCursor; while (true) { + if (queryOpts.abortSignal?.aborted) { + log.info("Store query aborted by signal"); + break; + } + const storeQueryRequest = StoreQueryRequest.create({ ...queryOpts, paginationCursor: currentCursor @@ -89,13 +98,22 @@ export class StoreCore { break; } - const res = await pipe( - [storeQueryRequest.encode()], - lp.encode, - stream, - lp.decode, - async (source) => await all(source) - ); + let res; + try { + res = await pipe( + [storeQueryRequest.encode()], + lp.encode, + stream, + lp.decode, + async (source) => await all(source) + ); + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + log.info(`Store query aborted for peer ${peerId.toString()}`); + break; + } + throw error; + } const bytes = new Uint8ArrayList(); res.forEach((chunk) => { @@ -122,6 +140,11 @@ export class StoreCore { `${storeQueryResponse.messages.length} messages retrieved from store` ); + if (queryOpts.abortSignal?.aborted) { + log.info("Store query aborted by signal before processing messages"); + break; + } + const decodedMessages = storeQueryResponse.messages.map((protoMsg) => { if (!protoMsg.message) { return Promise.resolve(undefined); diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 63584c5086..d2fe90bd4c 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -23,6 +23,15 @@ export class StreamManager { ); } + public stop(): void { + this.libp2p.events.removeEventListener( + "peer:update", + this.handlePeerUpdateStreamPool + ); + this.streamPool.clear(); + this.ongoingCreation.clear(); + } + public async getStream(peerId: PeerId): Promise { try { const peerIdStr = peerId.toString(); diff --git a/packages/interfaces/src/relay.ts b/packages/interfaces/src/relay.ts index 2172ac7be8..58910fed74 100644 --- a/packages/interfaces/src/relay.ts +++ b/packages/interfaces/src/relay.ts @@ -16,6 +16,7 @@ export interface IRelayAPI { readonly pubsubTopics: Set; readonly gossipSub: GossipSub; start: () => Promise; + stop: () => Promise; waitForPeers: () => Promise; getMeshPeers: (topic?: TopicStr) => PeerIdStr[]; } diff --git a/packages/interfaces/src/store.ts b/packages/interfaces/src/store.ts index 8a1e91a451..d5fb262577 100644 --- a/packages/interfaces/src/store.ts +++ b/packages/interfaces/src/store.ts @@ -88,11 +88,18 @@ export type QueryRequestParams = { * Only use if you know what you are doing. */ peerId?: PeerId; + + /** + * An optional AbortSignal to cancel the query. + * When the signal is aborted, the query will stop processing and return early. + */ + abortSignal?: AbortSignal; }; export type IStore = { readonly multicodec: string; + stop(): void; createCursor(message: IDecodedMessage): StoreCursor; queryGenerator: ( decoders: IDecoder[], diff --git a/packages/relay/src/relay.ts b/packages/relay/src/relay.ts index cd1336ef72..c802ec7f6c 100644 --- a/packages/relay/src/relay.ts +++ b/packages/relay/src/relay.ts @@ -67,6 +67,10 @@ export class Relay implements IRelay { * Observers under key `""` are always called. */ private observers: Map>>; + private messageEventHandlers: Map< + PubsubTopic, + (event: CustomEvent) => void + > = new Map(); public constructor(params: RelayConstructorParams) { if (!this.isRelayPubsub(params.libp2p.services.pubsub)) { @@ -105,6 +109,19 @@ export class Relay implements IRelay { this.subscribeToAllTopics(); } + public async stop(): Promise { + for (const pubsubTopic of this.pubsubTopics) { + const handler = this.messageEventHandlers.get(pubsubTopic); + if (handler) { + this.gossipSub.removeEventListener("gossipsub:message", handler); + } + this.gossipSub.topicValidators.delete(pubsubTopic); + this.gossipSub.unsubscribe(pubsubTopic); + } + this.messageEventHandlers.clear(); + this.observers.clear(); + } + /** * Wait for at least one peer with the given protocol to be connected and in the gossipsub * mesh for all pubsubTopics. @@ -299,17 +316,17 @@ export class Relay implements IRelay { * @override */ private gossipSubSubscribe(pubsubTopic: string): void { - this.gossipSub.addEventListener( - "gossipsub:message", - (event: CustomEvent) => { - if (event.detail.msg.topic !== pubsubTopic) return; + const handler = (event: CustomEvent): void => { + if (event.detail.msg.topic !== pubsubTopic) return; - this.processIncomingMessage( - event.detail.msg.topic, - event.detail.msg.data - ).catch((e) => log.error("Failed to process incoming message", e)); - } - ); + this.processIncomingMessage( + event.detail.msg.topic, + event.detail.msg.data + ).catch((e) => log.error("Failed to process incoming message", e)); + }; + + this.messageEventHandlers.set(pubsubTopic, handler); + this.gossipSub.addEventListener("gossipsub:message", handler); this.gossipSub.topicValidators.set(pubsubTopic, messageValidator); this.gossipSub.subscribe(pubsubTopic); diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 669c77e38c..7cf2bc752e 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -65,6 +65,7 @@ export class LightPush implements ILightPush { public stop(): void { this.retryManager.stop(); + this.protocol.stop(); } public async send( diff --git a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts index b87caa5ce7..c296b7a7fa 100644 --- a/packages/sdk/src/query_on_connect/query_on_connect.spec.ts +++ b/packages/sdk/src/query_on_connect/query_on_connect.spec.ts @@ -158,14 +158,14 @@ describe("QueryOnConnect", () => { expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true; }); - it("should remove event listeners when stopped", () => { + it("should remove event listeners when stopped", async () => { const peerRemoveSpy = mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy; const wakuRemoveSpy = mockWakuEventEmitter.removeEventListener as sinon.SinonSpy; queryOnConnect.start(); - queryOnConnect.stop(); + await queryOnConnect.stop(); expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be .true; diff --git a/packages/sdk/src/query_on_connect/query_on_connect.ts b/packages/sdk/src/query_on_connect/query_on_connect.ts index da9e78a763..a7e4973670 100644 --- a/packages/sdk/src/query_on_connect/query_on_connect.ts +++ b/packages/sdk/src/query_on_connect/query_on_connect.ts @@ -52,6 +52,13 @@ export class QueryOnConnect< private lastTimeOffline: number; private readonly forceQueryThresholdMs: number; + private isStarted: boolean = false; + private abortController?: AbortController; + private activeQueryPromise?: Promise; + + private boundStoreConnectHandler?: (event: CustomEvent) => void; + private boundHealthHandler?: (event: CustomEvent) => void; + public constructor( public decoders: IDecoder[], public stopIfTrue: (msg: T) => boolean, @@ -71,11 +78,37 @@ export class QueryOnConnect< } public start(): void { + if (this.isStarted) { + log.warn("QueryOnConnect already running"); + return; + } log.info("starting query-on-connect service"); + this.isStarted = true; + this.abortController = new AbortController(); this.setupEventListeners(); } - public stop(): void { + public async stop(): Promise { + if (!this.isStarted) { + return; + } + log.info("stopping query-on-connect service"); + this.isStarted = false; + + if (this.abortController) { + this.abortController.abort(); + this.abortController = undefined; + } + + if (this.activeQueryPromise) { + log.info("Waiting for active query to complete..."); + try { + await this.activeQueryPromise; + } catch (error) { + log.warn("Active query failed during stop:", error); + } + } + this.unsetEventListeners(); } @@ -107,7 +140,10 @@ export class QueryOnConnect< this.lastTimeOffline > this.lastSuccessfulQuery || timeSinceLastQuery > this.forceQueryThresholdMs ) { - await this.query(peerId); + this.activeQueryPromise = this.query(peerId).finally(() => { + this.activeQueryPromise = undefined; + }); + await this.activeQueryPromise; } else { log.info(`no querying`); } @@ -120,7 +156,8 @@ export class QueryOnConnect< for await (const page of this._queryGenerator(this.decoders, { timeStart, timeEnd, - peerId + peerId, + abortSignal: this.abortController?.signal })) { // Await for decoding const messages = (await Promise.all(page)).filter( @@ -166,33 +203,41 @@ export class QueryOnConnect< } private setupEventListeners(): void { + this.boundStoreConnectHandler = (event: CustomEvent) => { + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ); + }; + + this.boundHealthHandler = this.updateLastOfflineDate.bind(this); + this.peerManagerEventEmitter.addEventListener( PeerManagerEventNames.StoreConnect, - (event) => - void this.maybeQuery(event.detail).catch((err) => - log.error("query-on-connect error", err) - ) + this.boundStoreConnectHandler ); this.wakuEventEmitter.addEventListener( WakuEvent.Health, - this.updateLastOfflineDate.bind(this) + this.boundHealthHandler ); } private unsetEventListeners(): void { - this.peerManagerEventEmitter.removeEventListener( - PeerManagerEventNames.StoreConnect, - (event) => - void this.maybeQuery(event.detail).catch((err) => - log.error("query-on-connect error", err) - ) - ); + if (this.boundStoreConnectHandler) { + this.peerManagerEventEmitter.removeEventListener( + PeerManagerEventNames.StoreConnect, + this.boundStoreConnectHandler + ); + this.boundStoreConnectHandler = undefined; + } - this.wakuEventEmitter.removeEventListener( - WakuEvent.Health, - this.updateLastOfflineDate.bind(this) - ); + if (this.boundHealthHandler) { + this.wakuEventEmitter.removeEventListener( + WakuEvent.Health, + this.boundHealthHandler + ); + this.boundHealthHandler = undefined; + } } private updateLastOfflineDate(event: CustomEvent): void { diff --git a/packages/sdk/src/reliable_channel/missing_message_retriever.ts b/packages/sdk/src/reliable_channel/missing_message_retriever.ts index f5f1cb3503..2f9e63217c 100644 --- a/packages/sdk/src/reliable_channel/missing_message_retriever.ts +++ b/packages/sdk/src/reliable_channel/missing_message_retriever.ts @@ -13,6 +13,8 @@ const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds export class MissingMessageRetriever { private retrieveInterval: ReturnType | undefined; private missingMessages: Map>; // Waku Message Ids + private activeQueryPromise: Promise | undefined; + private abortController?: AbortController; public constructor( private readonly decoder: IDecoder, @@ -29,7 +31,11 @@ export class MissingMessageRetriever { public start(): void { if (this.retrieveInterval) { clearInterval(this.retrieveInterval); + this.retrieveInterval = undefined; } + + this.abortController = new AbortController(); + if (this.retrieveFrequencyMs !== 0) { log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`); this.retrieveInterval = setInterval(() => { @@ -38,10 +44,30 @@ export class MissingMessageRetriever { } } - public stop(): void { + public async stop(): Promise { + log.info("Stopping MissingMessageRetriever..."); + if (this.retrieveInterval) { clearInterval(this.retrieveInterval); + this.retrieveInterval = undefined; } + + if (this.abortController) { + this.abortController.abort(); + this.abortController = undefined; + } + + if (this.activeQueryPromise) { + log.info("Waiting for active query to complete..."); + try { + await this.activeQueryPromise; + } catch (error) { + log.warn("Active query failed during stop:", error); + } + } + + this.missingMessages.clear(); + log.info("MissingMessageRetriever stopped"); } public addMissingMessage( @@ -64,15 +90,30 @@ export class MissingMessageRetriever { if (this.missingMessages.size) { const messageHashes = Array.from(this.missingMessages.values()); log.info("attempting to retrieve missing message", messageHashes.length); - for await (const page of this._retrieve([this.decoder], { - messageHashes - })) { - for await (const msg of page) { - if (msg && this.onMessageRetrieved) { - await this.onMessageRetrieved(msg); + + this.activeQueryPromise = (async () => { + try { + for await (const page of this._retrieve([this.decoder], { + messageHashes, + abortSignal: this.abortController?.signal + })) { + for await (const msg of page) { + if (msg && this.onMessageRetrieved) { + await this.onMessageRetrieved(msg); + } + } } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + log.info("Store query aborted"); + return; + } + log.error("Store query failed:", error); } - } + })(); + + await this.activeQueryPromise; + this.activeQueryPromise = undefined; } } } diff --git a/packages/sdk/src/reliable_channel/reliable_channel.ts b/packages/sdk/src/reliable_channel/reliable_channel.ts index 49b55aa495..cc55a06d0b 100644 --- a/packages/sdk/src/reliable_channel/reliable_channel.ts +++ b/packages/sdk/src/reliable_channel/reliable_channel.ts @@ -17,6 +17,7 @@ import { isContentMessage, MessageChannel, MessageChannelEvent, + MessageChannelEvents, type MessageChannelOptions, Message as SdsMessage, type SenderId, @@ -136,11 +137,16 @@ export class ReliableChannel< callback: Callback ) => Promise; + private readonly _unsubscribe?: ( + decoders: IDecoder | IDecoder[] + ) => Promise; + private readonly _retrieve?: ( decoders: IDecoder[], options?: Partial ) => AsyncGenerator[]>; + private eventListenerCleanups: Array<() => void> = []; private readonly syncMinIntervalMs: number; private syncTimeout: ReturnType | undefined; private sweepInBufInterval: ReturnType | undefined; @@ -151,6 +157,7 @@ export class ReliableChannel< private readonly queryOnConnect?: QueryOnConnect; private readonly processTaskMinElapseMs: number; private _started: boolean; + private activePendingProcessTask?: Promise; private constructor( public node: IWaku, @@ -170,6 +177,7 @@ export class ReliableChannel< if (node.filter) { this._subscribe = node.filter.subscribe.bind(node.filter); + this._unsubscribe = node.filter.unsubscribe.bind(node.filter); } else if (node.relay) { // TODO: Why do relay and filter have different interfaces? // this._subscribe = node.relay.subscribeWithUnsubscribe; @@ -384,10 +392,21 @@ export class ReliableChannel< private async subscribe(): Promise { this.assertStarted(); return this._subscribe(this.decoder, async (message: T) => { + if (!this._started) { + log.info("ReliableChannel stopped, ignoring incoming message"); + return; + } await this.processIncomingMessage(message); }); } + private async unsubscribe(): Promise { + if (!this._unsubscribe) { + throw Error("No unsubscribe method available"); + } + return this._unsubscribe(this.decoder); + } + /** * Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done. * @param msg @@ -458,12 +477,19 @@ export class ReliableChannel< // TODO: For now we only queue process tasks for incoming messages // As this is where there is most volume private queueProcessTasks(): void { + if (!this._started) return; + // If one is already queued, then we can ignore it if (this.processTaskTimeout === undefined) { this.processTaskTimeout = setTimeout(() => { - void this.messageChannel.processTasks().catch((err) => { - log.error("error encountered when processing sds tasks", err); - }); + this.activePendingProcessTask = this.messageChannel + .processTasks() + .catch((err) => { + log.error("error encountered when processing sds tasks", err); + }) + .finally(() => { + this.activePendingProcessTask = undefined; + }); // Clear timeout once triggered clearTimeout(this.processTaskTimeout); @@ -485,15 +511,35 @@ export class ReliableChannel< return this.subscribe(); } - public stop(): void { + public async stop(): Promise { if (!this._started) return; + + log.info("Stopping ReliableChannel..."); this._started = false; + this.stopSync(); this.stopSweepIncomingBufferLoop(); - this.missingMessageRetriever?.stop(); - this.queryOnConnect?.stop(); - // TODO unsubscribe - // TODO unsetMessageListeners + + if (this.processTaskTimeout) { + clearTimeout(this.processTaskTimeout); + this.processTaskTimeout = undefined; + } + + if (this.activePendingProcessTask) { + await this.activePendingProcessTask; + } + + await this.missingMessageRetriever?.stop(); + + await this.queryOnConnect?.stop(); + + this.retryManager?.stopAllRetries(); + + await this.unsubscribe(); + + this.removeAllEventListeners(); + + log.info("ReliableChannel stopped successfully"); } private assertStarted(): void { @@ -509,12 +555,16 @@ export class ReliableChannel< } private stopSweepIncomingBufferLoop(): void { - if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval); + if (this.sweepInBufInterval) { + clearInterval(this.sweepInBufInterval); + this.sweepInBufInterval = undefined; + } } private restartSync(multiplier: number = 1): void { if (this.syncTimeout) { clearTimeout(this.syncTimeout); + this.syncTimeout = undefined; } if (this.syncMinIntervalMs) { const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier; @@ -531,6 +581,7 @@ export class ReliableChannel< private stopSync(): void { if (this.syncTimeout) { clearTimeout(this.syncTimeout); + this.syncTimeout = undefined; } } @@ -595,8 +646,19 @@ export class ReliableChannel< return sdsMessage.causalHistory && sdsMessage.causalHistory.length > 0; } + private addTrackedEventListener( + eventName: K, + listener: (event: MessageChannelEvents[K]) => void + ): void { + this.messageChannel.addEventListener(eventName, listener as any); + + this.eventListenerCleanups.push(() => { + this.messageChannel.removeEventListener(eventName, listener as any); + }); + } + private setupEventListeners(): void { - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.OutMessageSent, (event) => { if (event.detail.content) { @@ -608,7 +670,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.OutMessageAcknowledged, (event) => { if (event.detail) { @@ -622,7 +684,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.OutMessagePossiblyAcknowledged, (event) => { if (event.detail) { @@ -636,7 +698,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.InSyncReceived, (_event) => { // restart the timeout when a sync message has been received @@ -644,7 +706,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.InMessageReceived, (event) => { // restart the timeout when a content message has been received @@ -655,7 +717,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.OutMessageSent, (event) => { // restart the timeout when a content message has been sent @@ -665,7 +727,7 @@ export class ReliableChannel< } ); - this.messageChannel.addEventListener( + this.addTrackedEventListener( MessageChannelEvent.InMessageMissing, (event) => { for (const { messageId, retrievalHint } of event.detail) { @@ -680,12 +742,32 @@ export class ReliableChannel< ); if (this.queryOnConnect) { + const queryListener = (event: any): void => { + void this.processIncomingMessages(event.detail); + }; + this.queryOnConnect.addEventListener( QueryOnConnectEvent.MessagesRetrieved, - (event) => { - void this.processIncomingMessages(event.detail); - } + queryListener ); + + this.eventListenerCleanups.push(() => { + this.queryOnConnect?.removeEventListener( + QueryOnConnectEvent.MessagesRetrieved, + queryListener + ); + }); } } + + private removeAllEventListeners(): void { + for (const cleanup of this.eventListenerCleanups) { + try { + cleanup(); + } catch (error) { + log.error("error removing event listener:", error); + } + } + this.eventListenerCleanups = []; + } } diff --git a/packages/sdk/src/reliable_channel/retry_manager.ts b/packages/sdk/src/reliable_channel/retry_manager.ts index 00426fd854..199b0fa80d 100644 --- a/packages/sdk/src/reliable_channel/retry_manager.ts +++ b/packages/sdk/src/reliable_channel/retry_manager.ts @@ -24,9 +24,17 @@ export class RetryManager { const timeout = this.timeouts.get(id); if (timeout) { clearTimeout(timeout); + this.timeouts.delete(id); } } + public stopAllRetries(): void { + for (const [_id, timeout] of this.timeouts.entries()) { + clearTimeout(timeout); + } + this.timeouts.clear(); + } + public startRetries(id: string, retry: () => void | Promise): void { this.retry(id, retry, 0); } @@ -36,7 +44,7 @@ export class RetryManager { retry: () => void | Promise, attemptNumber: number ): void { - clearTimeout(this.timeouts.get(id)); + this.stopRetries(id); if (attemptNumber < this.maxRetryNumber) { const interval = setTimeout(() => { void retry(); diff --git a/packages/sdk/src/store/store.ts b/packages/sdk/src/store/store.ts index 2165005899..cf7921b54c 100644 --- a/packages/sdk/src/store/store.ts +++ b/packages/sdk/src/store/store.ts @@ -46,6 +46,10 @@ export class Store implements IStore { return this.protocol.multicodec; } + public stop(): void { + this.protocol.stop(); + } + /** * Queries the Waku Store for historical messages using the provided decoders and options. * Returns an asynchronous generator that yields promises of decoded messages. diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 7336b06df7..5cc0be30ce 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -232,7 +232,9 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; this.lightPush?.stop(); + this.store?.stop(); await this.filter?.stop(); + await this.relay?.stop(); this.healthIndicator.stop(); this.peerManager.stop(); this.connectionManager.stop();