feat!: unify events under one source (#2473)

* move health indicator under waku.events and expose from Waku as a value

* update tests

* make new type for libp2p event handlers

* fix types
This commit is contained in:
Sasha 2025-07-15 00:59:45 +02:00 committed by GitHub
parent 14085de3c4
commit 27292edabc
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 128 additions and 124 deletions

View File

@ -3,6 +3,7 @@ import {
ConnectionManagerOptions,
IWakuEventEmitter,
Libp2p,
Libp2pEventHandler,
Tags
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
@ -12,8 +13,6 @@ import { NetworkMonitor } from "./network_monitor.js";
const log = new Logger("connection-limiter");
type Libp2pEventHandler<T> = (e: CustomEvent<T>) => void;
type ConnectionLimiterConstructorOptions = {
libp2p: Libp2p;
events: IWakuEventEmitter;

View File

@ -1,12 +1,10 @@
import { Peer, PeerId, PeerInfo } from "@libp2p/interface";
import { Libp2p, Peer, PeerId, PeerInfo } from "@libp2p/interface";
import { Multiaddr } from "@multiformats/multiaddr";
import { Libp2pEventHandler } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Libp2p } from "libp2p";
import { Dialer } from "./dialer.js";
type Libp2pEventHandler<T> = (e: CustomEvent<T>) => void;
type DiscoveryDialerConstructorOptions = {
libp2p: Libp2p;
dialer: Dialer;

View File

@ -1,24 +0,0 @@
import { TypedEventEmitter } from "@libp2p/interface";
import { Libp2p } from "./libp2p.js";
export enum HealthStatusChangeEvents {
StatusChange = "health:change"
}
export enum HealthStatus {
Unhealthy = "Unhealthy",
MinimallyHealthy = "MinimallyHealthy",
SufficientlyHealthy = "SufficientlyHealthy"
}
export type HealthIndicatorEvents = {
[HealthStatusChangeEvents.StatusChange]: CustomEvent<HealthStatus>;
};
export interface IHealthIndicator
extends TypedEventEmitter<HealthIndicatorEvents> {}
export type HealthIndicatorParams = {
libp2p: Libp2p;
};

View File

@ -0,0 +1,16 @@
export enum HealthStatus {
/**
* No peer connections
*/
Unhealthy = "Unhealthy",
/**
* At least 1 peer supporting both Filter and LightPush protocols
*/
MinimallyHealthy = "MinimallyHealthy",
/**
* At least 2 peers supporting both Filter and LightPush protocols
*/
SufficientlyHealthy = "SufficientlyHealthy"
}

View File

@ -17,4 +17,4 @@ export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./sharding.js";
export * from "./health_indicator.js";
export * from "./health_status.js";

View File

@ -36,3 +36,5 @@ export type CreateLibp2pOptions = Libp2pOptions & {
*/
filterMultiaddrs?: boolean;
};
export type Libp2pEventHandler<T> = (e: CustomEvent<T>) => void;

View File

@ -7,7 +7,7 @@ import type {
import type { MultiaddrInput } from "@multiformats/multiaddr";
import type { IFilter } from "./filter.js";
import type { IHealthIndicator } from "./health_indicator.js";
import type { HealthStatus } from "./health_status.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPush } from "./light_push.js";
import { IDecodedMessage, IDecoder, IEncoder } from "./message.js";
@ -35,7 +35,27 @@ export type CreateEncoderParams = CreateDecoderParams & {
};
export interface IWakuEvents {
/**
* Emitted when a connection is established or lost.
*
* @example
* ```typescript
* waku.addEventListener("waku:connection", (event) => {
* console.log(event.detail); // true if connected, false if disconnected
* });
*/
"waku:connection": CustomEvent<boolean>;
/**
* Emitted when the health status changes.
*
* @example
* ```typescript
* waku.addEventListener("waku:health", (event) => {
* console.log(event.detail); // 'Unhealthy', 'MinimallyHealthy', or 'SufficientlyHealthy'
* });
*/
"waku:health": CustomEvent<HealthStatus>;
}
export type IWakuEventEmitter = TypedEventEmitter<IWakuEvents>;
@ -47,7 +67,19 @@ export interface IWaku {
filter?: IFilter;
lightPush?: ILightPush;
health: IHealthIndicator;
/**
* Emits events related to the Waku node.
* Those are:
* - "waku:connection"
* - "waku:health"
*
* @example
* ```typescript
* waku.events.addEventListener("waku:connection", (event) => {
* console.log(event.detail); // true if connected, false if disconnected
* });
* ```
*/
events: IWakuEventEmitter;
/**
@ -60,6 +92,19 @@ export interface IWaku {
*/
peerId: PeerId;
/**
* The health status can be one of three states:
* - Unhealthy: No peer connections
* - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols
* - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols
*
* @example
* ```typescript
* console.log(waku.health); // 'Unhealthy'
* ```
*/
health: HealthStatus;
/**
* Returns a list of supported protocols.
*

View File

@ -1,10 +1,6 @@
import { Connection, Peer } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec } from "@waku/core";
import {
HealthStatus,
HealthStatusChangeEvents,
Libp2p
} from "@waku/interfaces";
import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces";
import { expect } from "chai";
import sinon from "sinon";
@ -12,11 +8,13 @@ import { HealthIndicator } from "./health_indicator.js";
describe("HealthIndicator", () => {
let libp2p: Libp2p;
let events: IWakuEventEmitter;
let healthIndicator: HealthIndicator;
beforeEach(() => {
libp2p = mockLibp2p();
healthIndicator = new HealthIndicator({ libp2p });
events = mockEvents();
healthIndicator = new HealthIndicator({ libp2p, events });
healthIndicator.start();
});
@ -26,14 +24,13 @@ describe("HealthIndicator", () => {
});
it("should initialize with Unhealthy status", () => {
expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy);
});
it("should transition to Unhealthy when no connections", async () => {
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
healthIndicator.addEventListener(
HealthStatusChangeEvents.StatusChange,
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
@ -44,14 +41,13 @@ describe("HealthIndicator", () => {
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.Unhealthy);
expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy);
});
it("should transition to MinimallyHealthy with one compatible peer", async () => {
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
healthIndicator.addEventListener(
HealthStatusChangeEvents.StatusChange,
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
@ -66,14 +62,13 @@ describe("HealthIndicator", () => {
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toString()).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
it("should transition to SufficientlyHealthy with multiple compatible peers", async () => {
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
healthIndicator.addEventListener(
HealthStatusChangeEvents.StatusChange,
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
@ -92,7 +87,7 @@ describe("HealthIndicator", () => {
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy);
expect(healthIndicator.toString()).to.equal(
expect(healthIndicator.toValue()).to.equal(
HealthStatus.SufficientlyHealthy
);
});
@ -135,6 +130,18 @@ function mockLibp2p(): Libp2p {
} as unknown as Libp2p;
}
function mockEvents(): IWakuEventEmitter {
const events = new EventTarget();
return {
addEventListener: (event: string, handler: EventListener) =>
events.addEventListener(event, handler),
removeEventListener: (event: string, handler: EventListener) =>
events.removeEventListener(event, handler),
dispatchEvent: (event: Event) => events.dispatchEvent(event)
} as unknown as IWakuEventEmitter;
}
function mockPeer(id: string, protocols: string[]): Peer {
return {
id,

View File

@ -1,66 +1,37 @@
import { TypedEventEmitter } from "@libp2p/interface";
import type { IdentifyResult, PeerId } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec } from "@waku/core";
import {
HealthIndicatorEvents,
HealthIndicatorParams,
HealthStatus,
HealthStatusChangeEvents,
IHealthIndicator,
Libp2p
} from "@waku/interfaces";
import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
type PeerEvent<T> = (_event: CustomEvent<T>) => void;
const log = new Logger("health-indicator");
/**
* HealthIndicator monitors the health status of a Waku node by tracking peer connections
* and their supported protocols.
*
* The health status can be one of three states:
* - Unhealthy: No peer connections
* - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols
* - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols
*
* @example
* // Create and start a health indicator
* const healthIndicator = new HealthIndicator({ libp2p: node.libp2p });
* healthIndicator.start();
*
* // Listen for health status changes
* healthIndicator.addEventListener(HealthStatusChangeEvents.StatusChange, (event) => {
* console.log(`Health status changed to: ${event.detail}`);
* });
*
* // Get current health status
* console.log(`Current health: ${healthIndicator.toString()}`);
*
* // Clean up when done
* healthIndicator.stop();
*
* @implements {IHealthIndicator}
*/
export class HealthIndicator
extends TypedEventEmitter<HealthIndicatorEvents>
implements IHealthIndicator
{
type HealthIndicatorParams = {
libp2p: Libp2p;
events: IWakuEventEmitter;
};
interface IHealthIndicator {
start(): void;
stop(): void;
toValue(): HealthStatus;
}
export class HealthIndicator implements IHealthIndicator {
private readonly libp2p: Libp2p;
private readonly events: IWakuEventEmitter;
private value: HealthStatus = HealthStatus.Unhealthy;
public constructor(params: HealthIndicatorParams) {
super();
this.libp2p = params.libp2p;
this.events = params.events;
this.onPeerIdentify = this.onPeerIdentify.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
/**
* Starts monitoring the health status by adding event listeners to libp2p events.
* Listens to peer connect and disconnect events to determine the node's health status.
*/
public start(): void {
log.info("start: adding listeners to libp2p");
@ -74,10 +45,6 @@ export class HealthIndicator
);
}
/**
* Stops monitoring the health status by removing event listeners from libp2p events.
* Cleans up the peer connect and disconnect event listeners.
*/
public stop(): void {
log.info("stop: removing listeners to libp2p");
@ -91,19 +58,7 @@ export class HealthIndicator
);
}
/**
* Returns the current health status as a string.
* @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy)
*/
public toString(): string {
return this.value;
}
/**
* Returns the current health status value.
* @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy)
*/
public toValue(): string {
public toValue(): HealthStatus {
return this.value;
}
@ -163,8 +118,8 @@ export class HealthIndicator
}
private dispatchHealthEvent(): void {
this.dispatchEvent(
new CustomEvent<HealthStatus>(HealthStatusChangeEvents.StatusChange, {
this.events.dispatchEvent(
new CustomEvent<HealthStatus>("waku:health", {
detail: this.value
})
);

View File

@ -10,7 +10,7 @@ import {
LightPushCodec,
StoreCodec
} from "@waku/core";
import { Libp2p, Protocols } from "@waku/interfaces";
import { Libp2p, Libp2pEventHandler, Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
const log = new Logger("peer-manager");
@ -49,8 +49,6 @@ interface IPeerManagerEvents {
[PeerManagerEventNames.Disconnect]: CustomEvent<PeerId>;
}
type Libp2pEventHandler<T> = (e: CustomEvent<T>) => void;
/**
* @description
* PeerManager is responsible for:

View File

@ -23,7 +23,11 @@ import type {
NetworkConfig,
PubsubTopic
} from "@waku/interfaces";
import { DefaultNetworkConfig, Protocols } from "@waku/interfaces";
import {
DefaultNetworkConfig,
HealthStatus,
Protocols
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { Filter } from "../filter/index.js";
@ -50,7 +54,6 @@ export class WakuNode implements IWaku {
public filter?: IFilter;
public lightPush?: ILightPush;
public readonly health: HealthIndicator;
public readonly events: IWakuEventEmitter = new TypedEventEmitter();
private readonly networkConfig: NetworkConfig;
@ -61,6 +64,7 @@ export class WakuNode implements IWaku {
private readonly connectionManager: ConnectionManager;
private readonly peerManager: PeerManager;
private readonly healthIndicator: HealthIndicator;
public constructor(
pubsubTopics: PubsubTopic[],
@ -99,7 +103,7 @@ export class WakuNode implements IWaku {
connectionManager: this.connectionManager
});
this.health = new HealthIndicator({ libp2p });
this.healthIndicator = new HealthIndicator({ libp2p, events: this.events });
if (protocolsEnabled.store) {
this.store = new Store({
@ -144,6 +148,10 @@ export class WakuNode implements IWaku {
return this.libp2p.getProtocols();
}
public get health(): HealthStatus {
return this.healthIndicator.toValue();
}
public async dial(
peer: PeerId | MultiaddrInput,
protocols?: Protocols[]
@ -216,7 +224,7 @@ export class WakuNode implements IWaku {
await this.libp2p.start();
this.connectionManager.start();
this.peerManager.start();
this.health.start();
this.healthIndicator.start();
this.lightPush?.start();
this._nodeStateLock = false;
@ -229,7 +237,7 @@ export class WakuNode implements IWaku {
this._nodeStateLock = true;
this.lightPush?.stop();
this.health.stop();
this.healthIndicator.stop();
this.peerManager.stop();
this.connectionManager.stop();
await this.libp2p.stop();