Merge branch 'master' of github.com:waku-org/js-waku into weboko/send-api

This commit is contained in:
Sasha 2025-08-22 16:37:26 +02:00
commit 97f0fb279c
No known key found for this signature in database
9 changed files with 254 additions and 21 deletions

View File

@ -1,9 +1,17 @@
import type { AutoSharding, IProtoMessage } from "@waku/interfaces";
import { createRoutingInfo } from "@waku/utils";
import { bytesToHex } from "@waku/utils/bytes";
import { expect } from "chai";
import fc from "fast-check";
import { createDecoder, createEncoder, DecodedMessage } from "./version_0.js";
import { messageHash } from "../message_hash/index.js";
import {
createDecoder,
createEncoder,
DecodedMessage,
proto
} from "./version_0.js";
const testContentTopic = "/js-waku/1/tests/bytes";
@ -165,3 +173,54 @@ describe("Sets sharding configuration correctly", () => {
expect(staticshardingEncoder.pubsubTopic).to.be.eq("/waku/2/rs/0/3");
});
});
describe("DecodedMessage lazy hash initialization", () => {
it("should compute hash only when first accessed", () => {
const pubsubTopic = "/waku/2/default-waku/proto";
const protoMessage: proto.WakuMessage = {
payload: new Uint8Array([1, 2, 3]),
contentTopic: "/test/1/test-proto/proto",
timestamp: BigInt(1234567890000000),
ephemeral: false
};
const message = new DecodedMessage(pubsubTopic, protoMessage);
expect((message as any)._hash).to.be.undefined;
expect((message as any)._hashStr).to.be.undefined;
const hash = message.hash;
expect((message as any)._hash).to.not.be.undefined;
expect((message as any)._hashStr).to.be.undefined;
const hashStr = message.hashStr;
expect((message as any)._hashStr).to.not.be.undefined;
const expectedHash = messageHash(
pubsubTopic,
protoMessage as IProtoMessage
);
expect(hash).to.deep.equal(expectedHash);
expect(hashStr).to.equal(bytesToHex(expectedHash));
});
it("should return cached hash on subsequent access", () => {
const pubsubTopic = "/waku/2/default-waku/proto";
const protoMessage: proto.WakuMessage = {
payload: new Uint8Array([1, 2, 3]),
contentTopic: "/test/1/test-proto/proto",
timestamp: BigInt(1234567890000000),
ephemeral: false
};
const message = new DecodedMessage(pubsubTopic, protoMessage);
const hash1 = message.hash;
const hash2 = message.hash;
expect(hash1).to.equal(hash2);
const hashStr1 = message.hashStr;
const hashStr2 = message.hashStr;
expect(hashStr1).to.equal(hashStr2);
});
});

View File

@ -12,6 +12,9 @@ import type {
} from "@waku/interfaces";
import { proto_message as proto } from "@waku/proto";
import { Logger } from "@waku/utils";
import { bytesToHex } from "@waku/utils/bytes";
import { messageHash } from "../message_hash/index.js";
const log = new Logger("message:version-0");
const OneMillion = BigInt(1_000_000);
@ -20,6 +23,9 @@ export const Version = 0;
export { proto };
export class DecodedMessage implements IDecodedMessage {
private _hash: Uint8Array | undefined;
private _hashStr: string | undefined;
public constructor(
public pubsubTopic: string,
private proto: proto.WakuMessage
@ -37,6 +43,20 @@ export class DecodedMessage implements IDecodedMessage {
return this.proto.contentTopic;
}
public get hash(): Uint8Array {
if (this._hash === undefined) {
this._hash = messageHash(this.pubsubTopic, this.proto as IProtoMessage);
}
return this._hash;
}
public get hashStr(): string {
if (this._hashStr === undefined) {
this._hashStr = bytesToHex(this.hash);
}
return this._hashStr;
}
public get timestamp(): Date | undefined {
// In the case we receive a value that is bigger than JS's max number,
// we catch the error and return undefined.

View File

@ -20,6 +20,8 @@ export interface IDecodedMessage {
rateLimitProof: IRateLimitProof | undefined;
ephemeral: boolean | undefined;
meta: Uint8Array | undefined;
hash: Uint8Array;
hashStr: string;
}
export interface IRlnMessage extends IDecodedMessage {

View File

@ -14,6 +14,12 @@ export class TopicOnlyMessage implements ITopicOnlyMessage {
public get payload(): Uint8Array {
throw "Only content topic can be accessed on this message";
}
public get hash(): Uint8Array {
throw "Only content topic can be accessed on this message";
}
public get hashStr(): string {
throw "Only content topic can be accessed on this message";
}
public rateLimitProof: undefined;
public timestamp: undefined;
public meta: undefined;

View File

@ -48,6 +48,14 @@ export class RlnMessage<T extends IDecodedMessage> implements IRlnMessage {
return this.msg.payload;
}
public get hash(): Uint8Array {
return this.msg.hash;
}
public get hashStr(): string {
return this.msg.hashStr;
}
public get contentTopic(): string {
return this.msg.contentTopic;
}

View File

@ -15,7 +15,6 @@ describe("HealthIndicator", () => {
libp2p = mockLibp2p();
events = mockEvents();
healthIndicator = new HealthIndicator({ libp2p, events });
healthIndicator.start();
});
afterEach(() => {
@ -28,6 +27,8 @@ describe("HealthIndicator", () => {
});
it("should transition to Unhealthy when no connections", async () => {
healthIndicator.start();
// Only track transition, starting as healthy
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
@ -49,6 +50,8 @@ describe("HealthIndicator", () => {
});
it("should transition to MinimallyHealthy with one compatible peer", async () => {
healthIndicator.start();
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
@ -70,6 +73,8 @@ describe("HealthIndicator", () => {
});
it("should transition to SufficientlyHealthy with multiple compatible peers", async () => {
healthIndicator.start();
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
@ -110,11 +115,124 @@ describe("HealthIndicator", () => {
expect(removeEventSpy.firstCall.args[0]).to.equal("peer:identify");
expect(removeEventSpy.secondCall.args[0]).to.equal("peer:disconnect");
});
it("should reassess health immediately when peer disconnects", async () => {
const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const connection1 = mockConnection("1");
const connection2 = mockConnection("2");
const connections = [connection1, connection2];
const getConnectionsStub = sinon
.stub(libp2p, "getConnections")
.returns(connections);
const peerStoreStub = sinon.stub(libp2p.peerStore, "get");
peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1);
peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
await statusChangePromise;
expect(healthIndicator.toValue()).to.equal(
HealthStatus.SufficientlyHealthy
);
const statusChangePromise2 = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
const remainingConnections = [connection1];
getConnectionsStub.returns(remainingConnections);
libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "2" }));
const changedStatus = await statusChangePromise2;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
it("should perform initial health assessment on start", async () => {
const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
const connections = [mockConnection("1")];
sinon.stub(libp2p, "getConnections").returns(connections);
sinon.stub(libp2p.peerStore, "get").resolves(peer);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
it("should handle peer store errors gracefully", async function () {
this.timeout(5000);
// Start with a healthy state
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
const connections = [mockConnection("1")];
sinon.stub(libp2p, "getConnections").returns(connections);
sinon.stub(libp2p.peerStore, "get").rejects(new Error("Peer not found"));
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.Unhealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.Unhealthy);
});
it("should handle mixed protocol support correctly", async () => {
// Start with a healthy state to ensure status change
(healthIndicator as any).value = HealthStatus.SufficientlyHealthy;
const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE]);
const peer2 = mockPeer("2", [LightPushCodec]);
const connection1 = mockConnection("1");
const connection2 = mockConnection("2");
const connections = [connection1, connection2];
sinon.stub(libp2p, "getConnections").returns(connections);
const peerStoreStub = sinon.stub(libp2p.peerStore, "get");
peerStoreStub.withArgs(connection1.remotePeer).resolves(peer1);
peerStoreStub.withArgs(connection2.remotePeer).resolves(peer2);
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
events.addEventListener("waku:health", (e: CustomEvent<HealthStatus>) =>
resolve(e.detail)
);
});
healthIndicator.start();
const changedStatus = await statusChangePromise;
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
expect(healthIndicator.toValue()).to.equal(HealthStatus.MinimallyHealthy);
});
});
function mockLibp2p(): Libp2p {
const peerStore = {
get: (id: any) => Promise.resolve(mockPeer(id.toString(), []))
get: () => Promise.reject(new Error("Peer not found"))
};
const events = new EventTarget();

View File

@ -43,6 +43,8 @@ export class HealthIndicator implements IHealthIndicator {
"peer:disconnect",
this.onPeerDisconnected as PeerEvent<PeerId>
);
void this.assessHealth();
}
public stop(): void {
@ -64,40 +66,42 @@ export class HealthIndicator implements IHealthIndicator {
private async onPeerDisconnected(_event: CustomEvent<PeerId>): Promise<void> {
log.info(`onPeerDisconnected: received libp2p event`);
const connections = this.libp2p.getConnections();
// we handle only Unhealthy here and onPeerIdentify will cover other cases
if (connections.length > 0) {
log.info("onPeerDisconnected: has connections, ignoring");
}
log.info(
`onPeerDisconnected: node identified as ${HealthStatus.Unhealthy}`
);
this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy);
await this.assessHealth();
}
private async onPeerIdentify(
_event: CustomEvent<IdentifyResult>
): Promise<void> {
log.info(`onPeerIdentify: received libp2p event`);
await this.assessHealth();
}
private async assessHealth(): Promise<void> {
const connections = this.libp2p.getConnections();
if (connections.length === 0) {
log.info("assessHealth: no connections, setting to Unhealthy");
this.updateAndDispatchHealthEvent(HealthStatus.Unhealthy);
return;
}
const peers = await Promise.all(
connections.map(async (c) => {
try {
return await this.libp2p.peerStore.get(c.remotePeer);
} catch (e) {
log.warn(
`assessHealth: failed to get peer ${c.remotePeer}, skipping`
);
return null;
}
})
);
const filterPeers = peers.filter((p) =>
p?.protocols.includes(FilterCodecs.SUBSCRIBE)
).length;
const lightPushPeers = peers.filter((p) =>
p?.protocols.includes(LightPushCodec)
).length;
@ -111,12 +115,14 @@ export class HealthIndicator implements IHealthIndicator {
newValue = HealthStatus.MinimallyHealthy;
} else {
log.error(
`onPeerIdentify: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
`assessHealth: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
newValue = this.value;
}
log.info(`onPeerIdentify: node identified as ${newValue}`);
log.info(
`assessHealth: node identified as ${newValue} Filter:${filterPeers}; LightPush:${lightPushPeers}`
);
this.updateAndDispatchHealthEvent(newValue);
}

View File

@ -76,7 +76,9 @@ describe("Store", () => {
timestamp: new Date(),
rateLimitProof: undefined,
ephemeral: undefined,
meta: undefined
meta: undefined,
hash: new Uint8Array([1, 2, 3]),
hashStr: "010203"
};
it("should successfully query store with valid decoders and options", async () => {

View File

@ -92,7 +92,13 @@ describe("BloomFilter", () => {
}
const actualErrorRate = falsePositives / testSize;
expect(actualErrorRate).to.be.lessThan(bloomFilter.errorRate * 1.5);
const expectedErrorRate = bloomFilter.errorRate;
const zScore = 2;
const stdError = Math.sqrt(
(expectedErrorRate * (1 - expectedErrorRate)) / testSize
);
const upperBound = expectedErrorRate + zScore * stdError;
expect(actualErrorRate).to.be.lessThan(upperBound);
});
it("should never report false negatives", () => {
@ -153,6 +159,12 @@ describe("BloomFilter with special patterns", () => {
}
const fpRate = falsePositives / testSize;
expect(fpRate).to.be.lessThan(bloomFilter.errorRate * 1.5);
const expectedErrorRate = bloomFilter.errorRate;
const zScore = 2;
const stdError = Math.sqrt(
(expectedErrorRate * (1 - expectedErrorRate)) / testSize
);
const upperBound = expectedErrorRate + zScore * stdError;
expect(fpRate).to.be.lessThan(upperBound);
});
});