From 3136f3a70452cbec8b4361cc9697622b0a2debf7 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Wed, 5 Feb 2025 13:24:50 +0100 Subject: [PATCH] feat: add HealthIndicator with simplified logic and testing (#2251) * implement HealthIndicator * up libp2p interface version * up lock * remove unused tests * expose HealthIndicator from Waku * update test, add start and stop * fix error handling --- package-lock.json | 40 +--- packages/core/src/index.ts | 2 - packages/core/src/lib/health_manager.ts | 90 --------- packages/discovery/package.json | 2 +- packages/interfaces/src/health_indicator.ts | 24 +++ packages/interfaces/src/health_manager.ts | 26 --- packages/interfaces/src/index.ts | 2 +- packages/interfaces/src/waku.ts | 9 +- packages/sdk/package.json | 2 +- .../health_indicator/health_indicator.spec.ts | 145 +++++++++++++++ .../src/health_indicator/health_indicator.ts | 159 ++++++++++++++++ packages/sdk/src/health_indicator/index.ts | 1 + packages/sdk/src/light_push/light_push.ts | 7 +- packages/sdk/src/waku/waku.ts | 10 +- .../tests/tests/health-manager/node.spec.ts | 175 ------------------ .../tests/health-manager/protocols.spec.ts | 94 ---------- packages/tests/tests/health-manager/utils.ts | 21 --- 17 files changed, 348 insertions(+), 461 deletions(-) delete mode 100644 packages/core/src/lib/health_manager.ts create mode 100644 packages/interfaces/src/health_indicator.ts delete mode 100644 packages/interfaces/src/health_manager.ts create mode 100644 packages/sdk/src/health_indicator/health_indicator.spec.ts create mode 100644 packages/sdk/src/health_indicator/health_indicator.ts create mode 100644 packages/sdk/src/health_indicator/index.ts delete mode 100644 packages/tests/tests/health-manager/node.spec.ts delete mode 100644 packages/tests/tests/health-manager/protocols.spec.ts delete mode 100644 packages/tests/tests/health-manager/utils.ts diff --git a/package-lock.json b/package-lock.json index c17ac62759..97f2fe94b5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5768,9 +5768,9 @@ } }, "node_modules/@libp2p/interface": { - "version": "2.4.0", - "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.4.0.tgz", - "integrity": "sha512-PfzxOaz7dU4sdnUNByGLoEk9iqhD0IS+LQMQB12CXh6VyYLA7J8oaoHk3yRBZze3Y4FPa5DHMm5Oi9O/IhreaQ==", + "version": "2.4.1", + "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.4.1.tgz", + "integrity": "sha512-G80+rWn0d1+txM7TXMs+eK79qXdtS3yfepx2uGA5Kc7WSzXicwMN1Qw6ZJAB58SExdfQ0oWlS0E/v7kr8B025g==", "license": "Apache-2.0 OR MIT", "dependencies": { "@multiformats/multiaddr": "^12.3.3", @@ -41107,7 +41107,7 @@ "uint8arrays": "^5.0.1" }, "devDependencies": { - "@libp2p/interface": "2.0.1", + "@libp2p/interface": "^2.1.3", "@libp2p/peer-id": "5.0.1", "@multiformats/multiaddr": "^12.3.0", "@rollup/plugin-commonjs": "^25.0.7", @@ -41129,21 +41129,6 @@ "node": ">=20" } }, - "packages/discovery/node_modules/@libp2p/interface": { - "version": "2.0.1", - "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.0.1.tgz", - "integrity": "sha512-zDAgu+ZNiYZxVsmcvCeNCLMnGORwLMMI8w0k2YcHwolATsv2q7QG3KpakmyKjH4m7C0hT86lGgf1sgGobPssYA==", - "dev": true, - "license": "Apache-2.0 OR MIT", - "dependencies": { - "@multiformats/multiaddr": "^12.2.3", - "it-pushable": "^3.2.3", - "it-stream-types": "^2.0.1", - "multiformats": "^13.1.0", - "progress-events": "^1.0.0", - "uint8arraylist": "^2.4.8" - } - }, "packages/discovery/node_modules/@libp2p/peer-id": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/@libp2p/peer-id/-/peer-id-5.0.1.tgz", @@ -41381,7 +41366,7 @@ "libp2p": "2.1.8" }, "devDependencies": { - "@libp2p/interface": "2.1.3", + "@libp2p/interface": "^2.1.3", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", @@ -41401,21 +41386,6 @@ "node": ">=20" } }, - "packages/sdk/node_modules/@libp2p/interface": { - "version": "2.1.3", - "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.1.3.tgz", - "integrity": "sha512-t1i2LWcnTGJEr7fDMslA8wYwBzJP81QKBlrBHoGhXxqqpRQa9035roCh/Akuw5RUgjKE47/ezjuzo90aWsJB8g==", - "dev": true, - "license": "Apache-2.0 OR MIT", - "dependencies": { - "@multiformats/multiaddr": "^12.2.3", - "it-pushable": "^3.2.3", - "it-stream-types": "^2.0.1", - "multiformats": "^13.1.0", - "progress-events": "^1.0.0", - "uint8arraylist": "^2.4.8" - } - }, "packages/sdk/node_modules/@sinonjs/fake-timers": { "version": "13.0.5", "resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-13.0.5.tgz", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index cd9f894730..c744a72ef8 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -17,8 +17,6 @@ export { StoreCore, StoreCodec } from "./lib/store/index.js"; export { ConnectionManager } from "./lib/connection_manager/index.js"; -export { getHealthManager } from "./lib/health_manager.js"; - export { StreamManager } from "./lib/stream_manager/index.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; diff --git a/packages/core/src/lib/health_manager.ts b/packages/core/src/lib/health_manager.ts deleted file mode 100644 index c41cf7950f..0000000000 --- a/packages/core/src/lib/health_manager.ts +++ /dev/null @@ -1,90 +0,0 @@ -import { - HealthStatus, - type IHealthManager, - NodeHealth, - type ProtocolHealth, - Protocols -} from "@waku/interfaces"; - -class HealthManager implements IHealthManager { - public static instance: HealthManager; - private readonly health: NodeHealth; - - private constructor() { - this.health = { - overallStatus: HealthStatus.Unhealthy, - protocolStatuses: new Map() - }; - } - - public static getInstance(): HealthManager { - if (!HealthManager.instance) { - HealthManager.instance = new HealthManager(); - } - return HealthManager.instance; - } - - public getHealthStatus(): HealthStatus { - return this.health.overallStatus; - } - - public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined { - return this.health.protocolStatuses.get(protocol); - } - - public updateProtocolHealth( - multicodec: string, - connectedPeers: number - ): void { - const protocol = this.getNameFromMulticodec(multicodec); - - let status: HealthStatus = HealthStatus.Unhealthy; - if (connectedPeers == 1) { - status = HealthStatus.MinimallyHealthy; - } else if (connectedPeers >= 2) { - status = HealthStatus.SufficientlyHealthy; - } - - this.health.protocolStatuses.set(protocol, { - name: protocol, - status: status, - lastUpdate: new Date() - }); - - this.updateOverallHealth(); - } - - private getNameFromMulticodec(multicodec: string): Protocols { - let name: Protocols; - if (multicodec.includes("filter")) { - name = Protocols.Filter; - } else if (multicodec.includes("lightpush")) { - name = Protocols.LightPush; - } else if (multicodec.includes("store")) { - name = Protocols.Store; - } else { - throw new Error(`Unknown protocol: ${multicodec}`); - } - return name; - } - - private updateOverallHealth(): void { - const relevantProtocols = [Protocols.LightPush, Protocols.Filter]; - const statuses = relevantProtocols.map( - (p) => this.getProtocolStatus(p)?.status - ); - - if (statuses.some((status) => status === HealthStatus.Unhealthy)) { - this.health.overallStatus = HealthStatus.Unhealthy; - } else if ( - statuses.some((status) => status === HealthStatus.MinimallyHealthy) - ) { - this.health.overallStatus = HealthStatus.MinimallyHealthy; - } else { - this.health.overallStatus = HealthStatus.SufficientlyHealthy; - } - } -} - -export const getHealthManager = (): HealthManager => - HealthManager.getInstance(); diff --git a/packages/discovery/package.json b/packages/discovery/package.json index 4757f0dfc6..066142110b 100644 --- a/packages/discovery/package.json +++ b/packages/discovery/package.json @@ -62,7 +62,7 @@ "uint8arrays": "^5.0.1" }, "devDependencies": { - "@libp2p/interface": "2.0.1", + "@libp2p/interface": "^2.1.3", "@libp2p/peer-id": "5.0.1", "@multiformats/multiaddr": "^12.3.0", "@rollup/plugin-commonjs": "^25.0.7", diff --git a/packages/interfaces/src/health_indicator.ts b/packages/interfaces/src/health_indicator.ts new file mode 100644 index 0000000000..125afd7883 --- /dev/null +++ b/packages/interfaces/src/health_indicator.ts @@ -0,0 +1,24 @@ +import { TypedEventEmitter } from "@libp2p/interface"; + +import { Libp2p } from "./libp2p.js"; + +export enum HealthStatusChangeEvents { + StatusChange = "health:change" +} + +export enum HealthStatus { + Unhealthy = "Unhealthy", + MinimallyHealthy = "MinimallyHealthy", + SufficientlyHealthy = "SufficientlyHealthy" +} + +export type HealthIndicatorEvents = { + [HealthStatusChangeEvents.StatusChange]: CustomEvent; +}; + +export interface IHealthIndicator + extends TypedEventEmitter {} + +export type HealthIndicatorParams = { + libp2p: Libp2p; +}; diff --git a/packages/interfaces/src/health_manager.ts b/packages/interfaces/src/health_manager.ts deleted file mode 100644 index 8d3a1a9863..0000000000 --- a/packages/interfaces/src/health_manager.ts +++ /dev/null @@ -1,26 +0,0 @@ -import { Protocols } from "./protocols.js"; - -export enum HealthStatus { - Unhealthy = "Unhealthy", - MinimallyHealthy = "MinimallyHealthy", - SufficientlyHealthy = "SufficientlyHealthy" -} - -export interface IHealthManager { - getHealthStatus: () => HealthStatus; - getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined; - updateProtocolHealth: (multicodec: string, connectedPeers: number) => void; -} - -export type NodeHealth = { - overallStatus: HealthStatus; - protocolStatuses: ProtocolsHealthStatus; -}; - -export type ProtocolHealth = { - name: Protocols; - status: HealthStatus; - lastUpdate: Date; -}; - -export type ProtocolsHealthStatus = Map; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index ed2cb5240a..4887607c5c 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -16,5 +16,5 @@ export * from "./dns_discovery.js"; export * from "./metadata.js"; export * from "./constants.js"; export * from "./local_storage.js"; -export * from "./health_manager.js"; export * from "./sharding.js"; +export * from "./health_indicator.js"; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 2b19f84479..ecf791a46a 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -1,12 +1,12 @@ import type { Peer, PeerId, Stream } from "@libp2p/interface"; import type { MultiaddrInput } from "@multiformats/multiaddr"; -import { IConnectionManager } from "./connection_manager.js"; +import type { IConnectionManager } from "./connection_manager.js"; import type { IFilter } from "./filter.js"; -import { IHealthManager } from "./health_manager.js"; +import type { IHealthIndicator } from "./health_indicator.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { Protocols } from "./protocols.js"; +import type { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStore } from "./store.js"; @@ -16,9 +16,8 @@ export interface IWaku { store?: IStore; filter?: IFilter; lightPush?: ILightPush; - - health: IHealthManager; connectionManager: IConnectionManager; + health: IHealthIndicator; /** * Returns a unique identifier for a node on the network. diff --git a/packages/sdk/package.json b/packages/sdk/package.json index 33e0879d3e..ce3a6cdedb 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -76,7 +76,7 @@ "libp2p": "2.1.8" }, "devDependencies": { - "@libp2p/interface": "2.1.3", + "@libp2p/interface": "^2.1.3", "@types/chai": "^4.3.11", "@rollup/plugin-commonjs": "^25.0.7", "@rollup/plugin-json": "^6.0.0", diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts new file mode 100644 index 0000000000..e9a0f9c1e5 --- /dev/null +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -0,0 +1,145 @@ +import { Connection, Peer } from "@libp2p/interface"; +import { FilterCodecs, LightPushCodec } from "@waku/core"; +import { + HealthStatus, + HealthStatusChangeEvents, + Libp2p +} from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { HealthIndicator } from "./health_indicator.js"; + +describe("HealthIndicator", () => { + let libp2p: Libp2p; + let healthIndicator: HealthIndicator; + + beforeEach(() => { + libp2p = mockLibp2p(); + healthIndicator = new HealthIndicator({ libp2p }); + healthIndicator.start(); + }); + + afterEach(() => { + healthIndicator.stop(); + sinon.restore(); + }); + + it("should initialize with Unhealthy status", () => { + expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy); + }); + + it("should transition to Unhealthy when no connections", async () => { + const statusChangePromise = new Promise((resolve) => { + healthIndicator.addEventListener( + HealthStatusChangeEvents.StatusChange, + (e: CustomEvent) => resolve(e.detail) + ); + }); + + const connections: Connection[] = []; + sinon.stub(libp2p, "getConnections").returns(connections); + + libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" })); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.Unhealthy); + expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy); + }); + + it("should transition to MinimallyHealthy with one compatible peer", async () => { + const statusChangePromise = new Promise((resolve) => { + healthIndicator.addEventListener( + HealthStatusChangeEvents.StatusChange, + (e: CustomEvent) => resolve(e.detail) + ); + }); + + const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connections = [mockConnection("1")]; + sinon.stub(libp2p, "getConnections").returns(connections); + sinon.stub(libp2p.peerStore, "get").resolves(peer); + + libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" })); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toString()).to.equal(HealthStatus.MinimallyHealthy); + }); + + it("should transition to SufficientlyHealthy with multiple compatible peers", async () => { + const statusChangePromise = new Promise((resolve) => { + healthIndicator.addEventListener( + HealthStatusChangeEvents.StatusChange, + (e: CustomEvent) => resolve(e.detail) + ); + }); + + const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connections = [mockConnection("1"), mockConnection("2")]; + + sinon.stub(libp2p, "getConnections").returns(connections); + const peerStoreStub = sinon.stub(libp2p.peerStore, "get"); + peerStoreStub.withArgs(connections[0].remotePeer).resolves(peer1); + peerStoreStub.withArgs(connections[1].remotePeer).resolves(peer2); + + libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "2" })); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy); + expect(healthIndicator.toString()).to.equal( + HealthStatus.SufficientlyHealthy + ); + }); + + it("should properly start and stop event listening", () => { + const addEventSpy = sinon.spy(libp2p, "addEventListener"); + const removeEventSpy = sinon.spy(libp2p, "removeEventListener"); + + healthIndicator.start(); + expect(addEventSpy.calledTwice).to.be.true; + + healthIndicator.stop(); + expect(removeEventSpy.calledTwice).to.be.true; + }); +}); + +function mockLibp2p(): Libp2p { + const peerStore = { + get: (id: any) => Promise.resolve(mockPeer(id.toString(), [])) + }; + + const events = new EventTarget(); + + return { + peerStore, + addEventListener: (event: string, handler: EventListener) => + events.addEventListener(event, handler), + removeEventListener: (event: string, handler: EventListener) => + events.removeEventListener(event, handler), + dispatchEvent: (event: Event) => events.dispatchEvent(event), + getConnections: () => [], + components: { + events, + peerStore + } + } as unknown as Libp2p; +} + +function mockPeer(id: string, protocols: string[]): Peer { + return { + id, + protocols + } as unknown as Peer; +} + +function mockConnection(id: string): Connection { + return { + remotePeer: { + toString: () => id, + equals: (other: any) => other.toString() === id + }, + status: "open" + } as unknown as Connection; +} diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts new file mode 100644 index 0000000000..bf3f3aaa6e --- /dev/null +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -0,0 +1,159 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import type { PeerId } from "@libp2p/interface"; +import { FilterCodecs, LightPushCodec } from "@waku/core"; +import { + HealthIndicatorEvents, + HealthIndicatorParams, + HealthStatus, + HealthStatusChangeEvents, + IHealthIndicator, + Libp2p +} from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +type PeerEvent = (_event: CustomEvent) => void; + +const log = new Logger("health-indicator"); + +/** + * HealthIndicator monitors the health status of a Waku node by tracking peer connections + * and their supported protocols. + * + * The health status can be one of three states: + * - Unhealthy: No peer connections + * - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols + * - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols + * + * @example + * // Create and start a health indicator + * const healthIndicator = new HealthIndicator({ libp2p: node.libp2p }); + * healthIndicator.start(); + * + * // Listen for health status changes + * healthIndicator.addEventListener(HealthStatusChangeEvents.StatusChange, (event) => { + * console.log(`Health status changed to: ${event.detail}`); + * }); + * + * // Get current health status + * console.log(`Current health: ${healthIndicator.toString()}`); + * + * // Clean up when done + * healthIndicator.stop(); + * + * @implements {IHealthIndicator} + */ +export class HealthIndicator + extends TypedEventEmitter + implements IHealthIndicator +{ + private readonly libp2p: Libp2p; + private value: HealthStatus = HealthStatus.Unhealthy; + + public constructor(params: HealthIndicatorParams) { + super(); + this.libp2p = params.libp2p; + + this.onPeerChange = this.onPeerChange.bind(this); + } + + /** + * Starts monitoring the health status by adding event listeners to libp2p events. + * Listens to peer connect and disconnect events to determine the node's health status. + */ + public start(): void { + log.info("start: adding listeners to libp2p"); + + this.libp2p.addEventListener( + "peer:connect", + this.onPeerChange as PeerEvent + ); + this.libp2p.addEventListener( + "peer:disconnect", + this.onPeerChange as PeerEvent + ); + } + + /** + * Stops monitoring the health status by removing event listeners from libp2p events. + * Cleans up the peer connect and disconnect event listeners. + */ + public stop(): void { + log.info("stop: removing listeners to libp2p"); + + this.libp2p.removeEventListener( + "peer:connect", + this.onPeerChange as PeerEvent + ); + this.libp2p.removeEventListener( + "peer:disconnect", + this.onPeerChange as PeerEvent + ); + } + + /** + * Returns the current health status as a string. + * @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy) + */ + public toString(): string { + return this.value; + } + + /** + * Returns the current health status value. + * @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy) + */ + public toValue(): string { + return this.value; + } + + private async onPeerChange(event: CustomEvent): Promise { + log.info(`onPeerChange: received libp2p event - ${event.type}`); + + const connections = this.libp2p.getConnections(); + + const peers = await Promise.all( + connections.map(async (c) => { + try { + return await this.libp2p.peerStore.get(c.remotePeer); + } catch (e) { + return null; + } + }) + ); + const filterPeers = peers.filter((p) => + p?.protocols.includes(FilterCodecs.SUBSCRIBE) + ).length; + const lightPushPeers = peers.filter((p) => + p?.protocols.includes(LightPushCodec) + ).length; + + if (connections.length === 0) { + log.info(`onPeerChange: node identified as ${HealthStatus.Unhealthy}`); + + this.value = HealthStatus.Unhealthy; + } else if (filterPeers >= 2 && lightPushPeers >= 2) { + log.info( + `onPeerChange: node identified as ${HealthStatus.SufficientlyHealthy}` + ); + + this.value = HealthStatus.SufficientlyHealthy; + } else if (filterPeers === 1 && lightPushPeers === 1) { + log.info( + `onPeerChange: node identified as ${HealthStatus.MinimallyHealthy}` + ); + + this.value = HealthStatus.MinimallyHealthy; + } + + this.dispatchEvent( + new CustomEvent(HealthStatusChangeEvents.StatusChange, { + detail: this.value + }) + ); + + // this shouldn't happen as we expect service nodes implement Filter and LightPush at the same time + log.error( + `onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` + ); + } +} diff --git a/packages/sdk/src/health_indicator/index.ts b/packages/sdk/src/health_indicator/index.ts new file mode 100644 index 0000000000..2bd864b4dc --- /dev/null +++ b/packages/sdk/src/health_indicator/index.ts @@ -0,0 +1 @@ +export { HealthIndicator } from "./health_indicator.js"; diff --git a/packages/sdk/src/light_push/light_push.ts b/packages/sdk/src/light_push/light_push.ts index 5b3f6b67be..e2681314d8 100644 --- a/packages/sdk/src/light_push/light_push.ts +++ b/packages/sdk/src/light_push/light_push.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, getHealthManager, LightPushCore } from "@waku/core"; +import { ConnectionManager, LightPushCore } from "@waku/core"; import { type CoreProtocolResult, Failure, @@ -101,11 +101,6 @@ export class LightPush implements ILightPush { } } - getHealthManager().updateProtocolHealth( - this.protocol.multicodec, - successes.length - ); - return { successes, failures diff --git a/packages/sdk/src/waku/waku.ts b/packages/sdk/src/waku/waku.ts index d057b758db..95cf465cc3 100644 --- a/packages/sdk/src/waku/waku.ts +++ b/packages/sdk/src/waku/waku.ts @@ -1,11 +1,10 @@ import { isPeerId } from "@libp2p/interface"; import type { Peer, PeerId, Stream } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core"; +import { ConnectionManager, StoreCodec } from "@waku/core"; import type { CreateNodeOptions, IFilter, - IHealthManager, ILightPush, IRelay, IStore, @@ -17,6 +16,7 @@ import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { wakuFilter } from "../filter/index.js"; +import { HealthIndicator } from "../health_indicator/index.js"; import { wakuLightPush } from "../light_push/index.js"; import { PeerManager } from "../peer_manager/index.js"; import { wakuStore } from "../store/index.js"; @@ -38,7 +38,7 @@ export class WakuNode implements IWaku { public filter?: IFilter; public lightPush?: ILightPush; public connectionManager: ConnectionManager; - public readonly health: IHealthManager; + public health: HealthIndicator; private readonly peerManager: PeerManager; @@ -75,7 +75,7 @@ export class WakuNode implements IWaku { } }); - this.health = getHealthManager(); + this.health = new HealthIndicator({ libp2p }); if (protocolsEnabled.store) { if (options.store?.peer) { @@ -183,9 +183,11 @@ export class WakuNode implements IWaku { public async start(): Promise { await this.libp2p.start(); + this.health.start(); } public async stop(): Promise { + this.health.stop(); this.peerManager.stop(); this.connectionManager.stop(); await this.libp2p.stop(); diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts deleted file mode 100644 index 6e50522a17..0000000000 --- a/packages/tests/tests/health-manager/node.spec.ts +++ /dev/null @@ -1,175 +0,0 @@ -import { HealthStatus, IWaku, LightNode, Protocols } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { shardInfoToPubsubTopics } from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - runMultipleNodes, - ServiceNode, - ServiceNodesFleet -} from "../../src/index.js"; - -import { - messagePayload, - TestDecoder, - TestEncoder, - TestShardInfo -} from "./utils.js"; - -// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 -describe.skip("Node Health Status Matrix Tests", function () { - let waku: LightNode; - let serviceNodes: ServiceNode[]; - - afterEachCustom(this, async function () { - if (waku) { - await waku.stop(); - } - if (serviceNodes) { - await Promise.all(serviceNodes.map((node) => node.stop())); - } - }); - - const peerCounts = [0, 1, 2, 3]; - - peerCounts.forEach((lightPushPeers) => { - peerCounts.forEach((filterPeers) => { - it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers`, async function () { - this.timeout(10_000); - - [waku, serviceNodes] = await setupTestEnvironment( - this.ctx, - lightPushPeers, - filterPeers - ); - - if (lightPushPeers > 0) { - await waku.lightPush.send(TestEncoder, messagePayload); - } - - if (filterPeers > 0) { - await waku.filter.subscribe([TestDecoder], () => {}); - } - - const lightPushHealth = waku.health.getProtocolStatus( - Protocols.LightPush - ); - const filterHealth = waku.health.getProtocolStatus(Protocols.Filter); - - lightPushPeers = await getPeerCounBasedOnConnections( - waku, - waku.lightPush.protocol.multicodec - ); - expect(lightPushHealth?.status).to.equal( - getExpectedProtocolStatus(lightPushPeers) - ); - expect(filterHealth?.status).to.equal( - getExpectedProtocolStatus(filterPeers) - ); - - const expectedHealth = getExpectedNodeHealth( - lightPushPeers, - filterPeers - ); - const nodeHealth = waku.health.getHealthStatus(); - expect(nodeHealth).to.equal(expectedHealth); - }); - }); - }); -}); - -function getExpectedProtocolStatus(peerCount: number): HealthStatus { - if (peerCount === 0) return HealthStatus.Unhealthy; - if (peerCount === 1) return HealthStatus.MinimallyHealthy; - return HealthStatus.SufficientlyHealthy; -} - -async function getPeerCounBasedOnConnections( - waku: IWaku, - codec: string -): Promise { - const peerIDs = waku.libp2p - .getConnections() - .map((c) => c.remotePeer.toString()); - - const peers = await waku.libp2p.peerStore.all(); - - return peers - .filter((peer) => peerIDs.includes(peer.id.toString())) - .filter((peer) => peer.protocols.includes(codec)).length; -} - -function getExpectedNodeHealth( - lightPushPeers: number, - filterPeers: number -): HealthStatus { - if (lightPushPeers === 0 || filterPeers === 0) { - return HealthStatus.Unhealthy; - } else if (lightPushPeers === 1 || filterPeers === 1) { - return HealthStatus.MinimallyHealthy; - } else { - return HealthStatus.SufficientlyHealthy; - } -} - -async function runNodeWithProtocols( - lightPush: boolean, - filter: boolean -): Promise { - const serviceNode = new ServiceNode(`node-${Date.now()}`); - await serviceNode.start({ - lightpush: lightPush, - filter: filter, - relay: true, - clusterId: TestShardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(TestShardInfo) - }); - return serviceNode; -} - -async function setupTestEnvironment( - context: Mocha.Context, - lightPushPeers: number, - filterPeers: number -): Promise<[LightNode, ServiceNode[]]> { - let commonPeers: number; - if (lightPushPeers === 0 || filterPeers === 0) { - commonPeers = Math.max(lightPushPeers, filterPeers); - } else { - commonPeers = Math.min(lightPushPeers, filterPeers); - } - - let waku: LightNode; - const serviceNodes: ServiceNode[] = []; - let serviceNodesFleet: ServiceNodesFleet; - - if (commonPeers > 0) { - [serviceNodesFleet, waku] = await runMultipleNodes( - context, - TestShardInfo, - { filter: true, lightpush: true }, - undefined, - commonPeers - ); - serviceNodes.push(...serviceNodesFleet.nodes); - } else { - waku = await createLightNode({ networkConfig: TestShardInfo }); - } - - // Create additional LightPush nodes if needed - for (let i = commonPeers; i < lightPushPeers; i++) { - const node = await runNodeWithProtocols(true, false); - serviceNodes.push(node); - await waku.dial(await node.getMultiaddrWithId()); - } - - // Create additional Filter nodes if needed - for (let i = commonPeers; i < filterPeers; i++) { - const node = await runNodeWithProtocols(false, true); - serviceNodes.push(node); - await waku.dial(await node.getMultiaddrWithId()); - } - - return [waku, serviceNodes]; -} diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts deleted file mode 100644 index 95d4ec5d71..0000000000 --- a/packages/tests/tests/health-manager/protocols.spec.ts +++ /dev/null @@ -1,94 +0,0 @@ -import { HealthStatus, type LightNode, Protocols } from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - runMultipleNodes, - ServiceNodesFleet, - teardownNodesWithRedundancy -} from "../../src/index.js"; - -import { - messagePayload, - TestDecoder, - TestEncoder, - TestShardInfo -} from "./utils.js"; - -const NUM_NODES = [0, 1, 2, 3]; - -// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 -describe.skip("Health Manager", function () { - this.timeout(10_000); - - let waku: LightNode; - let serviceNodes: ServiceNodesFleet; - - afterEachCustom(this, async () => { - await teardownNodesWithRedundancy(serviceNodes, waku); - }); - - describe("Should update the health status for protocols", () => { - this.timeout(10_000); - - NUM_NODES.map((num) => { - it(`LightPush with ${num} connections`, async function () { - this.timeout(10_000); - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - TestShardInfo, - { lightpush: true, filter: true }, - undefined, - num - ); - - await waku.lightPush.send(TestEncoder, messagePayload); - - const health = waku.health.getProtocolStatus(Protocols.LightPush); - if (!health) { - expect(health).to.not.equal(undefined); - } - - if (num === 0) { - expect(health?.status).to.equal(HealthStatus.Unhealthy); - } else if (num < 2) { - expect(health?.status).to.equal(HealthStatus.MinimallyHealthy); - } else if (num >= 2) { - expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy); - } else { - throw new Error("Invalid number of connections"); - } - }); - it(`Filter with ${num} connections`, async function () { - [serviceNodes, waku] = await runMultipleNodes( - this.ctx, - TestShardInfo, - { filter: true, lightpush: true }, - undefined, - num - ); - - const { error } = await waku.filter.subscribe([TestDecoder], () => {}); - - if (error) { - expect(error).to.not.equal(undefined); - } - - const health = waku.health.getProtocolStatus(Protocols.Filter); - if (!health) { - expect(health).to.not.equal(undefined); - } - - if (num === 0) { - expect(health?.status).to.equal(HealthStatus.Unhealthy); - } else if (num < 2) { - expect(health?.status).to.equal(HealthStatus.MinimallyHealthy); - } else if (num >= 2) { - expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy); - } else { - throw new Error("Invalid number of connections"); - } - }); - }); - }); -}); diff --git a/packages/tests/tests/health-manager/utils.ts b/packages/tests/tests/health-manager/utils.ts deleted file mode 100644 index 564e4cfdf4..0000000000 --- a/packages/tests/tests/health-manager/utils.ts +++ /dev/null @@ -1,21 +0,0 @@ -import { createDecoder, createEncoder } from "@waku/core"; -import { utf8ToBytes } from "@waku/sdk"; -import { contentTopicToPubsubTopic } from "@waku/utils"; - -export const TestContentTopic = "/test/1/waku-filter/default"; -export const ClusterId = 2; -export const TestShardInfo = { - contentTopics: [TestContentTopic], - clusterId: ClusterId -}; -export const TestPubsubTopic = contentTopicToPubsubTopic( - TestContentTopic, - ClusterId -); -export const TestEncoder = createEncoder({ - contentTopic: TestContentTopic, - pubsubTopic: TestPubsubTopic -}); -export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic); -export const messageText = "Filtering works!"; -export const messagePayload = { payload: utf8ToBytes(messageText) };