From c8dfdb1ace8f0f8f668d8f2bb6e0eaed90041782 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:13:38 +0200 Subject: [PATCH] fix: make health events emission consistent (#2570) * fix health events emission * fix bloom test flakiness due to bloom filter properties --- .../health_indicator/health_indicator.spec.ts | 122 +++++++++++++++++- .../src/health_indicator/health_indicator.ts | 36 +++--- packages/sds/src/bloom_filter/bloom.spec.ts | 16 ++- 3 files changed, 155 insertions(+), 19 deletions(-) diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts index f291fce68b..fdda7ede55 100644 --- a/packages/sdk/src/health_indicator/health_indicator.spec.ts +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -15,7 +15,6 @@ describe("HealthIndicator", () => { libp2p = mockLibp2p(); events = mockEvents(); healthIndicator = new HealthIndicator({ libp2p, events }); - healthIndicator.start(); }); afterEach(() => { @@ -28,6 +27,8 @@ describe("HealthIndicator", () => { }); it("should transition to Unhealthy when no connections", async () => { + healthIndicator.start(); + // Only track transition, starting as healthy (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; @@ -49,6 +50,8 @@ describe("HealthIndicator", () => { }); it("should transition to MinimallyHealthy with one compatible peer", async () => { + healthIndicator.start(); + const statusChangePromise = new Promise((resolve) => { events.addEventListener("waku:health", (e: CustomEvent) => resolve(e.detail) @@ -70,6 +73,8 @@ describe("HealthIndicator", () => { }); it("should transition to SufficientlyHealthy with multiple compatible peers", async () => { + healthIndicator.start(); + const statusChangePromise = new Promise((resolve) => { events.addEventListener("waku:health", (e: CustomEvent) => resolve(e.detail) @@ -110,11 +115,124 @@ describe("HealthIndicator", () => { expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify"); expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect"); }); + + it("should reassess health immediately when peer disconnects", async () => { + const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connection1 = mockConnection("1"); + const connection2 = mockConnection("2"); + const connections = [connection1, connection2]; + + const getConnectionsStub = sinon + .stub(libp2p, "getConnections") + .returns(connections); + const peerStoreStub = sinon.stub(libp2p.peerStore, "get"); + peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1); + peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + await statusChangePromise; + expect(healthIndicator.toValue()).to.equal( + HealthStatus.SufficientlyHealthy + ); + + const statusChangePromise2 = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + const remainingConnections = [connection1]; + getConnectionsStub.returns(remainingConnections); + + libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "2" })); + + const changedStatus = await statusChangePromise2; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); + + it("should perform initial health assessment on start", async () => { + const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connections = [mockConnection("1")]; + sinon.stub(libp2p, "getConnections").returns(connections); + sinon.stub(libp2p.peerStore, "get").resolves(peer); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); + + it("should handle peer store errors gracefully", async function () { + this.timeout(5000); + + // Start with a healthy state + (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; + + const connections = [mockConnection("1")]; + sinon.stub(libp2p, "getConnections").returns(connections); + sinon.stub(libp2p.peerStore, "get").rejects(new Error("Peer not found")); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.Unhealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy); + }); + + it("should handle mixed protocol support correctly", async () => { + // Start with a healthy state to ensure status change + (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; + + const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE]); + const peer2 = mockPeer("2", [LightPushCodec]); + const connection1 = mockConnection("1"); + const connection2 = mockConnection("2"); + const connections = [connection1, connection2]; + + sinon.stub(libp2p, "getConnections").returns(connections); + const peerStoreStub = sinon.stub(libp2p.peerStore, "get"); + peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1); + peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); }); function mockLibp2p(): Libp2p { const peerStore = { - get: (id: any) => Promise.resolve(mockPeer(id.toString(), [])) + get: () => Promise.reject(new Error("Peer not found")) }; const events = new EventTarget(); diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts index ea45cd489e..b3d71fb9d0 100644 --- a/packages/sdk/src/health_indicator/health_indicator.ts +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -43,6 +43,8 @@ export class HealthIndicator implements IHealthIndicator { "peer:disconnect", this.onPeerDisconnected as PeerEvent ); + + void this.assessHealth(); } public stop(): void { @@ -64,40 +66,42 @@ export class HealthIndicator implements IHealthIndicator { private async onPeerDisconnected(_event: CustomEvent): Promise { log.info(`onPeerDisconnected: received libp2p event`); - - const connections = this.libp2p.getConnections(); - - // we handle only Unhealthy here and onPeerIdentify will cover other cases - if (connections.length > 0) { - log.info("onPeerDisconnected: has connections, ignoring"); - } - - log.info( - `onPeerDisconnected: node identified as ${HealthStatus.Unhealthy}` - ); - - this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy); + await this.assessHealth(); } private async onPeerIdentify( _event: CustomEvent ): Promise { log.info(`onPeerIdentify: received libp2p event`); + await this.assessHealth(); + } + private async assessHealth(): Promise { const connections = this.libp2p.getConnections(); + if (connections.length === 0) { + log.info("assessHealth: no connections, setting to Unhealthy"); + this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy); + return; + } + const peers = await Promise.all( connections.map(async (c) => { try { return await this.libp2p.peerStore.get(c.remotePeer); } catch (e) { + log.warn( + `assessHealth: failed to get peer ${c.remotePeer}, skipping` + ); return null; } }) ); + const filterPeers = peers.filter((p) => p?.protocols.includes(FilterCodecs.SUBSCRIBE) ).length; + const lightPushPeers = peers.filter((p) => p?.protocols.includes(LightPushCodec) ).length; @@ -111,12 +115,14 @@ export class HealthIndicator implements IHealthIndicator { newValue = HealthStatus.MinimallyHealthy; } else { log.error( - `onPeerIdentify: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` + `assessHealth: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` ); newValue = this.value; } - log.info(`onPeerIdentify: node identified as ${newValue}`); + log.info( + `assessHealth: node identified as ${newValue} Filter:${filterPeers}; LightPush:${lightPushPeers}` + ); this.updateAndDispatchHealthEvent(newValue); } diff --git a/packages/sds/src/bloom_filter/bloom.spec.ts b/packages/sds/src/bloom_filter/bloom.spec.ts index 22d6991302..2ac1cdfeb7 100644 --- a/packages/sds/src/bloom_filter/bloom.spec.ts +++ b/packages/sds/src/bloom_filter/bloom.spec.ts @@ -92,7 +92,13 @@ describe("BloomFilter", () => { } const actualErrorRate = falsePositives / testSize; - expect(actualErrorRate).to.be.lessThan(bloomFilter.errorRate * 1.5); + const expectedErrorRate = bloomFilter.errorRate; + const zScore = 2; + const stdError = Math.sqrt( + (expectedErrorRate * (1 - expectedErrorRate)) / testSize + ); + const upperBound = expectedErrorRate + zScore * stdError; + expect(actualErrorRate).to.be.lessThan(upperBound); }); it("should never report false negatives", () => { @@ -153,6 +159,12 @@ describe("BloomFilter with special patterns", () => { } const fpRate = falsePositives / testSize; - expect(fpRate).to.be.lessThan(bloomFilter.errorRate * 1.5); + const expectedErrorRate = bloomFilter.errorRate; + const zScore = 2; + const stdError = Math.sqrt( + (expectedErrorRate * (1 - expectedErrorRate)) / testSize + ); + const upperBound = expectedErrorRate + zScore * stdError; + expect(fpRate).to.be.lessThan(upperBound); }); });