diff --git a/packages/sdk/src/query_on_connect/query_on_connect.ts b/packages/sdk/src/query_on_connect/query_on_connect.ts index da9e78a763..2fc16e9a4b 100644 --- a/packages/sdk/src/query_on_connect/query_on_connect.ts +++ b/packages/sdk/src/query_on_connect/query_on_connect.ts @@ -52,6 +52,12 @@ export class QueryOnConnect< private lastTimeOffline: number; private readonly forceQueryThresholdMs: number; + private isRunning: boolean = false; + private abortController?: AbortController; + + private boundStoreConnectHandler?: (event: CustomEvent) => void; + private boundHealthHandler?: (event: CustomEvent) => void; + public constructor( public decoders: IDecoder[], public stopIfTrue: (msg: T) => boolean, @@ -71,11 +77,28 @@ export class QueryOnConnect< } public start(): void { + if (this.isRunning) { + log.warn("QueryOnConnect already running"); + return; + } log.info("starting query-on-connect service"); + this.isRunning = true; + this.abortController = new AbortController(); this.setupEventListeners(); } public stop(): void { + if (!this.isRunning) { + return; + } + log.info("stopping query-on-connect service"); + this.isRunning = false; + + if (this.abortController) { + this.abortController.abort(); + this.abortController = undefined; + } + this.unsetEventListeners(); } @@ -120,7 +143,8 @@ export class QueryOnConnect< for await (const page of this._queryGenerator(this.decoders, { timeStart, timeEnd, - peerId + peerId, + abortSignal: this.abortController?.signal })) { // Await for decoding const messages = (await Promise.all(page)).filter( @@ -166,33 +190,41 @@ export class QueryOnConnect< } private setupEventListeners(): void { + this.boundStoreConnectHandler = (event: CustomEvent) => { + void this.maybeQuery(event.detail).catch((err) => + log.error("query-on-connect error", err) + ); + }; + + this.boundHealthHandler = this.updateLastOfflineDate.bind(this); + this.peerManagerEventEmitter.addEventListener( PeerManagerEventNames.StoreConnect, - (event) => - void this.maybeQuery(event.detail).catch((err) => - log.error("query-on-connect error", err) - ) + this.boundStoreConnectHandler ); this.wakuEventEmitter.addEventListener( WakuEvent.Health, - this.updateLastOfflineDate.bind(this) + this.boundHealthHandler ); } private unsetEventListeners(): void { - this.peerManagerEventEmitter.removeEventListener( - PeerManagerEventNames.StoreConnect, - (event) => - void this.maybeQuery(event.detail).catch((err) => - log.error("query-on-connect error", err) - ) - ); + if (this.boundStoreConnectHandler) { + this.peerManagerEventEmitter.removeEventListener( + PeerManagerEventNames.StoreConnect, + this.boundStoreConnectHandler + ); + this.boundStoreConnectHandler = undefined; + } - this.wakuEventEmitter.removeEventListener( - WakuEvent.Health, - this.updateLastOfflineDate.bind(this) - ); + if (this.boundHealthHandler) { + this.wakuEventEmitter.removeEventListener( + WakuEvent.Health, + this.boundHealthHandler + ); + this.boundHealthHandler = undefined; + } } private updateLastOfflineDate(event: CustomEvent): void {