fix: make health events emission consistent (#2570)

* fix health events emission

* fix bloom test flakiness due to bloom filter properties
This commit is contained in:
Sasha 2025-08-20 13:13:38 +02:00 committed by GitHub
parent 26de2d11c8
commit c8dfdb1ace
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 155 additions and 19 deletions

View File

@ -15,7 +15,6 @@ describe("HealthIndicator", () => {
libp2p = mockLibp2p();
events = mockEvents();
healthIndicator = new HealthIndicator({ libp2p, events });
healthIndicator.start();
});
afterEach(() => {
@ -28,6 +27,8 @@ describe("HealthIndicator", () => {
});
it("should transition to Unhealthy when no connections", async () => {
healthIndicator.start();
// Only track transition, starting as healthy
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
@ -49,6 +50,8 @@ describe("HealthIndicator", () => {
});
it("should transition to MinimallyHealthy with one compatible peer", async () => {
healthIndicator.start();
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
@ -70,6 +73,8 @@ describe("HealthIndicator", () => {
});
it("should transition to SufficientlyHealthy with multiple compatible peers", async () => {
healthIndicator.start();
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
@ -110,11 +115,124 @@ describe("HealthIndicator", () => {
expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify");
expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect");
});
it("should reassess health immediately when peer disconnects", async () => {
const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const connection1 = mockConnection("1");
const connection2 = mockConnection("2");
const connections = [connection1, connection2];
const getConnectionsStub = sinon
.stub(libp2p, "getConnections")
.returns(connections);
const peerStoreStub = sinon.stub(libp2p.peerStore, "get");
peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1);
peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
await statusChangePromise;
expect(healthIndicator.toValue()).to.equal(
HealthStatus.SufficientlyHealthy
);
const statusChangePromise2 = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
const remainingConnections = [connection1];
getConnectionsStub.returns(remainingConnections);
libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "2" }));
const changedStatus = await statusChangePromise2;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
it("should perform initial health assessment on start", async () => {
const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const connections = [mockConnection("1")];
sinon.stub(libp2p, "getConnections").returns(connections);
sinon.stub(libp2p.peerStore, "get").resolves(peer);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
it("should handle peer store errors gracefully", async function () {
this.timeout(5000);
// Start with a healthy state
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
const connections = [mockConnection("1")];
sinon.stub(libp2p, "getConnections").returns(connections);
sinon.stub(libp2p.peerStore, "get").rejects(new Error("Peer not found"));
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.Unhealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy);
});
it("should handle mixed protocol support correctly", async () => {
// Start with a healthy state to ensure status change
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE]);
const peer2 = mockPeer("2", [LightPushCodec]);
const connection1 = mockConnection("1");
const connection2 = mockConnection("2");
const connections = [connection1, connection2];
sinon.stub(libp2p, "getConnections").returns(connections);
const peerStoreStub = sinon.stub(libp2p.peerStore, "get");
peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1);
peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
});
function mockLibp2p(): Libp2p {
const peerStore = {
get: (id: any) => Promise.resolve(mockPeer(id.toString(), []))
get: () => Promise.reject(new Error("Peer not found"))
};
const events = new EventTarget();

View File

@ -43,6 +43,8 @@ export class HealthIndicator implements IHealthIndicator {
"peer:disconnect",
this.onPeerDisconnected as PeerEvent<PeerId>
);
void this.assessHealth();
}
public stop(): void {
@ -64,40 +66,42 @@ export class HealthIndicator implements IHealthIndicator {
private async onPeerDisconnected(_event: CustomEvent<PeerId>): Promise<void> {
log.info(`onPeerDisconnected: received libp2p event`);
const connections = this.libp2p.getConnections();
// we handle only Unhealthy here and onPeerIdentify will cover other cases
if (connections.length > 0) {
log.info("onPeerDisconnected: has connections, ignoring");
}
log.info(
`onPeerDisconnected: node identified as ${HealthStatus.Unhealthy}`
);
this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy);
await this.assessHealth();
}
private async onPeerIdentify(
_event: CustomEvent<IdentifyResult>
): Promise<void> {
log.info(`onPeerIdentify: received libp2p event`);
await this.assessHealth();
}
private async assessHealth(): Promise<void> {
const connections = this.libp2p.getConnections();
if (connections.length === 0) {
log.info("assessHealth: no connections, setting to Unhealthy");
this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy);
return;
}
const peers = await Promise.all(
connections.map(async (c) => {
try {
return await this.libp2p.peerStore.get(c.remotePeer);
} catch (e) {
log.warn(
`assessHealth: failed to get peer ${c.remotePeer}, skipping`
);
return null;
}
})
);
const filterPeers = peers.filter((p) =>
p?.protocols.includes(FilterCodecs.SUBSCRIBE)
).length;
const lightPushPeers = peers.filter((p) =>
p?.protocols.includes(LightPushCodec)
).length;
@ -111,12 +115,14 @@ export class HealthIndicator implements IHealthIndicator {
newValue = HealthStatus.MinimallyHealthy;
} else {
log.error(
`onPeerIdentify: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
`assessHealth: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
newValue = this.value;
}
log.info(`onPeerIdentify: node identified as ${newValue}`);
log.info(
`assessHealth: node identified as ${newValue} Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
this.updateAndDispatchHealthEvent(newValue);
}

View File

@ -92,7 +92,13 @@ describe("BloomFilter", () => {
}
const actualErrorRate = falsePositives / testSize;
expect(actualErrorRate).to.be.lessThan(bloomFilter.errorRate * 1.5);
const expectedErrorRate = bloomFilter.errorRate;
const zScore = 2;
const stdError = Math.sqrt(
(expectedErrorRate * (1 - expectedErrorRate)) / testSize
);
const upperBound = expectedErrorRate + zScore * stdError;
expect(actualErrorRate).to.be.lessThan(upperBound);
});
it("should never report false negatives", () => {
@ -153,6 +159,12 @@ describe("BloomFilter with special patterns", () => {
}
const fpRate = falsePositives / testSize;
expect(fpRate).to.be.lessThan(bloomFilter.errorRate * 1.5);
const expectedErrorRate = bloomFilter.errorRate;
const zScore = 2;
const stdError = Math.sqrt(
(expectedErrorRate * (1 - expectedErrorRate)) / testSize
);
const upperBound = expectedErrorRate + zScore * stdError;
expect(fpRate).to.be.lessThan(upperBound);
});
});