From 27292edabce801a5d2296437ca3e6198da018a24 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Tue, 15 Jul 2025 00:59:45 +0200 Subject: [PATCH] feat!: unify events under one source (#2473) * move health indicator under waku.events and expose from Waku as a value * update tests * make new type for libp2p event handlers * fix types --- .../connection_manager/connection_limiter.ts | 3 +- .../connection_manager/discovery_dialer.ts | 6 +- packages/interfaces/src/health_indicator.ts | 24 ------ packages/interfaces/src/health_status.ts | 16 ++++ packages/interfaces/src/index.ts | 2 +- packages/interfaces/src/libp2p.ts | 2 + packages/interfaces/src/waku.ts | 49 ++++++++++- .../health_indicator/health_indicator.spec.ts | 45 +++++----- .../src/health_indicator/health_indicator.ts | 83 +++++-------------- packages/sdk/src/peer_manager/peer_manager.ts | 4 +- packages/sdk/src/waku/waku.ts | 18 ++-- 11 files changed, 128 insertions(+), 124 deletions(-) delete mode 100644 packages/interfaces/src/health_indicator.ts create mode 100644 packages/interfaces/src/health_status.ts diff --git a/packages/core/src/lib/connection_manager/connection_limiter.ts b/packages/core/src/lib/connection_manager/connection_limiter.ts index f62d41229e..4210a8f761 100644 --- a/packages/core/src/lib/connection_manager/connection_limiter.ts +++ b/packages/core/src/lib/connection_manager/connection_limiter.ts @@ -3,6 +3,7 @@ import { ConnectionManagerOptions, IWakuEventEmitter, Libp2p, + Libp2pEventHandler, Tags } from "@waku/interfaces"; import { Logger } from "@waku/utils"; @@ -12,8 +13,6 @@ import { NetworkMonitor } from "./network_monitor.js"; const log = new Logger("connection-limiter"); -type Libp2pEventHandler = (e: CustomEvent) => void; - type ConnectionLimiterConstructorOptions = { libp2p: Libp2p; events: IWakuEventEmitter; diff --git a/packages/core/src/lib/connection_manager/discovery_dialer.ts b/packages/core/src/lib/connection_manager/discovery_dialer.ts index 7c40003221..d9ad80abe6 100644 --- a/packages/core/src/lib/connection_manager/discovery_dialer.ts +++ b/packages/core/src/lib/connection_manager/discovery_dialer.ts @@ -1,12 +1,10 @@ -import { Peer, PeerId, PeerInfo } from "@libp2p/interface"; +import { Libp2p, Peer, PeerId, PeerInfo } from "@libp2p/interface"; import { Multiaddr } from "@multiformats/multiaddr"; +import { Libp2pEventHandler } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -import { Libp2p } from "libp2p"; import { Dialer } from "./dialer.js"; -type Libp2pEventHandler = (e: CustomEvent) => void; - type DiscoveryDialerConstructorOptions = { libp2p: Libp2p; dialer: Dialer; diff --git a/packages/interfaces/src/health_indicator.ts b/packages/interfaces/src/health_indicator.ts deleted file mode 100644 index 125afd7883..0000000000 --- a/packages/interfaces/src/health_indicator.ts +++ /dev/null @@ -1,24 +0,0 @@ -import { TypedEventEmitter } from "@libp2p/interface"; - -import { Libp2p } from "./libp2p.js"; - -export enum HealthStatusChangeEvents { - StatusChange = "health:change" -} - -export enum HealthStatus { - Unhealthy = "Unhealthy", - MinimallyHealthy = "MinimallyHealthy", - SufficientlyHealthy = "SufficientlyHealthy" -} - -export type HealthIndicatorEvents = { - [HealthStatusChangeEvents.StatusChange]: CustomEvent; -}; - -export interface IHealthIndicator - extends TypedEventEmitter {} - -export type HealthIndicatorParams = { - libp2p: Libp2p; -}; diff --git a/packages/interfaces/src/health_status.ts b/packages/interfaces/src/health_status.ts new file mode 100644 index 0000000000..ed87150e20 --- /dev/null +++ b/packages/interfaces/src/health_status.ts @@ -0,0 +1,16 @@ +export enum HealthStatus { + /** + * No peer connections + */ + Unhealthy = "Unhealthy", + + /** + * At least 1 peer supporting both Filter and LightPush protocols + */ + MinimallyHealthy = "MinimallyHealthy", + + /** + * At least 2 peers supporting both Filter and LightPush protocols + */ + SufficientlyHealthy = "SufficientlyHealthy" +} diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 4887607c5c..042a27dbd5 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -17,4 +17,4 @@ export * from "./metadata.js"; export * from "./constants.js"; export * from "./local_storage.js"; export * from "./sharding.js"; -export * from "./health_indicator.js"; +export * from "./health_status.js"; diff --git a/packages/interfaces/src/libp2p.ts b/packages/interfaces/src/libp2p.ts index 53bb106308..d02b6a5717 100644 --- a/packages/interfaces/src/libp2p.ts +++ b/packages/interfaces/src/libp2p.ts @@ -36,3 +36,5 @@ export type CreateLibp2pOptions = Libp2pOptions & { */ filterMultiaddrs?: boolean; }; + +export type Libp2pEventHandler = (e: CustomEvent) => void; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 9bc58f77e2..049588c819 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -7,7 +7,7 @@ import type { import type { MultiaddrInput } from "@multiformats/multiaddr"; import type { IFilter } from "./filter.js"; -import type { IHealthIndicator } from "./health_indicator.js"; +import type { HealthStatus } from "./health_status.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; import { IDecodedMessage, IDecoder, IEncoder } from "./message.js"; @@ -35,7 +35,27 @@ export type CreateEncoderParams = CreateDecoderParams & { }; export interface IWakuEvents { + /** + * Emitted when a connection is established or lost. + * + * @example + * ```typescript + * waku.addEventListener("waku:connection", (event) => { + * console.log(event.detail); // true if connected, false if disconnected + * }); + */ "waku:connection": CustomEvent; + + /** + * Emitted when the health status changes. + * + * @example + * ```typescript + * waku.addEventListener("waku:health", (event) => { + * console.log(event.detail); // 'Unhealthy', 'MinimallyHealthy', or 'SufficientlyHealthy' + * }); + */ + "waku:health": CustomEvent; } export type IWakuEventEmitter = TypedEventEmitter; @@ -47,7 +67,19 @@ export interface IWaku { filter?: IFilter; lightPush?: ILightPush; - health: IHealthIndicator; + /** + * Emits events related to the Waku node. + * Those are: + * - "waku:connection" + * - "waku:health" + * + * @example + * ```typescript + * waku.events.addEventListener("waku:connection", (event) => { + * console.log(event.detail); // true if connected, false if disconnected + * }); + * ``` + */ events: IWakuEventEmitter; /** @@ -60,6 +92,19 @@ export interface IWaku { */ peerId: PeerId; + /** + * The health status can be one of three states: + * - Unhealthy: No peer connections + * - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols + * - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols + * + * @example + * ```typescript + * console.log(waku.health); // 'Unhealthy' + * ``` + */ + health: HealthStatus; + /** * Returns a list of supported protocols. * diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts index 12148210ee..2afc7bd285 100644 --- a/packages/sdk/src/health_indicator/health_indicator.spec.ts +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -1,10 +1,6 @@ import { Connection, Peer } from "@libp2p/interface"; import { FilterCodecs, LightPushCodec } from "@waku/core"; -import { - HealthStatus, - HealthStatusChangeEvents, - Libp2p -} from "@waku/interfaces"; +import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces"; import { expect } from "chai"; import sinon from "sinon"; @@ -12,11 +8,13 @@ import { HealthIndicator } from "./health_indicator.js"; describe("HealthIndicator", () => { let libp2p: Libp2p; + let events: IWakuEventEmitter; let healthIndicator: HealthIndicator; beforeEach(() => { libp2p = mockLibp2p(); - healthIndicator = new HealthIndicator({ libp2p }); + events = mockEvents(); + healthIndicator = new HealthIndicator({ libp2p, events }); healthIndicator.start(); }); @@ -26,14 +24,13 @@ describe("HealthIndicator", () => { }); it("should initialize with Unhealthy status", () => { - expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy); }); it("should transition to Unhealthy when no connections", async () => { const statusChangePromise = new Promise((resolve) => { - healthIndicator.addEventListener( - HealthStatusChangeEvents.StatusChange, - (e: CustomEvent) => resolve(e.detail) + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) ); }); @@ -44,14 +41,13 @@ describe("HealthIndicator", () => { const changedStatus = await statusChangePromise; expect(changedStatus).to.equal(HealthStatus.Unhealthy); - expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy); }); it("should transition to MinimallyHealthy with one compatible peer", async () => { const statusChangePromise = new Promise((resolve) => { - healthIndicator.addEventListener( - HealthStatusChangeEvents.StatusChange, - (e: CustomEvent) => resolve(e.detail) + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) ); }); @@ -66,14 +62,13 @@ describe("HealthIndicator", () => { const changedStatus = await statusChangePromise; expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); - expect(healthIndicator.toString()).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); }); it("should transition to SufficientlyHealthy with multiple compatible peers", async () => { const statusChangePromise = new Promise((resolve) => { - healthIndicator.addEventListener( - HealthStatusChangeEvents.StatusChange, - (e: CustomEvent) => resolve(e.detail) + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) ); }); @@ -92,7 +87,7 @@ describe("HealthIndicator", () => { const changedStatus = await statusChangePromise; expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy); - expect(healthIndicator.toString()).to.equal( + expect(healthIndicator.toValue()).to.equal( HealthStatus.SufficientlyHealthy ); }); @@ -135,6 +130,18 @@ function mockLibp2p(): Libp2p { } as unknown as Libp2p; } +function mockEvents(): IWakuEventEmitter { + const events = new EventTarget(); + + return { + addEventListener: (event: string, handler: EventListener) => + events.addEventListener(event, handler), + removeEventListener: (event: string, handler: EventListener) => + events.removeEventListener(event, handler), + dispatchEvent: (event: Event) => events.dispatchEvent(event) + } as unknown as IWakuEventEmitter; +} + function mockPeer(id: string, protocols: string[]): Peer { return { id, diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts index d73e6cd6b0..a4378d5b5e 100644 --- a/packages/sdk/src/health_indicator/health_indicator.ts +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -1,66 +1,37 @@ -import { TypedEventEmitter } from "@libp2p/interface"; import type { IdentifyResult, PeerId } from "@libp2p/interface"; import { FilterCodecs, LightPushCodec } from "@waku/core"; -import { - HealthIndicatorEvents, - HealthIndicatorParams, - HealthStatus, - HealthStatusChangeEvents, - IHealthIndicator, - Libp2p -} from "@waku/interfaces"; +import { HealthStatus, IWakuEventEmitter, Libp2p } from "@waku/interfaces"; import { Logger } from "@waku/utils"; type PeerEvent = (_event: CustomEvent) => void; const log = new Logger("health-indicator"); -/** - * HealthIndicator monitors the health status of a Waku node by tracking peer connections - * and their supported protocols. - * - * The health status can be one of three states: - * - Unhealthy: No peer connections - * - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols - * - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols - * - * @example - * // Create and start a health indicator - * const healthIndicator = new HealthIndicator({ libp2p: node.libp2p }); - * healthIndicator.start(); - * - * // Listen for health status changes - * healthIndicator.addEventListener(HealthStatusChangeEvents.StatusChange, (event) => { - * console.log(`Health status changed to: ${event.detail}`); - * }); - * - * // Get current health status - * console.log(`Current health: ${healthIndicator.toString()}`); - * - * // Clean up when done - * healthIndicator.stop(); - * - * @implements {IHealthIndicator} - */ -export class HealthIndicator - extends TypedEventEmitter - implements IHealthIndicator -{ +type HealthIndicatorParams = { + libp2p: Libp2p; + events: IWakuEventEmitter; +}; + +interface IHealthIndicator { + start(): void; + stop(): void; + toValue(): HealthStatus; +} + +export class HealthIndicator implements IHealthIndicator { private readonly libp2p: Libp2p; + private readonly events: IWakuEventEmitter; + private value: HealthStatus = HealthStatus.Unhealthy; public constructor(params: HealthIndicatorParams) { - super(); this.libp2p = params.libp2p; + this.events = params.events; this.onPeerIdentify = this.onPeerIdentify.bind(this); this.onPeerDisconnected = this.onPeerDisconnected.bind(this); } - /** - * Starts monitoring the health status by adding event listeners to libp2p events. - * Listens to peer connect and disconnect events to determine the node's health status. - */ public start(): void { log.info("start: adding listeners to libp2p"); @@ -74,10 +45,6 @@ export class HealthIndicator ); } - /** - * Stops monitoring the health status by removing event listeners from libp2p events. - * Cleans up the peer connect and disconnect event listeners. - */ public stop(): void { log.info("stop: removing listeners to libp2p"); @@ -91,19 +58,7 @@ export class HealthIndicator ); } - /** - * Returns the current health status as a string. - * @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy) - */ - public toString(): string { - return this.value; - } - - /** - * Returns the current health status value. - * @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy) - */ - public toValue(): string { + public toValue(): HealthStatus { return this.value; } @@ -163,8 +118,8 @@ export class HealthIndicator } private dispatchHealthEvent(): void { - this.dispatchEvent( - new CustomEvent(HealthStatusChangeEvents.StatusChange, { + this.events.dispatchEvent( + new CustomEvent("waku:health", { detail: this.value }) ); diff --git a/packages/sdk/src/peer_manager/peer_manager.ts b/packages/sdk/src/peer_manager/peer_manager.ts index 08c600160d..b945bde100 100644 --- a/packages/sdk/src/peer_manager/peer_manager.ts +++ b/packages/sdk/src/peer_manager/peer_manager.ts @@ -10,7 +10,7 @@ import { LightPushCodec, StoreCodec } from "@waku/core"; -import { Libp2p, Protocols } from "@waku/interfaces"; +import { Libp2p, Libp2pEventHandler, Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; const log = new Logger("peer-manager"); @@ -49,8 +49,6 @@ interface IPeerManagerEvents { [PeerManagerEventNames.Disconnect]: CustomEvent; } -type Libp2pEventHandler = (e: CustomEvent) => void; - /** * @description * PeerManager is responsible for: diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index 6b35287bc6..e547130927 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -23,7 +23,11 @@ import type { NetworkConfig, PubsubTopic } from "@waku/interfaces"; -import { DefaultNetworkConfig, Protocols } from "@waku/interfaces"; +import { + DefaultNetworkConfig, + HealthStatus, + Protocols +} from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Filter } from "../filter/index.js"; @@ -50,7 +54,6 @@ export class WakuNode implements IWaku { public filter?: IFilter; public lightPush?: ILightPush; - public readonly health: HealthIndicator; public readonly events: IWakuEventEmitter = new TypedEventEmitter(); private readonly networkConfig: NetworkConfig; @@ -61,6 +64,7 @@ export class WakuNode implements IWaku { private readonly connectionManager: ConnectionManager; private readonly peerManager: PeerManager; + private readonly healthIndicator: HealthIndicator; public constructor( pubsubTopics: PubsubTopic[], @@ -99,7 +103,7 @@ export class WakuNode implements IWaku { connectionManager: this.connectionManager }); - this.health = new HealthIndicator({ libp2p }); + this.healthIndicator = new HealthIndicator({ libp2p, events: this.events }); if (protocolsEnabled.store) { this.store = new Store({ @@ -144,6 +148,10 @@ export class WakuNode implements IWaku { return this.libp2p.getProtocols(); } + public get health(): HealthStatus { + return this.healthIndicator.toValue(); + } + public async dial( peer: PeerId | MultiaddrInput, protocols?: Protocols[] @@ -216,7 +224,7 @@ export class WakuNode implements IWaku { await this.libp2p.start(); this.connectionManager.start(); this.peerManager.start(); - this.health.start(); + this.healthIndicator.start(); this.lightPush?.start(); this._nodeStateLock = false; @@ -229,7 +237,7 @@ export class WakuNode implements IWaku { this._nodeStateLock = true; this.lightPush?.stop(); - this.health.stop(); + this.healthIndicator.stop(); this.peerManager.stop(); this.connectionManager.stop(); await this.libp2p.stop();