mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-11 11:57:04 +00:00
feat: add HealthIndicator with simplified logic and testing (#2251)
* implement HealthIndicator * up libp2p interface version * up lock * remove unused tests * expose HealthIndicator from Waku * update test, add start and stop * fix error handling
This commit is contained in:
parent
fc93fae873
commit
3136f3a704
40
package-lock.json
generated
40
package-lock.json
generated
@ -5768,9 +5768,9 @@
|
||||
}
|
||||
},
|
||||
"node_modules/@libp2p/interface": {
|
||||
"version": "2.4.0",
|
||||
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.4.0.tgz",
|
||||
"integrity": "sha512-PfzxOaz7dU4sdnUNByGLoEk9iqhD0IS+LQMQB12CXh6VyYLA7J8oaoHk3yRBZze3Y4FPa5DHMm5Oi9O/IhreaQ==",
|
||||
"version": "2.4.1",
|
||||
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.4.1.tgz",
|
||||
"integrity": "sha512-G80+rWn0d1+txM7TXMs+eK79qXdtS3yfepx2uGA5Kc7WSzXicwMN1Qw6ZJAB58SExdfQ0oWlS0E/v7kr8B025g==",
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"dependencies": {
|
||||
"@multiformats/multiaddr": "^12.3.3",
|
||||
@ -41107,7 +41107,7 @@
|
||||
"uint8arrays": "^5.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.0.1",
|
||||
"@libp2p/interface": "^2.1.3",
|
||||
"@libp2p/peer-id": "5.0.1",
|
||||
"@multiformats/multiaddr": "^12.3.0",
|
||||
"@rollup/plugin-commonjs": "^25.0.7",
|
||||
@ -41129,21 +41129,6 @@
|
||||
"node": ">=20"
|
||||
}
|
||||
},
|
||||
"packages/discovery/node_modules/@libp2p/interface": {
|
||||
"version": "2.0.1",
|
||||
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.0.1.tgz",
|
||||
"integrity": "sha512-zDAgu+ZNiYZxVsmcvCeNCLMnGORwLMMI8w0k2YcHwolATsv2q7QG3KpakmyKjH4m7C0hT86lGgf1sgGobPssYA==",
|
||||
"dev": true,
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"dependencies": {
|
||||
"@multiformats/multiaddr": "^12.2.3",
|
||||
"it-pushable": "^3.2.3",
|
||||
"it-stream-types": "^2.0.1",
|
||||
"multiformats": "^13.1.0",
|
||||
"progress-events": "^1.0.0",
|
||||
"uint8arraylist": "^2.4.8"
|
||||
}
|
||||
},
|
||||
"packages/discovery/node_modules/@libp2p/peer-id": {
|
||||
"version": "5.0.1",
|
||||
"resolved": "https://registry.npmjs.org/@libp2p/peer-id/-/peer-id-5.0.1.tgz",
|
||||
@ -41381,7 +41366,7 @@
|
||||
"libp2p": "2.1.8"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.1.3",
|
||||
"@libp2p/interface": "^2.1.3",
|
||||
"@rollup/plugin-commonjs": "^25.0.7",
|
||||
"@rollup/plugin-json": "^6.0.0",
|
||||
"@rollup/plugin-node-resolve": "^15.2.3",
|
||||
@ -41401,21 +41386,6 @@
|
||||
"node": ">=20"
|
||||
}
|
||||
},
|
||||
"packages/sdk/node_modules/@libp2p/interface": {
|
||||
"version": "2.1.3",
|
||||
"resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-2.1.3.tgz",
|
||||
"integrity": "sha512-t1i2LWcnTGJEr7fDMslA8wYwBzJP81QKBlrBHoGhXxqqpRQa9035roCh/Akuw5RUgjKE47/ezjuzo90aWsJB8g==",
|
||||
"dev": true,
|
||||
"license": "Apache-2.0 OR MIT",
|
||||
"dependencies": {
|
||||
"@multiformats/multiaddr": "^12.2.3",
|
||||
"it-pushable": "^3.2.3",
|
||||
"it-stream-types": "^2.0.1",
|
||||
"multiformats": "^13.1.0",
|
||||
"progress-events": "^1.0.0",
|
||||
"uint8arraylist": "^2.4.8"
|
||||
}
|
||||
},
|
||||
"packages/sdk/node_modules/@sinonjs/fake-timers": {
|
||||
"version": "13.0.5",
|
||||
"resolved": "https://registry.npmjs.org/@sinonjs/fake-timers/-/fake-timers-13.0.5.tgz",
|
||||
|
@ -17,8 +17,6 @@ export { StoreCore, StoreCodec } from "./lib/store/index.js";
|
||||
|
||||
export { ConnectionManager } from "./lib/connection_manager/index.js";
|
||||
|
||||
export { getHealthManager } from "./lib/health_manager.js";
|
||||
|
||||
export { StreamManager } from "./lib/stream_manager/index.js";
|
||||
|
||||
export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js";
|
||||
|
@ -1,90 +0,0 @@
|
||||
import {
|
||||
HealthStatus,
|
||||
type IHealthManager,
|
||||
NodeHealth,
|
||||
type ProtocolHealth,
|
||||
Protocols
|
||||
} from "@waku/interfaces";
|
||||
|
||||
class HealthManager implements IHealthManager {
|
||||
public static instance: HealthManager;
|
||||
private readonly health: NodeHealth;
|
||||
|
||||
private constructor() {
|
||||
this.health = {
|
||||
overallStatus: HealthStatus.Unhealthy,
|
||||
protocolStatuses: new Map()
|
||||
};
|
||||
}
|
||||
|
||||
public static getInstance(): HealthManager {
|
||||
if (!HealthManager.instance) {
|
||||
HealthManager.instance = new HealthManager();
|
||||
}
|
||||
return HealthManager.instance;
|
||||
}
|
||||
|
||||
public getHealthStatus(): HealthStatus {
|
||||
return this.health.overallStatus;
|
||||
}
|
||||
|
||||
public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined {
|
||||
return this.health.protocolStatuses.get(protocol);
|
||||
}
|
||||
|
||||
public updateProtocolHealth(
|
||||
multicodec: string,
|
||||
connectedPeers: number
|
||||
): void {
|
||||
const protocol = this.getNameFromMulticodec(multicodec);
|
||||
|
||||
let status: HealthStatus = HealthStatus.Unhealthy;
|
||||
if (connectedPeers == 1) {
|
||||
status = HealthStatus.MinimallyHealthy;
|
||||
} else if (connectedPeers >= 2) {
|
||||
status = HealthStatus.SufficientlyHealthy;
|
||||
}
|
||||
|
||||
this.health.protocolStatuses.set(protocol, {
|
||||
name: protocol,
|
||||
status: status,
|
||||
lastUpdate: new Date()
|
||||
});
|
||||
|
||||
this.updateOverallHealth();
|
||||
}
|
||||
|
||||
private getNameFromMulticodec(multicodec: string): Protocols {
|
||||
let name: Protocols;
|
||||
if (multicodec.includes("filter")) {
|
||||
name = Protocols.Filter;
|
||||
} else if (multicodec.includes("lightpush")) {
|
||||
name = Protocols.LightPush;
|
||||
} else if (multicodec.includes("store")) {
|
||||
name = Protocols.Store;
|
||||
} else {
|
||||
throw new Error(`Unknown protocol: ${multicodec}`);
|
||||
}
|
||||
return name;
|
||||
}
|
||||
|
||||
private updateOverallHealth(): void {
|
||||
const relevantProtocols = [Protocols.LightPush, Protocols.Filter];
|
||||
const statuses = relevantProtocols.map(
|
||||
(p) => this.getProtocolStatus(p)?.status
|
||||
);
|
||||
|
||||
if (statuses.some((status) => status === HealthStatus.Unhealthy)) {
|
||||
this.health.overallStatus = HealthStatus.Unhealthy;
|
||||
} else if (
|
||||
statuses.some((status) => status === HealthStatus.MinimallyHealthy)
|
||||
) {
|
||||
this.health.overallStatus = HealthStatus.MinimallyHealthy;
|
||||
} else {
|
||||
this.health.overallStatus = HealthStatus.SufficientlyHealthy;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
export const getHealthManager = (): HealthManager =>
|
||||
HealthManager.getInstance();
|
@ -62,7 +62,7 @@
|
||||
"uint8arrays": "^5.0.1"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.0.1",
|
||||
"@libp2p/interface": "^2.1.3",
|
||||
"@libp2p/peer-id": "5.0.1",
|
||||
"@multiformats/multiaddr": "^12.3.0",
|
||||
"@rollup/plugin-commonjs": "^25.0.7",
|
||||
|
24
packages/interfaces/src/health_indicator.ts
Normal file
24
packages/interfaces/src/health_indicator.ts
Normal file
@ -0,0 +1,24 @@
|
||||
import { TypedEventEmitter } from "@libp2p/interface";
|
||||
|
||||
import { Libp2p } from "./libp2p.js";
|
||||
|
||||
export enum HealthStatusChangeEvents {
|
||||
StatusChange = "health:change"
|
||||
}
|
||||
|
||||
export enum HealthStatus {
|
||||
Unhealthy = "Unhealthy",
|
||||
MinimallyHealthy = "MinimallyHealthy",
|
||||
SufficientlyHealthy = "SufficientlyHealthy"
|
||||
}
|
||||
|
||||
export type HealthIndicatorEvents = {
|
||||
[HealthStatusChangeEvents.StatusChange]: CustomEvent<HealthStatus>;
|
||||
};
|
||||
|
||||
export interface IHealthIndicator
|
||||
extends TypedEventEmitter<HealthIndicatorEvents> {}
|
||||
|
||||
export type HealthIndicatorParams = {
|
||||
libp2p: Libp2p;
|
||||
};
|
@ -1,26 +0,0 @@
|
||||
import { Protocols } from "./protocols.js";
|
||||
|
||||
export enum HealthStatus {
|
||||
Unhealthy = "Unhealthy",
|
||||
MinimallyHealthy = "MinimallyHealthy",
|
||||
SufficientlyHealthy = "SufficientlyHealthy"
|
||||
}
|
||||
|
||||
export interface IHealthManager {
|
||||
getHealthStatus: () => HealthStatus;
|
||||
getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined;
|
||||
updateProtocolHealth: (multicodec: string, connectedPeers: number) => void;
|
||||
}
|
||||
|
||||
export type NodeHealth = {
|
||||
overallStatus: HealthStatus;
|
||||
protocolStatuses: ProtocolsHealthStatus;
|
||||
};
|
||||
|
||||
export type ProtocolHealth = {
|
||||
name: Protocols;
|
||||
status: HealthStatus;
|
||||
lastUpdate: Date;
|
||||
};
|
||||
|
||||
export type ProtocolsHealthStatus = Map<Protocols, ProtocolHealth>;
|
@ -16,5 +16,5 @@ export * from "./dns_discovery.js";
|
||||
export * from "./metadata.js";
|
||||
export * from "./constants.js";
|
||||
export * from "./local_storage.js";
|
||||
export * from "./health_manager.js";
|
||||
export * from "./sharding.js";
|
||||
export * from "./health_indicator.js";
|
||||
|
@ -1,12 +1,12 @@
|
||||
import type { Peer, PeerId, Stream } from "@libp2p/interface";
|
||||
import type { MultiaddrInput } from "@multiformats/multiaddr";
|
||||
|
||||
import { IConnectionManager } from "./connection_manager.js";
|
||||
import type { IConnectionManager } from "./connection_manager.js";
|
||||
import type { IFilter } from "./filter.js";
|
||||
import { IHealthManager } from "./health_manager.js";
|
||||
import type { IHealthIndicator } from "./health_indicator.js";
|
||||
import type { Libp2p } from "./libp2p.js";
|
||||
import type { ILightPush } from "./light_push.js";
|
||||
import { Protocols } from "./protocols.js";
|
||||
import type { Protocols } from "./protocols.js";
|
||||
import type { IRelay } from "./relay.js";
|
||||
import type { IStore } from "./store.js";
|
||||
|
||||
@ -16,9 +16,8 @@ export interface IWaku {
|
||||
store?: IStore;
|
||||
filter?: IFilter;
|
||||
lightPush?: ILightPush;
|
||||
|
||||
health: IHealthManager;
|
||||
connectionManager: IConnectionManager;
|
||||
health: IHealthIndicator;
|
||||
|
||||
/**
|
||||
* Returns a unique identifier for a node on the network.
|
||||
|
@ -76,7 +76,7 @@
|
||||
"libp2p": "2.1.8"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@libp2p/interface": "2.1.3",
|
||||
"@libp2p/interface": "^2.1.3",
|
||||
"@types/chai": "^4.3.11",
|
||||
"@rollup/plugin-commonjs": "^25.0.7",
|
||||
"@rollup/plugin-json": "^6.0.0",
|
||||
|
145
packages/sdk/src/health_indicator/health_indicator.spec.ts
Normal file
145
packages/sdk/src/health_indicator/health_indicator.spec.ts
Normal file
@ -0,0 +1,145 @@
|
||||
import { Connection, Peer } from "@libp2p/interface";
|
||||
import { FilterCodecs, LightPushCodec } from "@waku/core";
|
||||
import {
|
||||
HealthStatus,
|
||||
HealthStatusChangeEvents,
|
||||
Libp2p
|
||||
} from "@waku/interfaces";
|
||||
import { expect } from "chai";
|
||||
import sinon from "sinon";
|
||||
|
||||
import { HealthIndicator } from "./health_indicator.js";
|
||||
|
||||
describe("HealthIndicator", () => {
|
||||
let libp2p: Libp2p;
|
||||
let healthIndicator: HealthIndicator;
|
||||
|
||||
beforeEach(() => {
|
||||
libp2p = mockLibp2p();
|
||||
healthIndicator = new HealthIndicator({ libp2p });
|
||||
healthIndicator.start();
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
healthIndicator.stop();
|
||||
sinon.restore();
|
||||
});
|
||||
|
||||
it("should initialize with Unhealthy status", () => {
|
||||
expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy);
|
||||
});
|
||||
|
||||
it("should transition to Unhealthy when no connections", async () => {
|
||||
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
|
||||
healthIndicator.addEventListener(
|
||||
HealthStatusChangeEvents.StatusChange,
|
||||
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
|
||||
);
|
||||
});
|
||||
|
||||
const connections: Connection[] = [];
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
|
||||
libp2p.dispatchEvent(new CustomEvent("peer:disconnect", { detail: "1" }));
|
||||
|
||||
const changedStatus = await statusChangePromise;
|
||||
expect(changedStatus).to.equal(HealthStatus.Unhealthy);
|
||||
expect(healthIndicator.toString()).to.equal(HealthStatus.Unhealthy);
|
||||
});
|
||||
|
||||
it("should transition to MinimallyHealthy with one compatible peer", async () => {
|
||||
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
|
||||
healthIndicator.addEventListener(
|
||||
HealthStatusChangeEvents.StatusChange,
|
||||
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
|
||||
);
|
||||
});
|
||||
|
||||
const peer = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
|
||||
const connections = [mockConnection("1")];
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
sinon.stub(libp2p.peerStore, "get").resolves(peer);
|
||||
|
||||
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "1" }));
|
||||
|
||||
const changedStatus = await statusChangePromise;
|
||||
expect(changedStatus).to.equal(HealthStatus.MinimallyHealthy);
|
||||
expect(healthIndicator.toString()).to.equal(HealthStatus.MinimallyHealthy);
|
||||
});
|
||||
|
||||
it("should transition to SufficientlyHealthy with multiple compatible peers", async () => {
|
||||
const statusChangePromise = new Promise<HealthStatus>((resolve) => {
|
||||
healthIndicator.addEventListener(
|
||||
HealthStatusChangeEvents.StatusChange,
|
||||
(e: CustomEvent<HealthStatus>) => resolve(e.detail)
|
||||
);
|
||||
});
|
||||
|
||||
const peer1 = mockPeer("1", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
|
||||
const peer2 = mockPeer("2", [FilterCodecs.SUBSCRIBE, LightPushCodec]);
|
||||
const connections = [mockConnection("1"), mockConnection("2")];
|
||||
|
||||
sinon.stub(libp2p, "getConnections").returns(connections);
|
||||
const peerStoreStub = sinon.stub(libp2p.peerStore, "get");
|
||||
peerStoreStub.withArgs(connections[0].remotePeer).resolves(peer1);
|
||||
peerStoreStub.withArgs(connections[1].remotePeer).resolves(peer2);
|
||||
|
||||
libp2p.dispatchEvent(new CustomEvent("peer:connect", { detail: "2" }));
|
||||
|
||||
const changedStatus = await statusChangePromise;
|
||||
expect(changedStatus).to.equal(HealthStatus.SufficientlyHealthy);
|
||||
expect(healthIndicator.toString()).to.equal(
|
||||
HealthStatus.SufficientlyHealthy
|
||||
);
|
||||
});
|
||||
|
||||
it("should properly start and stop event listening", () => {
|
||||
const addEventSpy = sinon.spy(libp2p, "addEventListener");
|
||||
const removeEventSpy = sinon.spy(libp2p, "removeEventListener");
|
||||
|
||||
healthIndicator.start();
|
||||
expect(addEventSpy.calledTwice).to.be.true;
|
||||
|
||||
healthIndicator.stop();
|
||||
expect(removeEventSpy.calledTwice).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
function mockLibp2p(): Libp2p {
|
||||
const peerStore = {
|
||||
get: (id: any) => Promise.resolve(mockPeer(id.toString(), []))
|
||||
};
|
||||
|
||||
const events = new EventTarget();
|
||||
|
||||
return {
|
||||
peerStore,
|
||||
addEventListener: (event: string, handler: EventListener) =>
|
||||
events.addEventListener(event, handler),
|
||||
removeEventListener: (event: string, handler: EventListener) =>
|
||||
events.removeEventListener(event, handler),
|
||||
dispatchEvent: (event: Event) => events.dispatchEvent(event),
|
||||
getConnections: () => [],
|
||||
components: {
|
||||
events,
|
||||
peerStore
|
||||
}
|
||||
} as unknown as Libp2p;
|
||||
}
|
||||
|
||||
function mockPeer(id: string, protocols: string[]): Peer {
|
||||
return {
|
||||
id,
|
||||
protocols
|
||||
} as unknown as Peer;
|
||||
}
|
||||
|
||||
function mockConnection(id: string): Connection {
|
||||
return {
|
||||
remotePeer: {
|
||||
toString: () => id,
|
||||
equals: (other: any) => other.toString() === id
|
||||
},
|
||||
status: "open"
|
||||
} as unknown as Connection;
|
||||
}
|
159
packages/sdk/src/health_indicator/health_indicator.ts
Normal file
159
packages/sdk/src/health_indicator/health_indicator.ts
Normal file
@ -0,0 +1,159 @@
|
||||
import { TypedEventEmitter } from "@libp2p/interface";
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { FilterCodecs, LightPushCodec } from "@waku/core";
|
||||
import {
|
||||
HealthIndicatorEvents,
|
||||
HealthIndicatorParams,
|
||||
HealthStatus,
|
||||
HealthStatusChangeEvents,
|
||||
IHealthIndicator,
|
||||
Libp2p
|
||||
} from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
type PeerEvent = (_event: CustomEvent<PeerId>) => void;
|
||||
|
||||
const log = new Logger("health-indicator");
|
||||
|
||||
/**
|
||||
* HealthIndicator monitors the health status of a Waku node by tracking peer connections
|
||||
* and their supported protocols.
|
||||
*
|
||||
* The health status can be one of three states:
|
||||
* - Unhealthy: No peer connections
|
||||
* - MinimallyHealthy: At least 1 peer supporting both Filter and LightPush protocols
|
||||
* - SufficientlyHealthy: At least 2 peers supporting both Filter and LightPush protocols
|
||||
*
|
||||
* @example
|
||||
* // Create and start a health indicator
|
||||
* const healthIndicator = new HealthIndicator({ libp2p: node.libp2p });
|
||||
* healthIndicator.start();
|
||||
*
|
||||
* // Listen for health status changes
|
||||
* healthIndicator.addEventListener(HealthStatusChangeEvents.StatusChange, (event) => {
|
||||
* console.log(`Health status changed to: ${event.detail}`);
|
||||
* });
|
||||
*
|
||||
* // Get current health status
|
||||
* console.log(`Current health: ${healthIndicator.toString()}`);
|
||||
*
|
||||
* // Clean up when done
|
||||
* healthIndicator.stop();
|
||||
*
|
||||
* @implements {IHealthIndicator}
|
||||
*/
|
||||
export class HealthIndicator
|
||||
extends TypedEventEmitter<HealthIndicatorEvents>
|
||||
implements IHealthIndicator
|
||||
{
|
||||
private readonly libp2p: Libp2p;
|
||||
private value: HealthStatus = HealthStatus.Unhealthy;
|
||||
|
||||
public constructor(params: HealthIndicatorParams) {
|
||||
super();
|
||||
this.libp2p = params.libp2p;
|
||||
|
||||
this.onPeerChange = this.onPeerChange.bind(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts monitoring the health status by adding event listeners to libp2p events.
|
||||
* Listens to peer connect and disconnect events to determine the node's health status.
|
||||
*/
|
||||
public start(): void {
|
||||
log.info("start: adding listeners to libp2p");
|
||||
|
||||
this.libp2p.addEventListener(
|
||||
"peer:connect",
|
||||
this.onPeerChange as PeerEvent
|
||||
);
|
||||
this.libp2p.addEventListener(
|
||||
"peer:disconnect",
|
||||
this.onPeerChange as PeerEvent
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Stops monitoring the health status by removing event listeners from libp2p events.
|
||||
* Cleans up the peer connect and disconnect event listeners.
|
||||
*/
|
||||
public stop(): void {
|
||||
log.info("stop: removing listeners to libp2p");
|
||||
|
||||
this.libp2p.removeEventListener(
|
||||
"peer:connect",
|
||||
this.onPeerChange as PeerEvent
|
||||
);
|
||||
this.libp2p.removeEventListener(
|
||||
"peer:disconnect",
|
||||
this.onPeerChange as PeerEvent
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current health status as a string.
|
||||
* @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy)
|
||||
*/
|
||||
public toString(): string {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the current health status value.
|
||||
* @returns {string} Current health status (Unhealthy, MinimallyHealthy, or SufficientlyHealthy)
|
||||
*/
|
||||
public toValue(): string {
|
||||
return this.value;
|
||||
}
|
||||
|
||||
private async onPeerChange(event: CustomEvent<PeerId>): Promise<void> {
|
||||
log.info(`onPeerChange: received libp2p event - ${event.type}`);
|
||||
|
||||
const connections = this.libp2p.getConnections();
|
||||
|
||||
const peers = await Promise.all(
|
||||
connections.map(async (c) => {
|
||||
try {
|
||||
return await this.libp2p.peerStore.get(c.remotePeer);
|
||||
} catch (e) {
|
||||
return null;
|
||||
}
|
||||
})
|
||||
);
|
||||
const filterPeers = peers.filter((p) =>
|
||||
p?.protocols.includes(FilterCodecs.SUBSCRIBE)
|
||||
).length;
|
||||
const lightPushPeers = peers.filter((p) =>
|
||||
p?.protocols.includes(LightPushCodec)
|
||||
).length;
|
||||
|
||||
if (connections.length === 0) {
|
||||
log.info(`onPeerChange: node identified as ${HealthStatus.Unhealthy}`);
|
||||
|
||||
this.value = HealthStatus.Unhealthy;
|
||||
} else if (filterPeers >= 2 && lightPushPeers >= 2) {
|
||||
log.info(
|
||||
`onPeerChange: node identified as ${HealthStatus.SufficientlyHealthy}`
|
||||
);
|
||||
|
||||
this.value = HealthStatus.SufficientlyHealthy;
|
||||
} else if (filterPeers === 1 && lightPushPeers === 1) {
|
||||
log.info(
|
||||
`onPeerChange: node identified as ${HealthStatus.MinimallyHealthy}`
|
||||
);
|
||||
|
||||
this.value = HealthStatus.MinimallyHealthy;
|
||||
}
|
||||
|
||||
this.dispatchEvent(
|
||||
new CustomEvent<HealthStatus>(HealthStatusChangeEvents.StatusChange, {
|
||||
detail: this.value
|
||||
})
|
||||
);
|
||||
|
||||
// this shouldn't happen as we expect service nodes implement Filter and LightPush at the same time
|
||||
log.error(
|
||||
`onPeerChange: unexpected state, cannot identify health status of the node: Filter:${filterPeers}; LightPush:${lightPushPeers}`
|
||||
);
|
||||
}
|
||||
}
|
1
packages/sdk/src/health_indicator/index.ts
Normal file
1
packages/sdk/src/health_indicator/index.ts
Normal file
@ -0,0 +1 @@
|
||||
export { HealthIndicator } from "./health_indicator.js";
|
@ -1,5 +1,5 @@
|
||||
import type { PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, getHealthManager, LightPushCore } from "@waku/core";
|
||||
import { ConnectionManager, LightPushCore } from "@waku/core";
|
||||
import {
|
||||
type CoreProtocolResult,
|
||||
Failure,
|
||||
@ -101,11 +101,6 @@ export class LightPush implements ILightPush {
|
||||
}
|
||||
}
|
||||
|
||||
getHealthManager().updateProtocolHealth(
|
||||
this.protocol.multicodec,
|
||||
successes.length
|
||||
);
|
||||
|
||||
return {
|
||||
successes,
|
||||
failures
|
||||
|
@ -1,11 +1,10 @@
|
||||
import { isPeerId } from "@libp2p/interface";
|
||||
import type { Peer, PeerId, Stream } from "@libp2p/interface";
|
||||
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
|
||||
import { ConnectionManager, getHealthManager, StoreCodec } from "@waku/core";
|
||||
import { ConnectionManager, StoreCodec } from "@waku/core";
|
||||
import type {
|
||||
CreateNodeOptions,
|
||||
IFilter,
|
||||
IHealthManager,
|
||||
ILightPush,
|
||||
IRelay,
|
||||
IStore,
|
||||
@ -17,6 +16,7 @@ import { Protocols } from "@waku/interfaces";
|
||||
import { Logger } from "@waku/utils";
|
||||
|
||||
import { wakuFilter } from "../filter/index.js";
|
||||
import { HealthIndicator } from "../health_indicator/index.js";
|
||||
import { wakuLightPush } from "../light_push/index.js";
|
||||
import { PeerManager } from "../peer_manager/index.js";
|
||||
import { wakuStore } from "../store/index.js";
|
||||
@ -38,7 +38,7 @@ export class WakuNode implements IWaku {
|
||||
public filter?: IFilter;
|
||||
public lightPush?: ILightPush;
|
||||
public connectionManager: ConnectionManager;
|
||||
public readonly health: IHealthManager;
|
||||
public health: HealthIndicator;
|
||||
|
||||
private readonly peerManager: PeerManager;
|
||||
|
||||
@ -75,7 +75,7 @@ export class WakuNode implements IWaku {
|
||||
}
|
||||
});
|
||||
|
||||
this.health = getHealthManager();
|
||||
this.health = new HealthIndicator({ libp2p });
|
||||
|
||||
if (protocolsEnabled.store) {
|
||||
if (options.store?.peer) {
|
||||
@ -183,9 +183,11 @@ export class WakuNode implements IWaku {
|
||||
|
||||
public async start(): Promise<void> {
|
||||
await this.libp2p.start();
|
||||
this.health.start();
|
||||
}
|
||||
|
||||
public async stop(): Promise<void> {
|
||||
this.health.stop();
|
||||
this.peerManager.stop();
|
||||
this.connectionManager.stop();
|
||||
await this.libp2p.stop();
|
||||
|
@ -1,175 +0,0 @@
|
||||
import { HealthStatus, IWaku, LightNode, Protocols } from "@waku/interfaces";
|
||||
import { createLightNode } from "@waku/sdk";
|
||||
import { shardInfoToPubsubTopics } from "@waku/utils";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
runMultipleNodes,
|
||||
ServiceNode,
|
||||
ServiceNodesFleet
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
messagePayload,
|
||||
TestDecoder,
|
||||
TestEncoder,
|
||||
TestShardInfo
|
||||
} from "./utils.js";
|
||||
|
||||
// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186
|
||||
describe.skip("Node Health Status Matrix Tests", function () {
|
||||
let waku: LightNode;
|
||||
let serviceNodes: ServiceNode[];
|
||||
|
||||
afterEachCustom(this, async function () {
|
||||
if (waku) {
|
||||
await waku.stop();
|
||||
}
|
||||
if (serviceNodes) {
|
||||
await Promise.all(serviceNodes.map((node) => node.stop()));
|
||||
}
|
||||
});
|
||||
|
||||
const peerCounts = [0, 1, 2, 3];
|
||||
|
||||
peerCounts.forEach((lightPushPeers) => {
|
||||
peerCounts.forEach((filterPeers) => {
|
||||
it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers`, async function () {
|
||||
this.timeout(10_000);
|
||||
|
||||
[waku, serviceNodes] = await setupTestEnvironment(
|
||||
this.ctx,
|
||||
lightPushPeers,
|
||||
filterPeers
|
||||
);
|
||||
|
||||
if (lightPushPeers > 0) {
|
||||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
}
|
||||
|
||||
if (filterPeers > 0) {
|
||||
await waku.filter.subscribe([TestDecoder], () => {});
|
||||
}
|
||||
|
||||
const lightPushHealth = waku.health.getProtocolStatus(
|
||||
Protocols.LightPush
|
||||
);
|
||||
const filterHealth = waku.health.getProtocolStatus(Protocols.Filter);
|
||||
|
||||
lightPushPeers = await getPeerCounBasedOnConnections(
|
||||
waku,
|
||||
waku.lightPush.protocol.multicodec
|
||||
);
|
||||
expect(lightPushHealth?.status).to.equal(
|
||||
getExpectedProtocolStatus(lightPushPeers)
|
||||
);
|
||||
expect(filterHealth?.status).to.equal(
|
||||
getExpectedProtocolStatus(filterPeers)
|
||||
);
|
||||
|
||||
const expectedHealth = getExpectedNodeHealth(
|
||||
lightPushPeers,
|
||||
filterPeers
|
||||
);
|
||||
const nodeHealth = waku.health.getHealthStatus();
|
||||
expect(nodeHealth).to.equal(expectedHealth);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
function getExpectedProtocolStatus(peerCount: number): HealthStatus {
|
||||
if (peerCount === 0) return HealthStatus.Unhealthy;
|
||||
if (peerCount === 1) return HealthStatus.MinimallyHealthy;
|
||||
return HealthStatus.SufficientlyHealthy;
|
||||
}
|
||||
|
||||
async function getPeerCounBasedOnConnections(
|
||||
waku: IWaku,
|
||||
codec: string
|
||||
): Promise<number> {
|
||||
const peerIDs = waku.libp2p
|
||||
.getConnections()
|
||||
.map((c) => c.remotePeer.toString());
|
||||
|
||||
const peers = await waku.libp2p.peerStore.all();
|
||||
|
||||
return peers
|
||||
.filter((peer) => peerIDs.includes(peer.id.toString()))
|
||||
.filter((peer) => peer.protocols.includes(codec)).length;
|
||||
}
|
||||
|
||||
function getExpectedNodeHealth(
|
||||
lightPushPeers: number,
|
||||
filterPeers: number
|
||||
): HealthStatus {
|
||||
if (lightPushPeers === 0 || filterPeers === 0) {
|
||||
return HealthStatus.Unhealthy;
|
||||
} else if (lightPushPeers === 1 || filterPeers === 1) {
|
||||
return HealthStatus.MinimallyHealthy;
|
||||
} else {
|
||||
return HealthStatus.SufficientlyHealthy;
|
||||
}
|
||||
}
|
||||
|
||||
async function runNodeWithProtocols(
|
||||
lightPush: boolean,
|
||||
filter: boolean
|
||||
): Promise<ServiceNode> {
|
||||
const serviceNode = new ServiceNode(`node-${Date.now()}`);
|
||||
await serviceNode.start({
|
||||
lightpush: lightPush,
|
||||
filter: filter,
|
||||
relay: true,
|
||||
clusterId: TestShardInfo.clusterId,
|
||||
pubsubTopic: shardInfoToPubsubTopics(TestShardInfo)
|
||||
});
|
||||
return serviceNode;
|
||||
}
|
||||
|
||||
async function setupTestEnvironment(
|
||||
context: Mocha.Context,
|
||||
lightPushPeers: number,
|
||||
filterPeers: number
|
||||
): Promise<[LightNode, ServiceNode[]]> {
|
||||
let commonPeers: number;
|
||||
if (lightPushPeers === 0 || filterPeers === 0) {
|
||||
commonPeers = Math.max(lightPushPeers, filterPeers);
|
||||
} else {
|
||||
commonPeers = Math.min(lightPushPeers, filterPeers);
|
||||
}
|
||||
|
||||
let waku: LightNode;
|
||||
const serviceNodes: ServiceNode[] = [];
|
||||
let serviceNodesFleet: ServiceNodesFleet;
|
||||
|
||||
if (commonPeers > 0) {
|
||||
[serviceNodesFleet, waku] = await runMultipleNodes(
|
||||
context,
|
||||
TestShardInfo,
|
||||
{ filter: true, lightpush: true },
|
||||
undefined,
|
||||
commonPeers
|
||||
);
|
||||
serviceNodes.push(...serviceNodesFleet.nodes);
|
||||
} else {
|
||||
waku = await createLightNode({ networkConfig: TestShardInfo });
|
||||
}
|
||||
|
||||
// Create additional LightPush nodes if needed
|
||||
for (let i = commonPeers; i < lightPushPeers; i++) {
|
||||
const node = await runNodeWithProtocols(true, false);
|
||||
serviceNodes.push(node);
|
||||
await waku.dial(await node.getMultiaddrWithId());
|
||||
}
|
||||
|
||||
// Create additional Filter nodes if needed
|
||||
for (let i = commonPeers; i < filterPeers; i++) {
|
||||
const node = await runNodeWithProtocols(false, true);
|
||||
serviceNodes.push(node);
|
||||
await waku.dial(await node.getMultiaddrWithId());
|
||||
}
|
||||
|
||||
return [waku, serviceNodes];
|
||||
}
|
@ -1,94 +0,0 @@
|
||||
import { HealthStatus, type LightNode, Protocols } from "@waku/sdk";
|
||||
import { expect } from "chai";
|
||||
|
||||
import {
|
||||
afterEachCustom,
|
||||
runMultipleNodes,
|
||||
ServiceNodesFleet,
|
||||
teardownNodesWithRedundancy
|
||||
} from "../../src/index.js";
|
||||
|
||||
import {
|
||||
messagePayload,
|
||||
TestDecoder,
|
||||
TestEncoder,
|
||||
TestShardInfo
|
||||
} from "./utils.js";
|
||||
|
||||
const NUM_NODES = [0, 1, 2, 3];
|
||||
|
||||
// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186
|
||||
describe.skip("Health Manager", function () {
|
||||
this.timeout(10_000);
|
||||
|
||||
let waku: LightNode;
|
||||
let serviceNodes: ServiceNodesFleet;
|
||||
|
||||
afterEachCustom(this, async () => {
|
||||
await teardownNodesWithRedundancy(serviceNodes, waku);
|
||||
});
|
||||
|
||||
describe("Should update the health status for protocols", () => {
|
||||
this.timeout(10_000);
|
||||
|
||||
NUM_NODES.map((num) => {
|
||||
it(`LightPush with ${num} connections`, async function () {
|
||||
this.timeout(10_000);
|
||||
[serviceNodes, waku] = await runMultipleNodes(
|
||||
this.ctx,
|
||||
TestShardInfo,
|
||||
{ lightpush: true, filter: true },
|
||||
undefined,
|
||||
num
|
||||
);
|
||||
|
||||
await waku.lightPush.send(TestEncoder, messagePayload);
|
||||
|
||||
const health = waku.health.getProtocolStatus(Protocols.LightPush);
|
||||
if (!health) {
|
||||
expect(health).to.not.equal(undefined);
|
||||
}
|
||||
|
||||
if (num === 0) {
|
||||
expect(health?.status).to.equal(HealthStatus.Unhealthy);
|
||||
} else if (num < 2) {
|
||||
expect(health?.status).to.equal(HealthStatus.MinimallyHealthy);
|
||||
} else if (num >= 2) {
|
||||
expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy);
|
||||
} else {
|
||||
throw new Error("Invalid number of connections");
|
||||
}
|
||||
});
|
||||
it(`Filter with ${num} connections`, async function () {
|
||||
[serviceNodes, waku] = await runMultipleNodes(
|
||||
this.ctx,
|
||||
TestShardInfo,
|
||||
{ filter: true, lightpush: true },
|
||||
undefined,
|
||||
num
|
||||
);
|
||||
|
||||
const { error } = await waku.filter.subscribe([TestDecoder], () => {});
|
||||
|
||||
if (error) {
|
||||
expect(error).to.not.equal(undefined);
|
||||
}
|
||||
|
||||
const health = waku.health.getProtocolStatus(Protocols.Filter);
|
||||
if (!health) {
|
||||
expect(health).to.not.equal(undefined);
|
||||
}
|
||||
|
||||
if (num === 0) {
|
||||
expect(health?.status).to.equal(HealthStatus.Unhealthy);
|
||||
} else if (num < 2) {
|
||||
expect(health?.status).to.equal(HealthStatus.MinimallyHealthy);
|
||||
} else if (num >= 2) {
|
||||
expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy);
|
||||
} else {
|
||||
throw new Error("Invalid number of connections");
|
||||
}
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
@ -1,21 +0,0 @@
|
||||
import { createDecoder, createEncoder } from "@waku/core";
|
||||
import { utf8ToBytes } from "@waku/sdk";
|
||||
import { contentTopicToPubsubTopic } from "@waku/utils";
|
||||
|
||||
export const TestContentTopic = "/test/1/waku-filter/default";
|
||||
export const ClusterId = 2;
|
||||
export const TestShardInfo = {
|
||||
contentTopics: [TestContentTopic],
|
||||
clusterId: ClusterId
|
||||
};
|
||||
export const TestPubsubTopic = contentTopicToPubsubTopic(
|
||||
TestContentTopic,
|
||||
ClusterId
|
||||
);
|
||||
export const TestEncoder = createEncoder({
|
||||
contentTopic: TestContentTopic,
|
||||
pubsubTopic: TestPubsubTopic
|
||||
});
|
||||
export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
|
||||
export const messageText = "Filtering works!";
|
||||
export const messagePayload = { payload: utf8ToBytes(messageText) };
|
Loading…
x
Reference in New Issue
Block a user