From c8dfdb1ace8f0f8f668d8f2bb6e0eaed90041782 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Wed, 20 Aug 2025 13:13:38 +0200 Subject: [PATCH 1/2] fix: make health events emission consistent (#2570) * fix health events emission * fix bloom test flakiness due to bloom filter properties --- .../health_indicator/health_indicator.spec.ts | 122 +++++++++++++++++- .../src/health_indicator/health_indicator.ts | 36 +++--- packages/sds/src/bloom_filter/bloom.spec.ts | 16 ++- 3 files changed, 155 insertions(+), 19 deletions(-) diff --git a/packages/sdk/src/health_indicator/health_indicator.spec.ts b/packages/sdk/src/health_indicator/health_indicator.spec.ts index f291fce68b..fdda7ede55 100644 --- a/packages/sdk/src/health_indicator/health_indicator.spec.ts +++ b/packages/sdk/src/health_indicator/health_indicator.spec.ts @@ -15,7 +15,6 @@ describe("HealthIndicator", () => { libp2p = mockLibp2p(); events = mockEvents(); healthIndicator = new HealthIndicator({ libp2p, events }); - healthIndicator.start(); }); afterEach(() => { @@ -28,6 +27,8 @@ describe("HealthIndicator", () => { }); it("should transition to Unhealthy when no connections", async () => { + healthIndicator.start(); + // Only track transition, starting as healthy (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; @@ -49,6 +50,8 @@ describe("HealthIndicator", () => { }); it("should transition to MinimallyHealthy with one compatible peer", async () => { + healthIndicator.start(); + const statusChangePromise = new Promise((resolve) => { events.addEventListener("waku:health", (e: CustomEvent) => resolve(e.detail) @@ -70,6 +73,8 @@ describe("HealthIndicator", () => { }); it("should transition to SufficientlyHealthy with multiple compatible peers", async () => { + healthIndicator.start(); + const statusChangePromise = new Promise((resolve) => { events.addEventListener("waku:health", (e: CustomEvent) => resolve(e.detail) @@ -110,11 +115,124 @@ describe("HealthIndicator", () => { expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify"); expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect"); }); + + it("should reassess health immediately when peer disconnects", async () => { + const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connection1 = mockConnection("1"); + const connection2 = mockConnection("2"); + const connections = [connection1, connection2]; + + const getConnectionsStub = sinon + .stub(libp2p, "getConnections") + .returns(connections); + const peerStoreStub = sinon.stub(libp2p.peerStore, "get"); + peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1); + peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + await statusChangePromise; + expect(healthIndicator.toValue()).to.equal( + HealthStatus.SufficientlyHealthy + ); + + const statusChangePromise2 = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + const remainingConnections = [connection1]; + getConnectionsStub.returns(remainingConnections); + + libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "2" })); + + const changedStatus = await statusChangePromise2; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); + + it("should perform initial health assessment on start", async () => { + const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]); + const connections = [mockConnection("1")]; + sinon.stub(libp2p, "getConnections").returns(connections); + sinon.stub(libp2p.peerStore, "get").resolves(peer); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); + + it("should handle peer store errors gracefully", async function () { + this.timeout(5000); + + // Start with a healthy state + (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; + + const connections = [mockConnection("1")]; + sinon.stub(libp2p, "getConnections").returns(connections); + sinon.stub(libp2p.peerStore, "get").rejects(new Error("Peer not found")); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.Unhealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy); + }); + + it("should handle mixed protocol support correctly", async () => { + // Start with a healthy state to ensure status change + (healthIndicator as any).value = HealthStatus.SufficientlyHealthy; + + const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE]); + const peer2 = mockPeer("2", [LightPushCodec]); + const connection1 = mockConnection("1"); + const connection2 = mockConnection("2"); + const connections = [connection1, connection2]; + + sinon.stub(libp2p, "getConnections").returns(connections); + const peerStoreStub = sinon.stub(libp2p.peerStore, "get"); + peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1); + peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2); + + const statusChangePromise = new Promise((resolve) => { + events.addEventListener("waku:health", (e: CustomEvent) => + resolve(e.detail) + ); + }); + + healthIndicator.start(); + + const changedStatus = await statusChangePromise; + expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy); + expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy); + }); }); function mockLibp2p(): Libp2p { const peerStore = { - get: (id: any) => Promise.resolve(mockPeer(id.toString(), [])) + get: () => Promise.reject(new Error("Peer not found")) }; const events = new EventTarget(); diff --git a/packages/sdk/src/health_indicator/health_indicator.ts b/packages/sdk/src/health_indicator/health_indicator.ts index ea45cd489e..b3d71fb9d0 100644 --- a/packages/sdk/src/health_indicator/health_indicator.ts +++ b/packages/sdk/src/health_indicator/health_indicator.ts @@ -43,6 +43,8 @@ export class HealthIndicator implements IHealthIndicator { "peer:disconnect", this.onPeerDisconnected as PeerEvent ); + + void this.assessHealth(); } public stop(): void { @@ -64,40 +66,42 @@ export class HealthIndicator implements IHealthIndicator { private async onPeerDisconnected(_event: CustomEvent): Promise { log.info(`onPeerDisconnected: received libp2p event`); - - const connections = this.libp2p.getConnections(); - - // we handle only Unhealthy here and onPeerIdentify will cover other cases - if (connections.length > 0) { - log.info("onPeerDisconnected: has connections, ignoring"); - } - - log.info( - `onPeerDisconnected: node identified as ${HealthStatus.Unhealthy}` - ); - - this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy); + await this.assessHealth(); } private async onPeerIdentify( _event: CustomEvent ): Promise { log.info(`onPeerIdentify: received libp2p event`); + await this.assessHealth(); + } + private async assessHealth(): Promise { const connections = this.libp2p.getConnections(); + if (connections.length === 0) { + log.info("assessHealth: no connections, setting to Unhealthy"); + this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy); + return; + } + const peers = await Promise.all( connections.map(async (c) => { try { return await this.libp2p.peerStore.get(c.remotePeer); } catch (e) { + log.warn( + `assessHealth: failed to get peer ${c.remotePeer}, skipping` + ); return null; } }) ); + const filterPeers = peers.filter((p) => p?.protocols.includes(FilterCodecs.SUBSCRIBE) ).length; + const lightPushPeers = peers.filter((p) => p?.protocols.includes(LightPushCodec) ).length; @@ -111,12 +115,14 @@ export class HealthIndicator implements IHealthIndicator { newValue = HealthStatus.MinimallyHealthy; } else { log.error( - `onPeerIdentify: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` + `assessHealth: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}` ); newValue = this.value; } - log.info(`onPeerIdentify: node identified as ${newValue}`); + log.info( + `assessHealth: node identified as ${newValue} Filter:${filterPeers}; LightPush:${lightPushPeers}` + ); this.updateAndDispatchHealthEvent(newValue); } diff --git a/packages/sds/src/bloom_filter/bloom.spec.ts b/packages/sds/src/bloom_filter/bloom.spec.ts index 22d6991302..2ac1cdfeb7 100644 --- a/packages/sds/src/bloom_filter/bloom.spec.ts +++ b/packages/sds/src/bloom_filter/bloom.spec.ts @@ -92,7 +92,13 @@ describe("BloomFilter", () => { } const actualErrorRate = falsePositives / testSize; - expect(actualErrorRate).to.be.lessThan(bloomFilter.errorRate * 1.5); + const expectedErrorRate = bloomFilter.errorRate; + const zScore = 2; + const stdError = Math.sqrt( + (expectedErrorRate * (1 - expectedErrorRate)) / testSize + ); + const upperBound = expectedErrorRate + zScore * stdError; + expect(actualErrorRate).to.be.lessThan(upperBound); }); it("should never report false negatives", () => { @@ -153,6 +159,12 @@ describe("BloomFilter with special patterns", () => { } const fpRate = falsePositives / testSize; - expect(fpRate).to.be.lessThan(bloomFilter.errorRate * 1.5); + const expectedErrorRate = bloomFilter.errorRate; + const zScore = 2; + const stdError = Math.sqrt( + (expectedErrorRate * (1 - expectedErrorRate)) / testSize + ); + const upperBound = expectedErrorRate + zScore * stdError; + expect(fpRate).to.be.lessThan(upperBound); }); }); From 836d6b8793a5124747684f6ea76b6dd47c73048b Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Thu, 21 Aug 2025 11:58:43 +0200 Subject: [PATCH 2/2] feat: expose message hash from IDecodedMessage (#2578) * expose message hash from IDecodedMessage * up mock * optimize hashing and add tests --- .../core/src/lib/message/version_0.spec.ts | 61 ++++++++++++++++++- packages/core/src/lib/message/version_0.ts | 20 ++++++ packages/interfaces/src/message.ts | 2 + packages/relay/src/topic_only_message.ts | 6 ++ packages/rln/src/message.ts | 8 +++ packages/sdk/src/store/store.spec.ts | 4 +- 6 files changed, 99 insertions(+), 2 deletions(-) diff --git a/packages/core/src/lib/message/version_0.spec.ts b/packages/core/src/lib/message/version_0.spec.ts index d95963905c..943769c5d9 100644 --- a/packages/core/src/lib/message/version_0.spec.ts +++ b/packages/core/src/lib/message/version_0.spec.ts @@ -1,9 +1,17 @@ import type { AutoSharding, IProtoMessage } from "@waku/interfaces"; import { createRoutingInfo } from "@waku/utils"; +import { bytesToHex } from "@waku/utils/bytes"; import { expect } from "chai"; import fc from "fast-check"; -import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js"; +import { messageHash } from "../message_hash/index.js"; + +import { + createDecoder, + createEncoder, + DecodedMessage, + proto +} from "./version_0.js"; const testContentTopic = "/js-waku/1/tests/bytes"; @@ -165,3 +173,54 @@ describe("Sets sharding configuration correctly", () => { expect(staticshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/3"); }); }); + +describe("DecodedMessage lazy hash initialization", () => { + it("should compute hash only when first accessed", () => { + const pubsubTopic = "/waku/2/default-waku/proto"; + const protoMessage: proto.WakuMessage = { + payload: new Uint8Array([1, 2, 3]), + contentTopic: "/test/1/test-proto/proto", + timestamp: BigInt(1234567890000000), + ephemeral: false + }; + + const message = new DecodedMessage(pubsubTopic, protoMessage); + + expect((message as any)._hash).to.be.undefined; + expect((message as any)._hashStr).to.be.undefined; + + const hash = message.hash; + expect((message as any)._hash).to.not.be.undefined; + expect((message as any)._hashStr).to.be.undefined; + + const hashStr = message.hashStr; + expect((message as any)._hashStr).to.not.be.undefined; + + const expectedHash = messageHash( + pubsubTopic, + protoMessage as IProtoMessage + ); + expect(hash).to.deep.equal(expectedHash); + expect(hashStr).to.equal(bytesToHex(expectedHash)); + }); + + it("should return cached hash on subsequent access", () => { + const pubsubTopic = "/waku/2/default-waku/proto"; + const protoMessage: proto.WakuMessage = { + payload: new Uint8Array([1, 2, 3]), + contentTopic: "/test/1/test-proto/proto", + timestamp: BigInt(1234567890000000), + ephemeral: false + }; + + const message = new DecodedMessage(pubsubTopic, protoMessage); + + const hash1 = message.hash; + const hash2 = message.hash; + expect(hash1).to.equal(hash2); + + const hashStr1 = message.hashStr; + const hashStr2 = message.hashStr; + expect(hashStr1).to.equal(hashStr2); + }); +}); diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index a8706817d9..c127120e74 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -12,6 +12,9 @@ import type { } from "@waku/interfaces"; import { proto_message as proto } from "@waku/proto"; import { Logger } from "@waku/utils"; +import { bytesToHex } from "@waku/utils/bytes"; + +import { messageHash } from "../message_hash/index.js"; const log = new Logger("message:version-0"); const OneMillion = BigInt(1_000_000); @@ -20,6 +23,9 @@ export const Version = 0; export { proto }; export class DecodedMessage implements IDecodedMessage { + private _hash: Uint8Array | undefined; + private _hashStr: string | undefined; + public constructor( public pubsubTopic: string, private proto: proto.WakuMessage @@ -37,6 +43,20 @@ export class DecodedMessage implements IDecodedMessage { return this.proto.contentTopic; } + public get hash(): Uint8Array { + if (this._hash === undefined) { + this._hash = messageHash(this.pubsubTopic, this.proto as IProtoMessage); + } + return this._hash; + } + + public get hashStr(): string { + if (this._hashStr === undefined) { + this._hashStr = bytesToHex(this.hash); + } + return this._hashStr; + } + public get timestamp(): Date | undefined { // In the case we receive a value that is bigger than JS's max number, // we catch the error and return undefined. diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 1b34700010..caecb73aec 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -20,6 +20,8 @@ export interface IDecodedMessage { rateLimitProof: IRateLimitProof | undefined; ephemeral: boolean | undefined; meta: Uint8Array | undefined; + hash: Uint8Array; + hashStr: string; } export interface IRlnMessage extends IDecodedMessage { diff --git a/packages/relay/src/topic_only_message.ts b/packages/relay/src/topic_only_message.ts index 0929361166..a1dfab2fd1 100644 --- a/packages/relay/src/topic_only_message.ts +++ b/packages/relay/src/topic_only_message.ts @@ -14,6 +14,12 @@ export class TopicOnlyMessage implements ITopicOnlyMessage { public get payload(): Uint8Array { throw "Only content topic can be accessed on this message"; } + public get hash(): Uint8Array { + throw "Only content topic can be accessed on this message"; + } + public get hashStr(): string { + throw "Only content topic can be accessed on this message"; + } public rateLimitProof: undefined; public timestamp: undefined; public meta: undefined; diff --git a/packages/rln/src/message.ts b/packages/rln/src/message.ts index d3474bb052..1cca8a4ed2 100644 --- a/packages/rln/src/message.ts +++ b/packages/rln/src/message.ts @@ -48,6 +48,14 @@ export class RlnMessage implements IRlnMessage { return this.msg.payload; } + public get hash(): Uint8Array { + return this.msg.hash; + } + + public get hashStr(): string { + return this.msg.hashStr; + } + public get contentTopic(): string { return this.msg.contentTopic; } diff --git a/packages/sdk/src/store/store.spec.ts b/packages/sdk/src/store/store.spec.ts index 025f2df425..4931aafa7c 100644 --- a/packages/sdk/src/store/store.spec.ts +++ b/packages/sdk/src/store/store.spec.ts @@ -76,7 +76,9 @@ describe("Store", () => { timestamp: new Date(), rateLimitProof: undefined, ephemeral: undefined, - meta: undefined + meta: undefined, + hash: new Uint8Array([1, 2, 3]), + hashStr: "010203" }; it("should successfully query store with valid decoders and options", async () => {