mirror of
https://github.com/logos-messaging/js-waku.git
synced 2026-01-11 02:03:10 +00:00
fix: improve QueryOnConnect cleanup and abort signal handling
This commit is contained in:
parent
705ca38bf3
commit
a743ca41bc
@ -52,6 +52,12 @@ export class QueryOnConnect<
|
||||
private lastTimeOffline: number;
|
||||
private readonly forceQueryThresholdMs: number;
|
||||
|
||||
private isRunning: boolean = false;
|
||||
private abortController?: AbortController;
|
||||
|
||||
private boundStoreConnectHandler?: (event: CustomEvent<PeerId>) => void;
|
||||
private boundHealthHandler?: (event: CustomEvent<HealthStatus>) => void;
|
||||
|
||||
public constructor(
|
||||
public decoders: IDecoder<T>[],
|
||||
public stopIfTrue: (msg: T) => boolean,
|
||||
@ -71,11 +77,28 @@ export class QueryOnConnect<
|
||||
}
|
||||
|
||||
public start(): void {
|
||||
if (this.isRunning) {
|
||||
log.warn("QueryOnConnect already running");
|
||||
return;
|
||||
}
|
||||
log.info("starting query-on-connect service");
|
||||
this.isRunning = true;
|
||||
this.abortController = new AbortController();
|
||||
this.setupEventListeners();
|
||||
}
|
||||
|
||||
public stop(): void {
|
||||
if (!this.isRunning) {
|
||||
return;
|
||||
}
|
||||
log.info("stopping query-on-connect service");
|
||||
this.isRunning = false;
|
||||
|
||||
if (this.abortController) {
|
||||
this.abortController.abort();
|
||||
this.abortController = undefined;
|
||||
}
|
||||
|
||||
this.unsetEventListeners();
|
||||
}
|
||||
|
||||
@ -120,7 +143,8 @@ export class QueryOnConnect<
|
||||
for await (const page of this._queryGenerator(this.decoders, {
|
||||
timeStart,
|
||||
timeEnd,
|
||||
peerId
|
||||
peerId,
|
||||
abortSignal: this.abortController?.signal
|
||||
})) {
|
||||
// Await for decoding
|
||||
const messages = (await Promise.all(page)).filter(
|
||||
@ -166,33 +190,41 @@ export class QueryOnConnect<
|
||||
}
|
||||
|
||||
private setupEventListeners(): void {
|
||||
this.boundStoreConnectHandler = (event: CustomEvent<PeerId>) => {
|
||||
void this.maybeQuery(event.detail).catch((err) =>
|
||||
log.error("query-on-connect error", err)
|
||||
);
|
||||
};
|
||||
|
||||
this.boundHealthHandler = this.updateLastOfflineDate.bind(this);
|
||||
|
||||
this.peerManagerEventEmitter.addEventListener(
|
||||
PeerManagerEventNames.StoreConnect,
|
||||
(event) =>
|
||||
void this.maybeQuery(event.detail).catch((err) =>
|
||||
log.error("query-on-connect error", err)
|
||||
)
|
||||
this.boundStoreConnectHandler
|
||||
);
|
||||
|
||||
this.wakuEventEmitter.addEventListener(
|
||||
WakuEvent.Health,
|
||||
this.updateLastOfflineDate.bind(this)
|
||||
this.boundHealthHandler
|
||||
);
|
||||
}
|
||||
|
||||
private unsetEventListeners(): void {
|
||||
this.peerManagerEventEmitter.removeEventListener(
|
||||
PeerManagerEventNames.StoreConnect,
|
||||
(event) =>
|
||||
void this.maybeQuery(event.detail).catch((err) =>
|
||||
log.error("query-on-connect error", err)
|
||||
)
|
||||
);
|
||||
if (this.boundStoreConnectHandler) {
|
||||
this.peerManagerEventEmitter.removeEventListener(
|
||||
PeerManagerEventNames.StoreConnect,
|
||||
this.boundStoreConnectHandler
|
||||
);
|
||||
this.boundStoreConnectHandler = undefined;
|
||||
}
|
||||
|
||||
this.wakuEventEmitter.removeEventListener(
|
||||
WakuEvent.Health,
|
||||
this.updateLastOfflineDate.bind(this)
|
||||
);
|
||||
if (this.boundHealthHandler) {
|
||||
this.wakuEventEmitter.removeEventListener(
|
||||
WakuEvent.Health,
|
||||
this.boundHealthHandler
|
||||
);
|
||||
this.boundHealthHandler = undefined;
|
||||
}
|
||||
}
|
||||
|
||||
private updateLastOfflineDate(event: CustomEvent<HealthStatus>): void {
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user