fix: cleanup routines on reliable channel and core protocols (#2733)

* fix: add stop methods to protocols to prevent event listener leaks

* fix: add abort signal support for graceful store query cancellation

* fix: call protocol stop methods in WakuNode.stop()

* fix: improve QueryOnConnect cleanup and abort signal handling

* fix: improve MissingMessageRetriever cleanup with abort signal

* fix: add stopAllRetries method to RetryManager for proper cleanup

* fix: implement comprehensive ReliableChannel stop() with proper cleanup

* fix: add active query tracking to QueryOnConnect and await its stop()

* fix: add stop() to IRelayAPI and IStore interfaces, implement in SDK wrappers

* align with usual naming (isStarted)

* remove unnecessary `await`

* test: `stop()` is now async

* chore: use more concise syntax

---------

Co-authored-by: Levente Kiss <levente.kiss@solarpunk.buzz>
This commit is contained in:
fryorcraken 2025-11-13 12:32:15 +11:00 committed by GitHub
parent 049e564e89
commit 84a6ea69cf
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
15 changed files with 312 additions and 66 deletions

View File

@ -61,6 +61,7 @@ export class FilterCore {
}
public async stop(): Promise<void> {
this.streamManager.stop();
try {
await this.libp2p.unhandle(FilterCodecs.PUSH);
} catch (e) {

View File

@ -33,6 +33,11 @@ export class LightPushCore {
this.streamManager = new StreamManager(CODECS.v3, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
this.streamManagerV2.stop();
}
public async send(
encoder: IEncoder,
message: IMessage,

View File

@ -35,6 +35,10 @@ export class StoreCore {
this.streamManager = new StreamManager(StoreCodec, libp2p.components);
}
public stop(): void {
this.streamManager.stop();
}
public get maxTimeLimit(): number {
return MAX_TIME_RANGE;
}
@ -68,6 +72,11 @@ export class StoreCore {
let currentCursor = queryOpts.paginationCursor;
while (true) {
if (queryOpts.abortSignal?.aborted) {
log.info("Store query aborted by signal");
break;
}
const storeQueryRequest = StoreQueryRequest.create({
...queryOpts,
paginationCursor: currentCursor
@ -89,13 +98,22 @@ export class StoreCore {
break;
}
const res = await pipe(
let res;
try {
res = await pipe(
[storeQueryRequest.encode()],
lp.encode,
stream,
lp.decode,
async (source) => await all(source)
);
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
log.info(`Store query aborted for peer ${peerId.toString()}`);
break;
}
throw error;
}
const bytes = new Uint8ArrayList();
res.forEach((chunk) => {
@ -122,6 +140,11 @@ export class StoreCore {
`${storeQueryResponse.messages.length} messages retrieved from store`
);
if (queryOpts.abortSignal?.aborted) {
log.info("Store query aborted by signal before processing messages");
break;
}
const decodedMessages = storeQueryResponse.messages.map((protoMsg) => {
if (!protoMsg.message) {
return Promise.resolve(undefined);

View File

@ -23,6 +23,15 @@ export class StreamManager {
);
}
public stop(): void {
this.libp2p.events.removeEventListener(
"peer:update",
this.handlePeerUpdateStreamPool
);
this.streamPool.clear();
this.ongoingCreation.clear();
}
public async getStream(peerId: PeerId): Promise<Stream | undefined> {
try {
const peerIdStr = peerId.toString();

View File

@ -16,6 +16,7 @@ export interface IRelayAPI {
readonly pubsubTopics: Set<PubsubTopic>;
readonly gossipSub: GossipSub;
start: () => Promise<void>;
stop: () => Promise<void>;
waitForPeers: () => Promise<void>;
getMeshPeers: (topic?: TopicStr) => PeerIdStr[];
}

View File

@ -88,11 +88,18 @@ export type QueryRequestParams = {
* Only use if you know what you are doing.
*/
peerId?: PeerId;
/**
* An optional AbortSignal to cancel the query.
* When the signal is aborted, the query will stop processing and return early.
*/
abortSignal?: AbortSignal;
};
export type IStore = {
readonly multicodec: string;
stop(): void;
createCursor(message: IDecodedMessage): StoreCursor;
queryGenerator: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],

View File

@ -67,6 +67,10 @@ export class Relay implements IRelay {
* Observers under key `""` are always called.
*/
private observers: Map<PubsubTopic, Map<ContentTopic, Set<unknown>>>;
private messageEventHandlers: Map<
PubsubTopic,
(event: CustomEvent<GossipsubMessage>) => void
> = new Map();
public constructor(params: RelayConstructorParams) {
if (!this.isRelayPubsub(params.libp2p.services.pubsub)) {
@ -105,6 +109,19 @@ export class Relay implements IRelay {
this.subscribeToAllTopics();
}
public async stop(): Promise<void> {
for (const pubsubTopic of this.pubsubTopics) {
const handler = this.messageEventHandlers.get(pubsubTopic);
if (handler) {
this.gossipSub.removeEventListener("gossipsub:message", handler);
}
this.gossipSub.topicValidators.delete(pubsubTopic);
this.gossipSub.unsubscribe(pubsubTopic);
}
this.messageEventHandlers.clear();
this.observers.clear();
}
/**
* Wait for at least one peer with the given protocol to be connected and in the gossipsub
* mesh for all pubsubTopics.
@ -299,17 +316,17 @@ export class Relay implements IRelay {
* @override
*/
private gossipSubSubscribe(pubsubTopic: string): void {
this.gossipSub.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
const handler = (event: CustomEvent<GossipsubMessage>): void => {
if (event.detail.msg.topic !== pubsubTopic) return;
this.processIncomingMessage(
event.detail.msg.topic,
event.detail.msg.data
).catch((e) => log.error("Failed to process incoming message", e));
}
);
};
this.messageEventHandlers.set(pubsubTopic, handler);
this.gossipSub.addEventListener("gossipsub:message", handler);
this.gossipSub.topicValidators.set(pubsubTopic, messageValidator);
this.gossipSub.subscribe(pubsubTopic);

View File

@ -65,6 +65,7 @@ export class LightPush implements ILightPush {
public stop(): void {
this.retryManager.stop();
this.protocol.stop();
}
public async send(

View File

@ -158,14 +158,14 @@ describe("QueryOnConnect", () => {
expect(wakuEventSpy.calledWith(WakuEvent.Health)).to.be.true;
});
it("should remove event listeners when stopped", () => {
it("should remove event listeners when stopped", async () => {
const peerRemoveSpy =
mockPeerManagerEventEmitter.removeEventListener as sinon.SinonSpy;
const wakuRemoveSpy =
mockWakuEventEmitter.removeEventListener as sinon.SinonSpy;
queryOnConnect.start();
queryOnConnect.stop();
await queryOnConnect.stop();
expect(peerRemoveSpy.calledWith(PeerManagerEventNames.StoreConnect)).to.be
.true;

View File

@ -52,6 +52,13 @@ export class QueryOnConnect<
private lastTimeOffline: number;
private readonly forceQueryThresholdMs: number;
private isStarted: boolean = false;
private abortController?: AbortController;
private activeQueryPromise?: Promise<void>;
private boundStoreConnectHandler?: (event: CustomEvent<PeerId>) => void;
private boundHealthHandler?: (event: CustomEvent<HealthStatus>) => void;
public constructor(
public decoders: IDecoder<T>[],
public stopIfTrue: (msg: T) => boolean,
@ -71,11 +78,37 @@ export class QueryOnConnect<
}
public start(): void {
if (this.isStarted) {
log.warn("QueryOnConnect already running");
return;
}
log.info("starting query-on-connect service");
this.isStarted = true;
this.abortController = new AbortController();
this.setupEventListeners();
}
public stop(): void {
public async stop(): Promise<void> {
if (!this.isStarted) {
return;
}
log.info("stopping query-on-connect service");
this.isStarted = false;
if (this.abortController) {
this.abortController.abort();
this.abortController = undefined;
}
if (this.activeQueryPromise) {
log.info("Waiting for active query to complete...");
try {
await this.activeQueryPromise;
} catch (error) {
log.warn("Active query failed during stop:", error);
}
}
this.unsetEventListeners();
}
@ -107,7 +140,10 @@ export class QueryOnConnect<
this.lastTimeOffline > this.lastSuccessfulQuery ||
timeSinceLastQuery > this.forceQueryThresholdMs
) {
await this.query(peerId);
this.activeQueryPromise = this.query(peerId).finally(() => {
this.activeQueryPromise = undefined;
});
await this.activeQueryPromise;
} else {
log.info(`no querying`);
}
@ -120,7 +156,8 @@ export class QueryOnConnect<
for await (const page of this._queryGenerator(this.decoders, {
timeStart,
timeEnd,
peerId
peerId,
abortSignal: this.abortController?.signal
})) {
// Await for decoding
const messages = (await Promise.all(page)).filter(
@ -166,33 +203,41 @@ export class QueryOnConnect<
}
private setupEventListeners(): void {
this.peerManagerEventEmitter.addEventListener(
PeerManagerEventNames.StoreConnect,
(event) =>
this.boundStoreConnectHandler = (event: CustomEvent<PeerId>) => {
void this.maybeQuery(event.detail).catch((err) =>
log.error("query-on-connect error", err)
)
);
};
this.boundHealthHandler = this.updateLastOfflineDate.bind(this);
this.peerManagerEventEmitter.addEventListener(
PeerManagerEventNames.StoreConnect,
this.boundStoreConnectHandler
);
this.wakuEventEmitter.addEventListener(
WakuEvent.Health,
this.updateLastOfflineDate.bind(this)
this.boundHealthHandler
);
}
private unsetEventListeners(): void {
if (this.boundStoreConnectHandler) {
this.peerManagerEventEmitter.removeEventListener(
PeerManagerEventNames.StoreConnect,
(event) =>
void this.maybeQuery(event.detail).catch((err) =>
log.error("query-on-connect error", err)
)
this.boundStoreConnectHandler
);
this.boundStoreConnectHandler = undefined;
}
if (this.boundHealthHandler) {
this.wakuEventEmitter.removeEventListener(
WakuEvent.Health,
this.updateLastOfflineDate.bind(this)
this.boundHealthHandler
);
this.boundHealthHandler = undefined;
}
}
private updateLastOfflineDate(event: CustomEvent<HealthStatus>): void {

View File

@ -13,6 +13,8 @@ const DEFAULT_RETRIEVE_FREQUENCY_MS = 10 * 1000; // 10 seconds
export class MissingMessageRetriever<T extends IDecodedMessage> {
private retrieveInterval: ReturnType<typeof setInterval> | undefined;
private missingMessages: Map<MessageId, Uint8Array<ArrayBufferLike>>; // Waku Message Ids
private activeQueryPromise: Promise<void> | undefined;
private abortController?: AbortController;
public constructor(
private readonly decoder: IDecoder<T>,
@ -29,7 +31,11 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
public start(): void {
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
this.abortController = new AbortController();
if (this.retrieveFrequencyMs !== 0) {
log.info(`start retrieve loop every ${this.retrieveFrequencyMs}ms`);
this.retrieveInterval = setInterval(() => {
@ -38,10 +44,30 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
}
}
public stop(): void {
public async stop(): Promise<void> {
log.info("Stopping MissingMessageRetriever...");
if (this.retrieveInterval) {
clearInterval(this.retrieveInterval);
this.retrieveInterval = undefined;
}
if (this.abortController) {
this.abortController.abort();
this.abortController = undefined;
}
if (this.activeQueryPromise) {
log.info("Waiting for active query to complete...");
try {
await this.activeQueryPromise;
} catch (error) {
log.warn("Active query failed during stop:", error);
}
}
this.missingMessages.clear();
log.info("MissingMessageRetriever stopped");
}
public addMissingMessage(
@ -64,8 +90,12 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
if (this.missingMessages.size) {
const messageHashes = Array.from(this.missingMessages.values());
log.info("attempting to retrieve missing message", messageHashes.length);
this.activeQueryPromise = (async () => {
try {
for await (const page of this._retrieve([this.decoder], {
messageHashes
messageHashes,
abortSignal: this.abortController?.signal
})) {
for await (const msg of page) {
if (msg && this.onMessageRetrieved) {
@ -73,6 +103,17 @@ export class MissingMessageRetriever<T extends IDecodedMessage> {
}
}
}
} catch (error) {
if (error instanceof Error && error.name === "AbortError") {
log.info("Store query aborted");
return;
}
log.error("Store query failed:", error);
}
})();
await this.activeQueryPromise;
this.activeQueryPromise = undefined;
}
}
}

View File

@ -17,6 +17,7 @@ import {
isContentMessage,
MessageChannel,
MessageChannelEvent,
MessageChannelEvents,
type MessageChannelOptions,
Message as SdsMessage,
type SenderId,
@ -136,11 +137,16 @@ export class ReliableChannel<
callback: Callback<T>
) => Promise<boolean>;
private readonly _unsubscribe?: (
decoders: IDecoder<T> | IDecoder<T>[]
) => Promise<boolean>;
private readonly _retrieve?: <T extends IDecodedMessage>(
decoders: IDecoder<T>[],
options?: Partial<QueryRequestParams>
) => AsyncGenerator<Promise<T | undefined>[]>;
private eventListenerCleanups: Array<() => void> = [];
private readonly syncMinIntervalMs: number;
private syncTimeout: ReturnType<typeof setTimeout> | undefined;
private sweepInBufInterval: ReturnType<typeof setInterval> | undefined;
@ -151,6 +157,7 @@ export class ReliableChannel<
private readonly queryOnConnect?: QueryOnConnect<T>;
private readonly processTaskMinElapseMs: number;
private _started: boolean;
private activePendingProcessTask?: Promise<void>;
private constructor(
public node: IWaku,
@ -170,6 +177,7 @@ export class ReliableChannel<
if (node.filter) {
this._subscribe = node.filter.subscribe.bind(node.filter);
this._unsubscribe = node.filter.unsubscribe.bind(node.filter);
} else if (node.relay) {
// TODO: Why do relay and filter have different interfaces?
// this._subscribe = node.relay.subscribeWithUnsubscribe;
@ -384,10 +392,21 @@ export class ReliableChannel<
private async subscribe(): Promise<boolean> {
this.assertStarted();
return this._subscribe(this.decoder, async (message: T) => {
if (!this._started) {
log.info("ReliableChannel stopped, ignoring incoming message");
return;
}
await this.processIncomingMessage(message);
});
}
private async unsubscribe(): Promise<boolean> {
if (!this._unsubscribe) {
throw Error("No unsubscribe method available");
}
return this._unsubscribe(this.decoder);
}
/**
* Don't forget to call `this.messageChannel.sweepIncomingBuffer();` once done.
* @param msg
@ -458,11 +477,18 @@ export class ReliableChannel<
// TODO: For now we only queue process tasks for incoming messages
// As this is where there is most volume
private queueProcessTasks(): void {
if (!this._started) return;
// If one is already queued, then we can ignore it
if (this.processTaskTimeout === undefined) {
this.processTaskTimeout = setTimeout(() => {
void this.messageChannel.processTasks().catch((err) => {
this.activePendingProcessTask = this.messageChannel
.processTasks()
.catch((err) => {
log.error("error encountered when processing sds tasks", err);
})
.finally(() => {
this.activePendingProcessTask = undefined;
});
// Clear timeout once triggered
@ -485,15 +511,35 @@ export class ReliableChannel<
return this.subscribe();
}
public stop(): void {
public async stop(): Promise<void> {
if (!this._started) return;
log.info("Stopping ReliableChannel...");
this._started = false;
this.stopSync();
this.stopSweepIncomingBufferLoop();
this.missingMessageRetriever?.stop();
this.queryOnConnect?.stop();
// TODO unsubscribe
// TODO unsetMessageListeners
if (this.processTaskTimeout) {
clearTimeout(this.processTaskTimeout);
this.processTaskTimeout = undefined;
}
if (this.activePendingProcessTask) {
await this.activePendingProcessTask;
}
await this.missingMessageRetriever?.stop();
await this.queryOnConnect?.stop();
this.retryManager?.stopAllRetries();
await this.unsubscribe();
this.removeAllEventListeners();
log.info("ReliableChannel stopped successfully");
}
private assertStarted(): void {
@ -509,12 +555,16 @@ export class ReliableChannel<
}
private stopSweepIncomingBufferLoop(): void {
if (this.sweepInBufInterval) clearInterval(this.sweepInBufInterval);
if (this.sweepInBufInterval) {
clearInterval(this.sweepInBufInterval);
this.sweepInBufInterval = undefined;
}
}
private restartSync(multiplier: number = 1): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
this.syncTimeout = undefined;
}
if (this.syncMinIntervalMs) {
const timeoutMs = this.random() * this.syncMinIntervalMs * multiplier;
@ -531,6 +581,7 @@ export class ReliableChannel<
private stopSync(): void {
if (this.syncTimeout) {
clearTimeout(this.syncTimeout);
this.syncTimeout = undefined;
}
}
@ -595,8 +646,19 @@ export class ReliableChannel<
return sdsMessage.causalHistory && sdsMessage.causalHistory.length > 0;
}
private addTrackedEventListener<K extends keyof MessageChannelEvents>(
eventName: K,
listener: (event: MessageChannelEvents[K]) => void
): void {
this.messageChannel.addEventListener(eventName, listener as any);
this.eventListenerCleanups.push(() => {
this.messageChannel.removeEventListener(eventName, listener as any);
});
}
private setupEventListeners(): void {
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
if (event.detail.content) {
@ -608,7 +670,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.OutMessageAcknowledged,
(event) => {
if (event.detail) {
@ -622,7 +684,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.OutMessagePossiblyAcknowledged,
(event) => {
if (event.detail) {
@ -636,7 +698,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.InSyncReceived,
(_event) => {
// restart the timeout when a sync message has been received
@ -644,7 +706,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.InMessageReceived,
(event) => {
// restart the timeout when a content message has been received
@ -655,7 +717,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.OutMessageSent,
(event) => {
// restart the timeout when a content message has been sent
@ -665,7 +727,7 @@ export class ReliableChannel<
}
);
this.messageChannel.addEventListener(
this.addTrackedEventListener(
MessageChannelEvent.InMessageMissing,
(event) => {
for (const { messageId, retrievalHint } of event.detail) {
@ -680,12 +742,32 @@ export class ReliableChannel<
);
if (this.queryOnConnect) {
const queryListener = (event: any): void => {
void this.processIncomingMessages(event.detail);
};
this.queryOnConnect.addEventListener(
QueryOnConnectEvent.MessagesRetrieved,
(event) => {
void this.processIncomingMessages(event.detail);
}
queryListener
);
this.eventListenerCleanups.push(() => {
this.queryOnConnect?.removeEventListener(
QueryOnConnectEvent.MessagesRetrieved,
queryListener
);
});
}
}
private removeAllEventListeners(): void {
for (const cleanup of this.eventListenerCleanups) {
try {
cleanup();
} catch (error) {
log.error("error removing event listener:", error);
}
}
this.eventListenerCleanups = [];
}
}

View File

@ -24,9 +24,17 @@ export class RetryManager {
const timeout = this.timeouts.get(id);
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(id);
}
}
public stopAllRetries(): void {
for (const [_id, timeout] of this.timeouts.entries()) {
clearTimeout(timeout);
}
this.timeouts.clear();
}
public startRetries(id: string, retry: () => void | Promise<void>): void {
this.retry(id, retry, 0);
}
@ -36,7 +44,7 @@ export class RetryManager {
retry: () => void | Promise<void>,
attemptNumber: number
): void {
clearTimeout(this.timeouts.get(id));
this.stopRetries(id);
if (attemptNumber < this.maxRetryNumber) {
const interval = setTimeout(() => {
void retry();

View File

@ -46,6 +46,10 @@ export class Store implements IStore {
return this.protocol.multicodec;
}
public stop(): void {
this.protocol.stop();
}
/**
* Queries the Waku Store for historical messages using the provided decoders and options.
* Returns an asynchronous generator that yields promises of decoded messages.

View File

@ -232,7 +232,9 @@ export class WakuNode implements IWaku {
this._nodeStateLock = true;
this.lightPush?.stop();
this.store?.stop();
await this.filter?.stop();
await this.relay?.stop();
this.healthIndicator.stop();
this.peerManager.stop();
this.connectionManager.stop();