fix: improve MissingMessageRetriever cleanup with abort signal

This commit is contained in:
Levente Kiss 2025-10-31 18:38:58 +13:00
parent a743ca41bc
commit 404d590a86

View File

@ -13,6 +13,8 @@ const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds
export class MissingMessageRetriever<T extends IDecodedMessage> {
private retrieveInterval: ReturnType<typeof setInterval> | undefined;
private missingMessages: Map<MessageId, Uint8Array<ArrayBufferLike>>; // Waku Message Ids
private activeQueryPromise: Promise<void> | undefined;
private abortController?: AbortController;
public constructor(
private readonly decoder: IDecoder<T>,
@ -29,7 +31,11 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
public start(): void {
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
this.abortController = new AbortController();
if (this.retrieveFrequencyMs !== 0) {
log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`);
this.retrieveInterval = setInterval(() => {
@ -38,10 +44,30 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
}
}
public stop(): void {
public async stop(): Promise<void> {
log.info("Stopping MissingMessageRetriever...");
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
if (this.abortController) {
this.abortController.abort();
this.abortController = undefined;
}
if (this.activeQueryPromise) {
log.info("Waiting for active query to complete...");
try {
await this.activeQueryPromise;
} catch (error) {
log.warn("Active query failed during stop:", error);
}
}
this.missingMessages.clear();
log.info("MissingMessageRetriever stopped");
}
public addMissingMessage(
@ -64,15 +90,30 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
if (this.missingMessages.size) {
const messageHashes = Array.from(this.missingMessages.values());
log.info("attempting to retrieve missing message", messageHashes.length);
for await (const page of this._retrieve([this.decoder], {
messageHashes
})) {
for await (const msg of page) {
if (msg && this.onMessageRetrieved) {
await this.onMessageRetrieved(msg);
this.activeQueryPromise = (async () => {
try {
for await (const page of this._retrieve([this.decoder], {
messageHashes,
abortSignal: this.abortController?.signal
})) {
for await (const msg of page) {
if (msg && this.onMessageRetrieved) {
await this.onMessageRetrieved(msg);
}
}
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
log.info("Store query aborted");
return;
}
log.error("Store query failed:", error);
}
}
})();
await this.activeQueryPromise;
this.activeQueryPromise = undefined;
}
}
}