fix: health indicator properly handle peer:identify event (#2369)

This commit is contained in:
Sasha 2025-04-18 20:11:09 +02:00 committed by GitHub
parent 3038c48917
commit 551238006c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 50 additions and 29 deletions

View File

@ -60,7 +60,9 @@ describe("HealthIndicator", () => {
sinon.stub(libp2p, "getConnections").returns(connections);
sinon.stub(libp2p.peerStore, "get").resolves(peer);
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" }));
libp2p.dispatchEvent(
new CustomEvent("peer:identify", { detail: { peerId: "1" } })
);
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
@ -84,7 +86,9 @@ describe("HealthIndicator", () => {
peerStoreStub.withArgs(connections[0].remotePeer).resolves(peer1);
peerStoreStub.withArgs(connections[1].remotePeer).resolves(peer2);
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "2" }));
libp2p.dispatchEvent(
new CustomEvent("peer:identify", { detail: { peerId: "2" } })
);
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy);
@ -99,9 +103,13 @@ describe("HealthIndicator", () => {
healthIndicator.start();
expect(addEventSpy.calledTwice).to.be.true;
expect(addEventSpy.firstCall.args[0]).to.equal("peer:identify");
expect(addEventSpy.secondCall.args[0]).to.equal("peer:disconnect");
healthIndicator.stop();
expect(removeEventSpy.calledTwice).to.be.true;
expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify");
expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect");
});
});

View File

@ -1,5 +1,5 @@
import { TypedEventEmitter } from "@libp2p/interface";
import type { PeerId } from "@libp2p/interface";
import type { IdentifyResult, PeerId } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec } from "@waku/core";
import {
HealthIndicatorEvents,
@ -11,7 +11,7 @@ import {
} from "@waku/interfaces";
import { Logger } from "@waku/utils";
type PeerEvent = (_event: CustomEvent<PeerId>) => void;
type PeerEvent<T> = (_event: CustomEvent<T>) => void;
const log = new Logger("health-indicator");
@ -53,7 +53,8 @@ export class HealthIndicator
super();
this.libp2p = params.libp2p;
this.onPeerChange = this.onPeerChange.bind(this);
this.onPeerIdentify = this.onPeerIdentify.bind(this);
this.onPeerDisconnected = this.onPeerDisconnected.bind(this);
}
/**
@ -64,12 +65,12 @@ export class HealthIndicator
log.info("start: adding listeners to libp2p");
this.libp2p.addEventListener(
"peer:connect",
this.onPeerChange as PeerEvent
"peer:identify",
this.onPeerIdentify as PeerEvent<IdentifyResult>
);
this.libp2p.addEventListener(
"peer:disconnect",
this.onPeerChange as PeerEvent
this.onPeerDisconnected as PeerEvent<PeerId>
);
}
@ -81,12 +82,12 @@ export class HealthIndicator
log.info("stop: removing listeners to libp2p");
this.libp2p.removeEventListener(
"peer:connect",
this.onPeerChange as PeerEvent
"peer:identify",
this.onPeerIdentify as PeerEvent<IdentifyResult>
);
this.libp2p.removeEventListener(
"peer:disconnect",
this.onPeerChange as PeerEvent
this.onPeerDisconnected as PeerEvent<PeerId>
);
}
@ -106,8 +107,26 @@ export class HealthIndicator
return this.value;
}
private async onPeerChange(event: CustomEvent<PeerId>): Promise<void> {
log.info(`onPeerChange: received libp2p event - ${event.type}`);
private async onPeerDisconnected(_event: CustomEvent<PeerId>): Promise<void> {
log.info(`onPeerDisconnected: received libp2p event`);
const connections = this.libp2p.getConnections();
// we handle only Unhealthy here and onPeerIdentify will cover other cases
if (connections.length > 0) {
log.info("onPeerDisconnected: has connections, ignoring");
}
this.value = HealthStatus.Unhealthy;
log.info(`onPeerDisconnected: node identified as ${this.value}`);
this.dispatchHealthEvent();
}
private async onPeerIdentify(
_event: CustomEvent<IdentifyResult>
): Promise<void> {
log.info(`onPeerIdentify: received libp2p event`);
const connections = this.libp2p.getConnections();
@ -127,33 +146,27 @@ export class HealthIndicator
p?.protocols.includes(LightPushCodec)
).length;
if (connections.length === 0) {
log.info(`onPeerChange: node identified as ${HealthStatus.Unhealthy}`);
if (filterPeers === 0 || lightPushPeers === 0) {
this.value = HealthStatus.Unhealthy;
} else if (filterPeers >= 2 && lightPushPeers >= 2) {
log.info(
`onPeerChange: node identified as ${HealthStatus.SufficientlyHealthy}`
);
this.value = HealthStatus.SufficientlyHealthy;
} else if (filterPeers === 1 && lightPushPeers === 1) {
log.info(
`onPeerChange: node identified as ${HealthStatus.MinimallyHealthy}`
);
this.value = HealthStatus.MinimallyHealthy;
} else {
log.error(
`onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
}
log.info(`onPeerChange: node identified as ${this.value}`);
this.dispatchHealthEvent();
}
private dispatchHealthEvent(): void {
this.dispatchEvent(
new CustomEvent<HealthStatus>(HealthStatusChangeEvents.StatusChange, {
detail: this.value
})
);
// this shouldn't happen as we expect service nodes implement Filter and LightPush at the same time
log.error(
`onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
}
}