From 551238006c82547394f49fb842c27a91aa1b89d1 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Fri, 18 Apr 2025 20:11:09 +0200 Subject: [PATCH] fix: health indicator properly handle peer:identify event (#2369) --- .../health_indicator/health_indicator.spec.ts | 12 +++- .../src/health_indicator/health_indicator.ts | 67 +++++++++++-------- 2 files changed, 50 insertions(+), 29 deletions(-) diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts index e9a0f9c1e5..12148210ee 100644 --- a/packages/sdk/src/health_indicator/health_indicator.spec.ts +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -60,7 +60,9 @@ describe("HealthIndicator", () => { sinon.stub(libp2p, "getConnections").returns(connections); sinon.stub(libp2p.peerStore, "get").resolves(peer); - libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" })); + libp2p.dispatchEvent( + new CustomEvent("peer:identify", { detail: { peerId: "1" } }) + ); const changedStatus = await statusChangePromise; expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); @@ -84,7 +86,9 @@ describe("HealthIndicator", () => { peerStoreStub.withArgs(connections[0].remotePeer).resolves(peer1); peerStoreStub.withArgs(connections[1].remotePeer).resolves(peer2); - libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "2" })); + libp2p.dispatchEvent( + new CustomEvent("peer:identify", { detail: { peerId: "2" } }) + ); const changedStatus = await statusChangePromise; expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy); @@ -99,9 +103,13 @@ describe("HealthIndicator", () => { healthIndicator.start(); expect(addEventSpy.calledTwice).to.be.true; + expect(addEventSpy.firstCall.args[0]).to.equal("peer:identify"); + expect(addEventSpy.secondCall.args[0]).to.equal("peer:disconnect"); healthIndicator.stop(); expect(removeEventSpy.calledTwice).to.be.true; + expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify"); + expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect"); }); }); diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts index bf3f3aaa6e..d73e6cd6b0 100644 --- a/packages/sdk/src/health_indicator/health_indicator.ts +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -1,5 +1,5 @@ import { TypedEventEmitter } from "@libp2p/interface"; -import type { PeerId } from "@libp2p/interface"; +import type { IdentifyResult, PeerId } from "@libp2p/interface"; import { FilterCodecs, LightPushCodec } from "@waku/core"; import { HealthIndicatorEvents, @@ -11,7 +11,7 @@ import { } from "@waku/interfaces"; import { Logger } from "@waku/utils"; -type PeerEvent = (_event: CustomEvent) => void; +type PeerEvent = (_event: CustomEvent) => void; const log = new Logger("health-indicator"); @@ -53,7 +53,8 @@ export class HealthIndicator super(); this.libp2p = params.libp2p; - this.onPeerChange = this.onPeerChange.bind(this); + this.onPeerIdentify = this.onPeerIdentify.bind(this); + this.onPeerDisconnected = this.onPeerDisconnected.bind(this); } /** @@ -64,12 +65,12 @@ export class HealthIndicator log.info("start: adding listeners to libp2p"); this.libp2p.addEventListener( - "peer:connect", - this.onPeerChange as PeerEvent + "peer:identify", + this.onPeerIdentify as PeerEvent ); this.libp2p.addEventListener( "peer:disconnect", - this.onPeerChange as PeerEvent + this.onPeerDisconnected as PeerEvent ); } @@ -81,12 +82,12 @@ export class HealthIndicator log.info("stop: removing listeners to libp2p"); this.libp2p.removeEventListener( - "peer:connect", - this.onPeerChange as PeerEvent + "peer:identify", + this.onPeerIdentify as PeerEvent ); this.libp2p.removeEventListener( "peer:disconnect", - this.onPeerChange as PeerEvent + this.onPeerDisconnected as PeerEvent ); } @@ -106,8 +107,26 @@ export class HealthIndicator return this.value; } - private async onPeerChange(event: CustomEvent): Promise { - log.info(`onPeerChange: received libp2p event - ${event.type}`); + private async onPeerDisconnected(_event: CustomEvent): Promise { + log.info(`onPeerDisconnected: received libp2p event`); + + const connections = this.libp2p.getConnections(); + + // we handle only Unhealthy here and onPeerIdentify will cover other cases + if (connections.length > 0) { + log.info("onPeerDisconnected: has connections, ignoring"); + } + + this.value = HealthStatus.Unhealthy; + log.info(`onPeerDisconnected: node identified as ${this.value}`); + + this.dispatchHealthEvent(); + } + + private async onPeerIdentify( + _event: CustomEvent + ): Promise { + log.info(`onPeerIdentify: received libp2p event`); const connections = this.libp2p.getConnections(); @@ -127,33 +146,27 @@ export class HealthIndicator p?.protocols.includes(LightPushCodec) ).length; - if (connections.length === 0) { - log.info(`onPeerChange: node identified as ${HealthStatus.Unhealthy}`); - + if (filterPeers === 0 || lightPushPeers === 0) { this.value = HealthStatus.Unhealthy; } else if (filterPeers >= 2 && lightPushPeers >= 2) { - log.info( - `onPeerChange: node identified as ${HealthStatus.SufficientlyHealthy}` - ); - this.value = HealthStatus.SufficientlyHealthy; } else if (filterPeers === 1 && lightPushPeers === 1) { - log.info( - `onPeerChange: node identified as ${HealthStatus.MinimallyHealthy}` - ); - this.value = HealthStatus.MinimallyHealthy; + } else { + log.error( + `onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` + ); } + log.info(`onPeerChange: node identified as ${this.value}`); + this.dispatchHealthEvent(); + } + + private dispatchHealthEvent(): void { this.dispatchEvent( new CustomEvent(HealthStatusChangeEvents.StatusChange, { detail: this.value }) ); - - // this shouldn't happen as we expect service nodes implement Filter and LightPush at the same time - log.error( - `onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` - ); } }