feat: node and protocols health (#2080)

* feat: introduce HealthManager

* feat: make health accessible on Waku object

* feat: update health from protocols

* chore: add access modifiers to healthmanager

* feat: use a HealthManager singleton

* chore: add tests for Filter, LightPush and Store

* feat: add overall node health

* chore: update protocol health to consider Store protocol

* chore: setup generic test utils instead of using filter utils

* tests: add a health status matrix check from 0-3

* chore: increase timeout for failing tests in CI
tests pass locally without an increased timeout, but fail in CI

* chore: move name inference to HealthManager

* tests: abstract away node creation and teardown utils

* fix: import
This commit is contained in:
Danish Arora 2024-07-27 18:27:54 +05:30 committed by GitHub
parent defe41bb9a
commit d464af3645
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 557 additions and 32 deletions

View File

@ -21,6 +21,8 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js";
export { ConnectionManager } from "./lib/connection_manager.js";
export { getHealthManager } from "./lib/health_manager.js";
export { KeepAliveManager } from "./lib/keep_alive_manager.js";
export { StreamManager } from "./lib/stream_manager/index.js";

View File

@ -0,0 +1,90 @@
import {
HealthStatus,
type IHealthManager,
NodeHealth,
type ProtocolHealth,
Protocols
} from "@waku/interfaces";
/**
 * Tracks the health of individual Waku protocols and derives an overall node
 * health from them.
 *
 * Protocol health is computed from the number of connected peers reported via
 * {@link HealthManager.updateProtocolHealth}. Overall health is derived from
 * LightPush and Filter only; Store is tracked per protocol but does not
 * contribute to the overall status.
 *
 * The class is a process-wide singleton: the constructor is private and the
 * shared instance is obtained through {@link HealthManager.getInstance}.
 */
class HealthManager implements IHealthManager {
  /** Shared instance, created on the first call to `getInstance()`. */
  public static instance: HealthManager;

  private readonly health: NodeHealth;

  private constructor() {
    // Until protocols report peers, the node is considered unhealthy.
    this.health = {
      overallStatus: HealthStatus.Unhealthy,
      protocolStatuses: new Map()
    };
  }

  /**
   * Returns the shared instance, creating it on first use.
   */
  public static getInstance(): HealthManager {
    if (!HealthManager.instance) {
      HealthManager.instance = new HealthManager();
    }
    return HealthManager.instance;
  }

  /**
   * @returns The overall node health derived from LightPush and Filter.
   */
  public getHealthStatus(): HealthStatus {
    return this.health.overallStatus;
  }

  /**
   * @param protocol - Protocol to look up.
   * @returns The last recorded health for `protocol`, or `undefined` if that
   * protocol has never been reported.
   */
  public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined {
    return this.health.protocolStatuses.get(protocol);
  }

  /**
   * Records the health of the protocol identified by `multicodec` and
   * recomputes the overall node health.
   *
   * Exactly one connected peer is MinimallyHealthy, two or more is
   * SufficientlyHealthy, anything else is Unhealthy.
   *
   * @param multicodec - Protocol multicodec; must contain "filter",
   * "lightpush" or "store".
   * @param connectedPeers - Number of peers currently used for the protocol.
   * @throws Error if the multicodec does not match a known protocol.
   */
  public updateProtocolHealth(
    multicodec: string,
    connectedPeers: number
  ): void {
    const protocol = this.getNameFromMulticodec(multicodec);

    let status: HealthStatus = HealthStatus.Unhealthy;
    if (connectedPeers === 1) {
      status = HealthStatus.MinimallyHealthy;
    } else if (connectedPeers >= 2) {
      status = HealthStatus.SufficientlyHealthy;
    }

    this.health.protocolStatuses.set(protocol, {
      name: protocol,
      status: status,
      lastUpdate: new Date()
    });

    this.updateOverallHealth();
  }

  /**
   * Maps a multicodec string to its protocol by substring match.
   *
   * @throws Error if none of the known protocol names occurs in `multicodec`.
   */
  private getNameFromMulticodec(multicodec: string): Protocols {
    let name: Protocols;
    if (multicodec.includes("filter")) {
      name = Protocols.Filter;
    } else if (multicodec.includes("lightpush")) {
      name = Protocols.LightPush;
    } else if (multicodec.includes("store")) {
      name = Protocols.Store;
    } else {
      throw new Error(`Unknown protocol: ${multicodec}`);
    }
    return name;
  }

  /**
   * Recomputes the overall status as the worst status among the relevant
   * protocols (LightPush and Filter).
   */
  private updateOverallHealth(): void {
    const relevantProtocols = [Protocols.LightPush, Protocols.Filter];
    const statuses = relevantProtocols.map(
      // A protocol that has never reported has no known peers, so it must
      // count as Unhealthy rather than being silently ignored.
      (p) => this.getProtocolStatus(p)?.status ?? HealthStatus.Unhealthy
    );

    if (statuses.some((status) => status === HealthStatus.Unhealthy)) {
      this.health.overallStatus = HealthStatus.Unhealthy;
    } else if (
      statuses.some((status) => status === HealthStatus.MinimallyHealthy)
    ) {
      this.health.overallStatus = HealthStatus.MinimallyHealthy;
    } else {
      this.health.overallStatus = HealthStatus.SufficientlyHealthy;
    }
  }
}
/**
 * Accessor for the shared HealthManager.
 *
 * @returns The singleton instance, as returned by `HealthManager.getInstance()`.
 */
export const getHealthManager = (): HealthManager => {
  return HealthManager.getInstance();
};

View File

@ -0,0 +1,26 @@
import { Protocols } from "./protocols";
/**
 * Health levels used both for a single protocol and for the node as a whole.
 * Each member's runtime value is its own name as a string.
 *
 * NOTE(review): the HealthManager added alongside this enum assigns
 * `Unhealthy` for zero connected peers, `MinimallyHealthy` for exactly one and
 * `SufficientlyHealthy` for two or more — confirm against the implementation
 * if the thresholds change.
 */
export enum HealthStatus {
Unhealthy = "Unhealthy",
MinimallyHealthy = "MinimallyHealthy",
SufficientlyHealthy = "SufficientlyHealthy"
}
/**
 * Contract for the component that records per-protocol health and exposes the
 * node's overall health.
 */
export interface IHealthManager {
/** Returns the overall health status of the node. */
getHealthStatus: () => HealthStatus;
/** Returns the recorded health for `protocol`, or `undefined` when there is none. */
getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined;
/**
 * Reports the number of connected peers for the protocol identified by its
 * multicodec string.
 */
updateProtocolHealth: (multicodec: string, connectedPeers: number) => void;
}
/** Health snapshot of a node: its overall status plus per-protocol records. */
export type NodeHealth = {
overallStatus: HealthStatus;
protocolStatuses: ProtocolsHealthStatus;
};
/** Health record for a single protocol. */
export type ProtocolHealth = {
// Protocol this record belongs to.
name: Protocols;
// Health level recorded for the protocol.
status: HealthStatus;
// Timestamp attached to the record when it was written.
lastUpdate: Date;
};
/** Per-protocol health records, keyed by protocol. */
export type ProtocolsHealthStatus = Map<Protocols, ProtocolHealth>;

View File

@ -17,3 +17,4 @@ export * from "./dns_discovery.js";
export * from "./metadata.js";
export * from "./constants.js";
export * from "./local_storage.js";
export * from "./health_manager.js";

View File

@ -25,8 +25,8 @@ export type IBaseProtocolCore = {
};
export type IBaseProtocolSDK = {
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly connectedPeers: Peer[];
renewPeer: (peerToDisconnect: PeerId) => Promise<Peer>;
readonly numPeersToUse: number;
};

View File

@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr";
import { IConnectionManager } from "./connection_manager.js";
import type { IFilterSDK } from "./filter.js";
import { IHealthManager } from "./health_manager.js";
import type { Libp2p } from "./libp2p.js";
import type { ILightPushSDK } from "./light_push.js";
import { Protocols } from "./protocols.js";
@ -27,6 +28,8 @@ export interface Waku {
isStarted(): boolean;
isConnected(): boolean;
health: IHealthManager;
}
export interface LightNode extends Waku {

View File

@ -1,7 +1,11 @@
import type { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
import {
IBaseProtocolSDK,
IHealthManager,
ProtocolUseOptions
} from "@waku/interfaces";
import { delay, Logger } from "@waku/utils";
interface Options {
@ -14,6 +18,7 @@ const DEFAULT_NUM_PEERS_TO_USE = 3;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
export class BaseProtocolSDK implements IBaseProtocolSDK {
private healthManager: IHealthManager;
public readonly numPeersToUse: number;
private peers: Peer[] = [];
private maintainPeersIntervalId: ReturnType<
@ -32,6 +37,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
options: Options
) {
this.log = new Logger(`sdk:${core.multicodec}`);
this.healthManager = getHealthManager();
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
const maintainPeersInterval =
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
@ -60,7 +68,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
);
}
this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect));
const updatedPeers = this.peers.filter(
(peer) => !peer.id.equals(peerToDisconnect)
);
this.updatePeers(updatedPeers);
this.log.info(
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
);
@ -192,7 +204,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
await Promise.all(dials);
this.peers = [...this.peers, ...additionalPeers];
const updatedPeers = [...this.peers, ...additionalPeers];
this.updatePeers(updatedPeers);
this.log.info(
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
);
@ -232,6 +246,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
throw error;
}
}
/**
 * Replaces the tracked peer list and reports the new peer count for this
 * protocol to the health manager.
 *
 * @param peers - The complete, updated list of peers.
 */
private updatePeers(peers: Peer[]): void {
  this.peers = peers;

  const { multicodec } = this.core;
  const peerCount = this.peers.length;
  this.healthManager.updateProtocolHealth(multicodec, peerCount);
}
}
class RenewPeerLocker {

View File

@ -1,9 +1,10 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager } from "@waku/core";
import { ConnectionManager, getHealthManager } from "@waku/core";
import type {
IFilterSDK,
IHealthManager,
ILightPushSDK,
IRelay,
IStoreSDK,
@ -68,6 +69,7 @@ export class WakuNode implements Waku {
public lightPush?: ILightPushSDK;
public connectionManager: ConnectionManager;
public readonly pubsubTopics: PubsubTopic[];
public readonly health: IHealthManager;
public constructor(
options: WakuOptions,
@ -105,6 +107,8 @@ export class WakuNode implements Waku {
this.relay
);
this.health = getHealthManager();
if (protocolsEnabled.store) {
const store = wakuStore(this.connectionManager, options);
this.store = store(libp2p);

View File

@ -7,3 +7,4 @@ export * from "./base64_utf8.js";
export * from "./waitForConnections.js";
export * from "./custom_mocha_hooks.js";
export * from "./waku_versions_utils.js";
export * from "./nodes.js";

View File

@ -0,0 +1,115 @@
import { waitForRemotePeer } from "@waku/core";
import {
LightNode,
ProtocolCreateOptions,
Protocols,
ShardingParams,
Waku
} from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { isDefined, shardInfoToPubsubTopics } from "@waku/utils";
import { Context } from "mocha";
import pRetry from "p-retry";
import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants";
import { ServiceNodesFleet } from "../lib";
import { Args } from "../types";
import { waitForConnections } from "./waitForConnections";
/**
 * Starts a fleet of service nodes and one light node, then dials every service
 * node from the light node.
 *
 * @param context - Mocha context, forwarded to `ServiceNodesFleet.createAndRun`.
 * @param shardInfo - Optional sharding parameters. When given, pubsub topics
 * are derived from it and it is passed to the light node; otherwise
 * `DefaultTestPubsubTopic` is used.
 * @param customArgs - Optional service-node arguments. Its `filter` and
 * `lightpush` flags also select which protocols are waited for on each peer.
 * @param strictChecking - Forwarded to `ServiceNodesFleet.createAndRun`.
 * @param numServiceNodes - Number of service nodes to create.
 * @param withoutFilter - Forwarded to `ServiceNodesFleet.createAndRun`.
 * @returns The service-node fleet and the started light node.
 * @throws Error if the light node could not be created, or if after dialing a
 * service node either side reports no connection/peer.
 */
export async function runMultipleNodes(
  context: Context,
  shardInfo?: ShardingParams,
  customArgs?: Args,
  strictChecking: boolean = false,
  numServiceNodes = 3,
  withoutFilter = false
): Promise<[ServiceNodesFleet, LightNode]> {
  const pubsubTopics = shardInfo
    ? shardInfoToPubsubTopics(shardInfo)
    : [DefaultTestPubsubTopic];

  // create numServiceNodes nodes
  const serviceNodes = await ServiceNodesFleet.createAndRun(
    context,
    pubsubTopics,
    numServiceNodes,
    strictChecking,
    shardInfo,
    customArgs,
    withoutFilter
  );

  const wakuOptions: ProtocolCreateOptions = {
    staticNoiseKey: NOISE_KEY_1,
    libp2p: {
      addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] }
    }
  };

  if (shardInfo) {
    wakuOptions.shardInfo = shardInfo;
  } else {
    wakuOptions.pubsubTopics = pubsubTopics;
  }

  const waku = await createLightNode(wakuOptions);
  // Validate before the first use; checking after `start()` could never fire.
  if (!waku) {
    throw new Error("Failed to initialize waku");
  }
  await waku.start();

  // The protocols to wait for do not change between service nodes.
  const protocolsToAwait = [
    !customArgs?.filter ? undefined : Protocols.Filter,
    !customArgs?.lightpush ? undefined : Protocols.LightPush
  ].filter(isDefined);

  for (const node of serviceNodes.nodes) {
    await waku.dial(await node.getMultiaddrWithId());
    await waitForRemotePeer(waku, protocolsToAwait);
    await node.ensureSubscriptions(pubsubTopics);

    const wakuConnections = waku.libp2p.getConnections();
    const nodePeers = await node.peers();

    if (wakuConnections.length < 1 || nodePeers.length < 1) {
      throw new Error(
        `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}`
      );
    }
  }

  await waitForConnections(numServiceNodes, waku);

  return [serviceNodes, waku];
}
/**
 * Stops all service nodes and light nodes in parallel, retrying each `stop()`
 * up to three times.
 *
 * Unset values are tolerated on both sides so that an `afterEach` hook can
 * call this even when test setup failed before the nodes were assigned.
 *
 * @param serviceNodes - Fleet whose nodes should be stopped.
 * @param wakuNodes - One light node or a list of them.
 */
export async function teardownNodesWithRedundancy(
  serviceNodes: ServiceNodesFleet,
  wakuNodes: Waku | Waku[]
): Promise<void> {
  const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
  // Guarded like the waku nodes below: the fleet is unassigned when setup
  // threw, and a TypeError here would mask the original test failure.
  const nwakuNodes = serviceNodes?.nodes ?? [];

  const stopWithRetry = (target: { stop(): unknown }): Promise<void> =>
    pRetry(
      async () => {
        await target.stop();
      },
      { retries: 3 }
    );

  const stopNwakuNodes = nwakuNodes.map((node) => stopWithRetry(node));

  const stopWakuNodes = wNodes.map(async (waku) => {
    if (waku) {
      await stopWithRetry(waku);
    }
  });

  await Promise.all([...stopNwakuNodes, ...stopWakuNodes]);
}

View File

@ -18,13 +18,11 @@ import {
beforeEachCustom,
DefaultTestPubsubTopic,
DefaultTestShardInfo,
ServiceNode,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
ServiceNode,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../filter/utils.js";
} from "../../src/index.js";
describe("Waku Filter: Peer Management: E2E", function () {
this.timeout(15000);
@ -46,6 +44,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
this.ctx,
DefaultTestShardInfo,
undefined,
undefined,
5
);
const { error, subscription: sub } = await waku.filter.createSubscription(
@ -186,6 +185,7 @@ describe("Waku Filter: Peer Management: E2E", function () {
this.ctx,
DefaultTestShardInfo,
undefined,
undefined,
2
);
const serviceNodesPeerIdStr = await Promise.all(

View File

@ -5,12 +5,12 @@ import { expect } from "chai";
import {
afterEachCustom,
beforeEachCustom,
ServiceNodesFleet
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder,

View File

@ -7,15 +7,15 @@ import {
afterEachCustom,
beforeEachCustom,
delay,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy,
TEST_STRING,
TEST_TIMESTAMPS
} from "../../src/index.js";
import {
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder,

View File

@ -15,15 +15,15 @@ import {
beforeEachCustom,
delay,
generateTestData,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy,
TEST_STRING
} from "../../src/index.js";
import {
messagePayload,
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder,
@ -42,6 +42,7 @@ const runTests = (strictCheckNodes: boolean): void => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
strictCheckNodes
);
const { error, subscription: _subscription } =

View File

@ -7,15 +7,15 @@ import {
afterEachCustom,
beforeEachCustom,
generateTestData,
ServiceNodesFleet
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
ClusterId,
messagePayload,
messageText,
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic,
TestDecoder,
TestEncoder,

View File

@ -22,7 +22,7 @@ import {
NOISE_KEY_1,
ServiceNodesFleet,
waitForConnections
} from "../../src/index.js";
} from "../../src";
// Constants for test configuration.
export const log = new Logger("test:filter");

View File

@ -0,0 +1,146 @@
import { HealthStatus, LightNode, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
runMultipleNodes,
ServiceNode,
ServiceNodesFleet
} from "../../src";
import { messagePayload, TestEncoder, TestShardInfo } from "./utils";
// Exercises every combination of 0-3 LightPush peers and 0-3 Filter peers and
// checks both the per-protocol statuses and the derived node status.
describe("Node Health Status Matrix Tests", function () {
  let waku: LightNode;
  let serviceNodes: ServiceNode[];

  afterEachCustom(this, async function () {
    if (waku) {
      await waku.stop();
    }
    if (serviceNodes) {
      const pendingStops = serviceNodes.map((node) => node.stop());
      await Promise.all(pendingStops);
    }
  });

  const peerCounts = [0, 1, 2, 3];

  for (const lightPushPeers of peerCounts) {
    for (const filterPeers of peerCounts) {
      const expectedHealth = getExpectedNodeHealth(lightPushPeers, filterPeers);

      it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers - Expected: ${expectedHealth}`, async function () {
        this.timeout(10_000);

        const environment = await setupTestEnvironment(
          this.ctx,
          lightPushPeers,
          filterPeers
        );
        waku = environment[0];
        serviceNodes = environment[1];

        // Each protocol is only exercised when it is expected to have peers.
        if (lightPushPeers > 0) {
          await waku.lightPush.send(TestEncoder, messagePayload, {
            forceUseAllPeers: true
          });
        }
        if (filterPeers > 0) {
          await waku.filter.createSubscription(TestShardInfo);
        }

        const lightPushHealth = waku.health.getProtocolStatus(
          Protocols.LightPush
        );
        const filterHealth = waku.health.getProtocolStatus(Protocols.Filter);

        const expectedLightPush = getExpectedProtocolStatus(lightPushPeers);
        expect(lightPushHealth?.status).to.equal(expectedLightPush);

        const expectedFilter = getExpectedProtocolStatus(filterPeers);
        expect(filterHealth?.status).to.equal(expectedFilter);

        const nodeHealth = waku.health.getHealthStatus();
        expect(nodeHealth).to.equal(expectedHealth);
      });
    }
  }
});
/**
 * Expected protocol health for a given peer count: zero peers is Unhealthy,
 * exactly one is MinimallyHealthy, anything else is SufficientlyHealthy.
 */
function getExpectedProtocolStatus(peerCount: number): HealthStatus {
  switch (peerCount) {
    case 0:
      return HealthStatus.Unhealthy;
    case 1:
      return HealthStatus.MinimallyHealthy;
    default:
      return HealthStatus.SufficientlyHealthy;
  }
}
/**
 * Expected overall node health for the given peer counts: Unhealthy if either
 * protocol has zero peers, otherwise MinimallyHealthy if either has exactly
 * one, otherwise SufficientlyHealthy.
 */
function getExpectedNodeHealth(
  lightPushPeers: number,
  filterPeers: number
): HealthStatus {
  const counts = [lightPushPeers, filterPeers];

  if (counts.some((count) => count === 0)) {
    return HealthStatus.Unhealthy;
  }
  if (counts.some((count) => count === 1)) {
    return HealthStatus.MinimallyHealthy;
  }
  return HealthStatus.SufficientlyHealthy;
}
/**
 * Creates and starts a single service node with relay enabled and the given
 * LightPush / Filter settings. The node name is derived from the current
 * timestamp.
 *
 * @returns The started service node.
 */
async function runNodeWithProtocols(
  lightPush: boolean,
  filter: boolean
): Promise<ServiceNode> {
  const nodeName = `node-${Date.now()}`;
  const node = new ServiceNode(nodeName);

  await node.start({ lightpush: lightPush, filter: filter, relay: true });

  return node;
}
/**
 * Builds a light node plus the service nodes needed for one matrix cell.
 *
 * A shared set of `commonPeers` nodes supporting both protocols is started via
 * `runMultipleNodes`; single-protocol nodes are then added and dialed until
 * the requested LightPush and Filter counts are reached.
 *
 * NOTE(review): when exactly one of the counts is zero, `commonPeers` is the
 * larger count and those nodes are started with both filter and lightpush
 * enabled — confirm this is the intended setup for the "0 peers" protocol.
 * NOTE(review): in the `commonPeers === 0` branch the light node is created
 * but `start()` is not called here — confirm that is intended.
 *
 * @returns The light node and every service node that was started.
 */
async function setupTestEnvironment(
  context: Mocha.Context,
  lightPushPeers: number,
  filterPeers: number
): Promise<[LightNode, ServiceNode[]]> {
  const oneSideEmpty = lightPushPeers === 0 || filterPeers === 0;
  const commonPeers = oneSideEmpty
    ? Math.max(lightPushPeers, filterPeers)
    : Math.min(lightPushPeers, filterPeers);

  const serviceNodes: ServiceNode[] = [];
  let waku: LightNode;

  if (commonPeers > 0) {
    const [fleet, lightNode] = await runMultipleNodes(
      context,
      TestShardInfo,
      { filter: true, lightpush: true },
      undefined,
      commonPeers
    );
    waku = lightNode;
    serviceNodes.push(...fleet.nodes);
  } else {
    waku = await createLightNode({ shardInfo: TestShardInfo });
  }

  // Create additional LightPush nodes if needed
  for (
    let remaining = lightPushPeers - commonPeers;
    remaining > 0;
    remaining--
  ) {
    const lightPushNode = await runNodeWithProtocols(true, false);
    serviceNodes.push(lightPushNode);
    await waku.dial(await lightPushNode.getMultiaddrWithId());
  }

  // Create additional Filter nodes if needed
  for (let remaining = filterPeers - commonPeers; remaining > 0; remaining--) {
    const filterNode = await runNodeWithProtocols(false, true);
    serviceNodes.push(filterNode);
    await waku.dial(await filterNode.getMultiaddrWithId());
  }

  return [waku, serviceNodes];
}

View File

@ -0,0 +1,95 @@
import { HealthStatus, type LightNode, Protocols } from "@waku/sdk";
import { expect } from "chai";
import {
afterEachCustom,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import {
messagePayload,
TestDecoder,
TestEncoder,
TestShardInfo
} from "./utils.js";
// Service-node counts each protocol is tested against.
const NUM_NODES = [0, 1, 2, 3];

describe("Health Manager", function () {
  this.timeout(10_000);

  let waku: LightNode;
  let serviceNodes: ServiceNodesFleet;

  afterEachCustom(this, async () => {
    await teardownNodesWithRedundancy(serviceNodes, waku);
  });

  /**
   * Asserts that a protocol health record exists and that its status matches
   * the number of service nodes the light node was connected to.
   */
  function expectHealthForConnections(
    health: { status: HealthStatus } | undefined,
    num: number
  ): void {
    expect(health).to.not.equal(undefined);

    if (num === 0) {
      expect(health?.status).to.equal(HealthStatus.Unhealthy);
      return;
    }
    if (num < 2) {
      expect(health?.status).to.equal(HealthStatus.MinimallyHealthy);
      return;
    }
    if (num >= 2) {
      expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy);
      return;
    }
    throw new Error("Invalid number of connections");
  }

  describe("Should update the health status for protocols", () => {
    // Arrow callback: `this` is still the outer suite here.
    this.timeout(10_000);

    for (const num of NUM_NODES) {
      it(`LightPush with ${num} connections`, async function () {
        this.timeout(10_000);
        const started = await runMultipleNodes(
          this.ctx,
          TestShardInfo,
          undefined,
          undefined,
          num
        );
        serviceNodes = started[0];
        waku = started[1];

        await waku.lightPush.send(TestEncoder, messagePayload);

        expectHealthForConnections(
          waku.health.getProtocolStatus(Protocols.LightPush),
          num
        );
      });

      it(`Filter with ${num} connections`, async function () {
        const started = await runMultipleNodes(
          this.ctx,
          TestShardInfo,
          undefined,
          undefined,
          num
        );
        serviceNodes = started[0];
        waku = started[1];

        const { error, subscription } =
          await waku.filter.createSubscription(TestShardInfo);
        // NOTE(review): inside this branch `error` is truthy, so the
        // assertion cannot fail; a subscription error is effectively ignored.
        if (error) {
          expect(error).to.not.equal(undefined);
        }

        await subscription?.subscribe([TestDecoder], () => {});

        expectHealthForConnections(
          waku.health.getProtocolStatus(Protocols.Filter),
          num
        );
      });
    }
  });
});

View File

@ -0,0 +1,21 @@
import { createDecoder, createEncoder } from "@waku/core";
import { utf8ToBytes } from "@waku/sdk";
import { contentTopicToPubsubTopic } from "@waku/utils";
// Shared fixtures for the health tests.

// Content topic used by the encoder, decoder and shard info below.
export const TestContentTopic = "/test/1/waku-filter/default";
// Cluster id used to derive the pubsub topic and the shard info.
export const ClusterId = 2;
// Sharding parameters handed to the node-setup helpers by the tests.
export const TestShardInfo = {
contentTopics: [TestContentTopic],
clusterId: ClusterId
};
// Pubsub topic derived from the content topic and cluster id above.
export const TestPubsubTopic = contentTopicToPubsubTopic(
TestContentTopic,
ClusterId
);
// Encoder and decoder bound to the test content topic and pubsub topic.
export const TestEncoder = createEncoder({
contentTopic: TestContentTopic,
pubsubTopic: TestPubsubTopic
});
export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
// Payload text and the message object built from its UTF-8 bytes.
export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) };

View File

@ -7,13 +7,11 @@ import {
afterEachCustom,
beforeEachCustom,
generateRandomUint8Array,
runMultipleNodes,
ServiceNodesFleet,
teardownNodesWithRedundancy,
TEST_STRING
} from "../../src";
import {
runMultipleNodes,
teardownNodesWithRedundancy
} from "../filter/utils.js";
import {
messagePayload,
@ -36,6 +34,7 @@ const runTests = (strictNodeCheck: boolean): void => {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
undefined,
strictNodeCheck,
numServiceNodes,
true

View File

@ -8,13 +8,11 @@ import {
beforeEachCustom,
DefaultTestShardInfo,
DefaultTestSingleShardInfo,
ServiceNodesFleet
} from "../../src/index.js";
import {
runMultipleNodes,
teardownNodesWithRedundancy,
TestContentTopic
} from "../filter/utils.js";
ServiceNodesFleet,
teardownNodesWithRedundancy
} from "../../src/index.js";
import { TestContentTopic } from "../filter/utils.js";
describe("Waku Light Push: Peer Management: E2E", function () {
this.timeout(15000);
@ -26,6 +24,7 @@ describe("Waku Light Push: Peer Management: E2E", function () {
this.ctx,
DefaultTestShardInfo,
undefined,
undefined,
5
);
});