chore: add tests

This commit is contained in:
Danish Arora 2024-12-16 19:52:41 +05:30
parent 32b9f83bbd
commit dc7ea365c6
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
4 changed files with 229 additions and 1 deletions

View File

@ -8,13 +8,16 @@ import {
type ProtocolHealth, type ProtocolHealth,
Protocols Protocols
} from "@waku/interfaces"; } from "@waku/interfaces";
import { Logger } from "@waku/utils";
class HealthManager implements IHealthManager { class HealthManager implements IHealthManager {
public static instance: HealthManager; public static instance: HealthManager;
private readonly health: NodeHealth; private readonly health: NodeHealth;
private listeners: Map<HealthEventType, Set<HealthListener>>; private listeners: Map<HealthEventType, Set<HealthListener>>;
private log: Logger;
private constructor() { private constructor() {
this.log = new Logger("health-manager");
this.health = { this.health = {
overallStatus: HealthStatus.Unhealthy, overallStatus: HealthStatus.Unhealthy,
protocolStatuses: new Map() protocolStatuses: new Map()
@ -50,6 +53,10 @@ class HealthManager implements IHealthManager {
status = HealthStatus.SufficientlyHealthy; status = HealthStatus.SufficientlyHealthy;
} }
this.log.info(
`Updating protocol health for ${protocol}: ${status} (${connectedPeers} peers)`
);
this.health.protocolStatuses.set(protocol, { this.health.protocolStatuses.set(protocol, {
name: protocol, name: protocol,
status: status, status: status,
@ -96,6 +103,7 @@ class HealthManager implements IHealthManager {
} else if (multicodec.includes("store")) { } else if (multicodec.includes("store")) {
name = Protocols.Store; name = Protocols.Store;
} else { } else {
this.log.error(`Unknown protocol multicodec: ${multicodec}`);
throw new Error(`Unknown protocol: ${multicodec}`); throw new Error(`Unknown protocol: ${multicodec}`);
} }
return name; return name;
@ -119,6 +127,9 @@ class HealthManager implements IHealthManager {
} }
if (this.health.overallStatus !== newStatus) { if (this.health.overallStatus !== newStatus) {
this.log.info(
`Overall health status changed from ${this.health.overallStatus} to ${newStatus}`
);
this.health.overallStatus = newStatus; this.health.overallStatus = newStatus;
this.emitEvent({ this.emitEvent({
type: "health:overall", type: "health:overall",

View File

@ -1,6 +1,6 @@
import { HealthStatus, IWaku, LightNode, Protocols } from "@waku/interfaces"; import { HealthStatus, IWaku, LightNode, Protocols } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { shardInfoToPubsubTopics } from "@waku/utils"; import { delay, shardInfoToPubsubTopics } from "@waku/utils";
import { expect } from "chai"; import { expect } from "chai";
import { import {
@ -16,6 +16,7 @@ import {
TestEncoder, TestEncoder,
TestShardInfo TestShardInfo
} from "./utils.js"; } from "./utils.js";
import { waitForHealthStatus } from "./utils.js";
// TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186 // TODO(weboko): resolve https://github.com/waku-org/js-waku/issues/2186
describe.skip("Node Health Status Matrix Tests", function () { describe.skip("Node Health Status Matrix Tests", function () {
@ -79,6 +80,77 @@ describe.skip("Node Health Status Matrix Tests", function () {
}); });
}); });
describe.only("Node Health Status Transitions", function () {
let waku: LightNode;
let serviceNodes: ServiceNode[];
afterEachCustom(this, async function () {
if (waku) {
await waku.stop();
}
if (serviceNodes) {
await Promise.all(serviceNodes.map((node) => node.stop()));
}
});
it.only("should transition through health states with events", async function () {
// Start with no peers
[waku, serviceNodes] = await setupTestEnvironment(this.ctx, 0, 0);
expect(waku.health.getHealthStatus()).to.equal(HealthStatus.Unhealthy);
// Add one peer for minimal health
const minimalNode = await runNodeWithProtocols(true, true);
serviceNodes.push(minimalNode);
await waku.dial(await minimalNode.getMultiaddrWithId());
await delay(5000);
console.log("waiting for minimal health status");
const minimalHealthPromiseEvent = await waitForHealthStatus(
waku,
HealthStatus.MinimallyHealthy
);
console.log("minimalHealthPromiseEvent", minimalHealthPromiseEvent);
expect(minimalHealthPromiseEvent.status).to.equal(
HealthStatus.MinimallyHealthy
);
console.log("minimalHealthPromiseEvent", minimalHealthPromiseEvent);
// Add second peer for sufficient health
const sufficientNode = await runNodeWithProtocols(true, true);
serviceNodes.push(sufficientNode);
const sufficientHealthPromise = waitForHealthStatus(
waku,
HealthStatus.SufficientlyHealthy
);
await waku.dial(await sufficientNode.getMultiaddrWithId());
const sufficientEvent = await sufficientHealthPromise;
expect(sufficientEvent.status).to.equal(HealthStatus.SufficientlyHealthy);
});
it("should emit events only when health status changes", async function () {
[waku, serviceNodes] = await setupTestEnvironment(this.ctx, 2, 2);
let eventCount = 0;
waku.health.addEventListener("health:overall", () => {
eventCount++;
});
// These should not trigger health changes as we're already at max health
await waku.lightPush.send(TestEncoder, messagePayload);
await waku.filter.subscribe([TestDecoder], () => {});
// Give events time to propagate
await new Promise((resolve) => setTimeout(resolve, 100));
expect(eventCount).to.equal(0);
});
});
function getExpectedProtocolStatus(peerCount: number): HealthStatus { function getExpectedProtocolStatus(peerCount: number): HealthStatus {
if (peerCount === 0) return HealthStatus.Unhealthy; if (peerCount === 0) return HealthStatus.Unhealthy;
if (peerCount === 1) return HealthStatus.MinimallyHealthy; if (peerCount === 1) return HealthStatus.MinimallyHealthy;

View File

@ -9,6 +9,7 @@ import {
} from "../../src/index.js"; } from "../../src/index.js";
import { import {
createHealthEventPromise,
messagePayload, messagePayload,
TestDecoder, TestDecoder,
TestEncoder, TestEncoder,
@ -91,4 +92,106 @@ describe.skip("Health Manager", function () {
}); });
}); });
}); });
describe.only("Health Manager Events", function () {
this.timeout(10000);
it("should emit protocol health events", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
{ lightpush: true },
undefined,
2
);
const eventPromise = createHealthEventPromise(waku, "health:protocol");
// Trigger a protocol health update
await waku.lightPush.send(TestEncoder, messagePayload);
const event = await eventPromise;
expect(event.type).to.equal("health:protocol");
expect(event.protocol).to.equal(Protocols.LightPush);
expect(event.status).to.equal(HealthStatus.SufficientlyHealthy);
expect(event.timestamp).to.be.instanceOf(Date);
});
it("should emit overall health events", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
{ lightpush: true, filter: true },
undefined,
2
);
const eventPromise = createHealthEventPromise(waku, "health:overall");
// Trigger health updates
await waku.lightPush.send(TestEncoder, messagePayload);
await waku.filter.subscribe([TestDecoder], () => {});
const event = await eventPromise;
expect(event.type).to.equal("health:overall");
expect(event.status).to.equal(HealthStatus.SufficientlyHealthy);
expect(event.timestamp).to.be.instanceOf(Date);
});
it("should allow multiple listeners for the same event type", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
{ lightpush: true },
undefined,
1
);
let listener1Called = false;
let listener2Called = false;
waku.health.addEventListener("health:protocol", () => {
listener1Called = true;
});
waku.health.addEventListener("health:protocol", () => {
listener2Called = true;
});
await waku.lightPush.send(TestEncoder, messagePayload);
// Give events time to propagate
await new Promise((resolve) => setTimeout(resolve, 100));
expect(listener1Called).to.be.true;
expect(listener2Called).to.be.true;
});
it("should properly remove event listeners", async function () {
[serviceNodes, waku] = await runMultipleNodes(
this.ctx,
TestShardInfo,
{ lightpush: true },
undefined,
1
);
let callCount = 0;
const listener = (): void => {
callCount++;
};
waku.health.addEventListener("health:protocol", listener);
await waku.lightPush.send(TestEncoder, messagePayload);
// Give first event time to propagate
await new Promise((resolve) => setTimeout(resolve, 100));
waku.health.removeEventListener("health:protocol", listener);
await waku.lightPush.send(TestEncoder, messagePayload);
// Give second event time to propagate
await new Promise((resolve) => setTimeout(resolve, 100));
expect(callCount).to.equal(1);
});
});
}); });

View File

@ -1,4 +1,6 @@
import { createDecoder, createEncoder } from "@waku/core"; import { createDecoder, createEncoder } from "@waku/core";
import { LightNode } from "@waku/interfaces";
import { HealthEvent, HealthEventType, HealthStatus } from "@waku/interfaces";
import { utf8ToBytes } from "@waku/sdk"; import { utf8ToBytes } from "@waku/sdk";
import { contentTopicToPubsubTopic } from "@waku/utils"; import { contentTopicToPubsubTopic } from "@waku/utils";
@ -19,3 +21,43 @@ export const TestEncoder = createEncoder({
export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic); export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic);
export const messageText = "Filtering works!"; export const messageText = "Filtering works!";
export const messagePayload = { payload: utf8ToBytes(messageText) }; export const messagePayload = { payload: utf8ToBytes(messageText) };
export function createHealthEventPromise(
waku: LightNode,
eventType: HealthEventType
): Promise<HealthEvent> {
return new Promise((resolve) => {
const handler = (event: HealthEvent): void => {
waku.health.removeEventListener(eventType, handler);
resolve(event);
};
waku.health.addEventListener(eventType, handler);
});
}
export function waitForHealthStatus(
waku: LightNode,
expectedStatus: HealthStatus
): Promise<HealthEvent> {
return new Promise((resolve) => {
// Check current status first
const currentStatus = waku.health.getHealthStatus();
if (currentStatus === expectedStatus) {
resolve({
type: "health:overall",
status: expectedStatus,
timestamp: new Date()
});
return;
}
// Otherwise wait for the event
const handler = (event: HealthEvent): void => {
if (event.status === expectedStatus) {
waku.health.removeEventListener("health:overall", handler);
resolve(event);
}
};
waku.health.addEventListener("health:overall", handler);
});
}