From dc7ea365c6419b5952cb906d3243326c962b8547 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Mon, 16 Dec 2024 19:52:41 +0530 Subject: [PATCH] chore: add tests --- packages/core/src/lib/health_manager.ts | 11 ++ .../tests/tests/health-manager/node.spec.ts | 74 ++++++++++++- .../tests/health-manager/protocols.spec.ts | 103 ++++++++++++++++++ packages/tests/tests/health-manager/utils.ts | 42 +++++++ 4 files changed, 229 insertions(+), 1 deletion(-) diff --git a/packages/core/src/lib/health_manager.ts b/packages/core/src/lib/health_manager.ts index f7a823ba2a..df5da6c3f6 100644 --- a/packages/core/src/lib/health_manager.ts +++ b/packages/core/src/lib/health_manager.ts @@ -8,13 +8,16 @@ import { type ProtocolHealth, Protocols } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; class HealthManager implements IHealthManager { public static instance: HealthManager; private readonly health: NodeHealth; private listeners: Map>; + private log: Logger; private constructor() { + this.log = new Logger("health-manager"); this.health = { overallStatus: HealthStatus.Unhealthy, protocolStatuses: new Map() @@ -50,6 +53,10 @@ class HealthManager implements IHealthManager { status = HealthStatus.SufficientlyHealthy; } + this.log.info( + `Updating protocol health for ${protocol}: ${status} (${connectedPeers} peers)` + ); + this.health.protocolStatuses.set(protocol, { name: protocol, status: status, @@ -96,6 +103,7 @@ class HealthManager implements IHealthManager { } else if (multicodec.includes("store")) { name = Protocols.Store; } else { + this.log.error(`Unknown protocol multicodec: ${multicodec}`); throw new Error(`Unknown protocol: ${multicodec}`); } return name; @@ -119,6 +127,9 @@ class HealthManager implements IHealthManager { } if (this.health.overallStatus !== newStatus) { + this.log.info( + `Overall health status changed from ${this.health.overallStatus} to ${newStatus}` + ); this.health.overallStatus = newStatus; this.emitEvent({ type: "health:overall", diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index 6e50522a17..213600015b 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -1,6 +1,6 @@ import { HealthStatus, IWaku, LightNode, Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; -import { shardInfoToPubsubTopics } from "@waku/utils"; +import { delay, shardInfoToPubsubTopics } from "@waku/utils"; import { expect } from "chai"; import { @@ -16,6 +16,7 @@ import { TestEncoder, TestShardInfo } from "./utils.js"; +import { waitForHealthStatus } from "./utils.js"; // TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 describe.skip("Node Health Status Matrix Tests", function () { @@ -79,6 +80,77 @@ describe.skip("Node Health Status Matrix Tests", function () { }); }); +describe.only("Node Health Status Transitions", function () { + let waku: LightNode; + let serviceNodes: ServiceNode[]; + + afterEachCustom(this, async function () { + if (waku) { + await waku.stop(); + } + if (serviceNodes) { + await Promise.all(serviceNodes.map((node) => node.stop())); + } + }); + + it.only("should transition through health states with events", async function () { + // Start with no peers + [waku, serviceNodes] = await setupTestEnvironment(this.ctx, 0, 0); + expect(waku.health.getHealthStatus()).to.equal(HealthStatus.Unhealthy); + + // Add one peer for minimal health + const minimalNode = await runNodeWithProtocols(true, true); + serviceNodes.push(minimalNode); + await waku.dial(await minimalNode.getMultiaddrWithId()); + + await delay(5000); + + console.log("waiting for minimal health status"); + const minimalHealthPromiseEvent = await waitForHealthStatus( + waku, + HealthStatus.MinimallyHealthy + ); + + console.log("minimalHealthPromiseEvent", minimalHealthPromiseEvent); + + expect(minimalHealthPromiseEvent.status).to.equal( + HealthStatus.MinimallyHealthy + ); + console.log("minimalHealthPromiseEvent", minimalHealthPromiseEvent); + + // Add second peer for sufficient health + const sufficientNode = await runNodeWithProtocols(true, true); + serviceNodes.push(sufficientNode); + + const sufficientHealthPromise = waitForHealthStatus( + waku, + HealthStatus.SufficientlyHealthy + ); + await waku.dial(await sufficientNode.getMultiaddrWithId()); + + const sufficientEvent = await sufficientHealthPromise; + expect(sufficientEvent.status).to.equal(HealthStatus.SufficientlyHealthy); + }); + + it("should emit events only when health status changes", async function () { + [waku, serviceNodes] = await setupTestEnvironment(this.ctx, 2, 2); + + let eventCount = 0; + waku.health.addEventListener("health:overall", () => { + eventCount++; + }); + + // These should not trigger health changes as we're already at max health + await waku.lightPush.send(TestEncoder, messagePayload); + await waku.filter.subscribe([TestDecoder], () => {}); + + // Give events time to propagate + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(eventCount).to.equal(0); + }); +}); + function getExpectedProtocolStatus(peerCount: number): HealthStatus { if (peerCount === 0) return HealthStatus.Unhealthy; if (peerCount === 1) return HealthStatus.MinimallyHealthy; diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts index 95d4ec5d71..3246090f15 100644 --- a/packages/tests/tests/health-manager/protocols.spec.ts +++ b/packages/tests/tests/health-manager/protocols.spec.ts @@ -9,6 +9,7 @@ import { } from "../../src/index.js"; import { + createHealthEventPromise, messagePayload, TestDecoder, TestEncoder, @@ -91,4 +92,106 @@ describe.skip("Health Manager", function () { }); }); }); + + describe.only("Health Manager Events", function () { + this.timeout(10000); + + it("should emit protocol health events", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + { lightpush: true }, + undefined, + 2 + ); + + const eventPromise = createHealthEventPromise(waku, "health:protocol"); + + // Trigger a protocol health update + await waku.lightPush.send(TestEncoder, messagePayload); + + const event = await eventPromise; + expect(event.type).to.equal("health:protocol"); + expect(event.protocol).to.equal(Protocols.LightPush); + expect(event.status).to.equal(HealthStatus.SufficientlyHealthy); + expect(event.timestamp).to.be.instanceOf(Date); + }); + + it("should emit overall health events", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + { lightpush: true, filter: true }, + undefined, + 2 + ); + + const eventPromise = createHealthEventPromise(waku, "health:overall"); + + // Trigger health updates + await waku.lightPush.send(TestEncoder, messagePayload); + await waku.filter.subscribe([TestDecoder], () => {}); + + const event = await eventPromise; + expect(event.type).to.equal("health:overall"); + expect(event.status).to.equal(HealthStatus.SufficientlyHealthy); + expect(event.timestamp).to.be.instanceOf(Date); + }); + + it("should allow multiple listeners for the same event type", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + { lightpush: true }, + undefined, + 1 + ); + + let listener1Called = false; + let listener2Called = false; + + waku.health.addEventListener("health:protocol", () => { + listener1Called = true; + }); + waku.health.addEventListener("health:protocol", () => { + listener2Called = true; + }); + + await waku.lightPush.send(TestEncoder, messagePayload); + + // Give events time to propagate + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(listener1Called).to.be.true; + expect(listener2Called).to.be.true; + }); + + it("should properly remove event listeners", async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + { lightpush: true }, + undefined, + 1 + ); + let callCount = 0; + const listener = (): void => { + callCount++; + }; + + waku.health.addEventListener("health:protocol", listener); + await waku.lightPush.send(TestEncoder, messagePayload); + + // Give first event time to propagate + await new Promise((resolve) => setTimeout(resolve, 100)); + + waku.health.removeEventListener("health:protocol", listener); + await waku.lightPush.send(TestEncoder, messagePayload); + + // Give second event time to propagate + await new Promise((resolve) => setTimeout(resolve, 100)); + + expect(callCount).to.equal(1); + }); + }); }); diff --git a/packages/tests/tests/health-manager/utils.ts b/packages/tests/tests/health-manager/utils.ts index 564e4cfdf4..b3cf74d67f 100644 --- a/packages/tests/tests/health-manager/utils.ts +++ b/packages/tests/tests/health-manager/utils.ts @@ -1,4 +1,6 @@ import { createDecoder, createEncoder } from "@waku/core"; +import { LightNode } from "@waku/interfaces"; +import { HealthEvent, HealthEventType, HealthStatus } from "@waku/interfaces"; import { utf8ToBytes } from "@waku/sdk"; import { contentTopicToPubsubTopic } from "@waku/utils"; @@ -19,3 +21,43 @@ export const TestEncoder = createEncoder({ export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic); export const messageText = "Filtering works!"; export const messagePayload = { payload: utf8ToBytes(messageText) }; + +export function createHealthEventPromise( + waku: LightNode, + eventType: HealthEventType +): Promise { + return new Promise((resolve) => { + const handler = (event: HealthEvent): void => { + waku.health.removeEventListener(eventType, handler); + resolve(event); + }; + waku.health.addEventListener(eventType, handler); + }); +} + +export function waitForHealthStatus( + waku: LightNode, + expectedStatus: HealthStatus +): Promise { + return new Promise((resolve) => { + // Check current status first + const currentStatus = waku.health.getHealthStatus(); + if (currentStatus === expectedStatus) { + resolve({ + type: "health:overall", + status: expectedStatus, + timestamp: new Date() + }); + return; + } + + // Otherwise wait for the event + const handler = (event: HealthEvent): void => { + if (event.status === expectedStatus) { + waku.health.removeEventListener("health:overall", handler); + resolve(event); + } + }; + waku.health.addEventListener("health:overall", handler); + }); +}