From 404d590a866101ec76787b476abd385e7d516710 Mon Sep 17 00:00:00 2001 From: Levente Kiss Date: Fri, 31 Oct 2025 18:38:58 +1300 Subject: [PATCH] fix: improve MissingMessageRetriever cleanup with abort signal --- .../missing_message_retriever.ts | 57 ++++++++++++++++--- 1 file changed, 49 insertions(+), 8 deletions(-) diff --git a/packages/sdk/src/reliable_channel/missing_message_retriever.ts b/packages/sdk/src/reliable_channel/missing_message_retriever.ts index f5f1cb3503..2f9e63217c 100644 --- a/packages/sdk/src/reliable_channel/missing_message_retriever.ts +++ b/packages/sdk/src/reliable_channel/missing_message_retriever.ts @@ -13,6 +13,8 @@ const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds export class MissingMessageRetriever { private retrieveInterval: ReturnType | undefined; private missingMessages: Map>; // Waku Message Ids + private activeQueryPromise: Promise | undefined; + private abortController?: AbortController; public constructor( private readonly decoder: IDecoder, @@ -29,7 +31,11 @@ export class MissingMessageRetriever { public start(): void { if (this.retrieveInterval) { clearInterval(this.retrieveInterval); + this.retrieveInterval = undefined; } + + this.abortController = new AbortController(); + if (this.retrieveFrequencyMs !== 0) { log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`); this.retrieveInterval = setInterval(() => { @@ -38,10 +44,30 @@ export class MissingMessageRetriever { } } - public stop(): void { + public async stop(): Promise { + log.info("Stopping MissingMessageRetriever..."); + if (this.retrieveInterval) { clearInterval(this.retrieveInterval); + this.retrieveInterval = undefined; } + + if (this.abortController) { + this.abortController.abort(); + this.abortController = undefined; + } + + if (this.activeQueryPromise) { + log.info("Waiting for active query to complete..."); + try { + await this.activeQueryPromise; + } catch (error) { + log.warn("Active query failed during stop:", error); + } + } + + this.missingMessages.clear(); + log.info("MissingMessageRetriever stopped"); } public addMissingMessage( @@ -64,15 +90,30 @@ export class MissingMessageRetriever { if (this.missingMessages.size) { const messageHashes = Array.from(this.missingMessages.values()); log.info("attempting to retrieve missing message", messageHashes.length); - for await (const page of this._retrieve([this.decoder], { - messageHashes - })) { - for await (const msg of page) { - if (msg && this.onMessageRetrieved) { - await this.onMessageRetrieved(msg); + + this.activeQueryPromise = (async () => { + try { + for await (const page of this._retrieve([this.decoder], { + messageHashes, + abortSignal: this.abortController?.signal + })) { + for await (const msg of page) { + if (msg && this.onMessageRetrieved) { + await this.onMessageRetrieved(msg); + } + } } + } catch (error) { + if (error instanceof Error && error.name === "AbortError") { + log.info("Store query aborted"); + return; + } + log.error("Store query failed:", error); } - } + })(); + + await this.activeQueryPromise; + this.activeQueryPromise = undefined; } } }