diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index af6d03b3c5..7ce22bfdc2 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -21,6 +21,8 @@ export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; export { ConnectionManager } from "./lib/connection_manager.js"; +export { getHealthManager } from "./lib/health_manager.js"; + export { KeepAliveManager } from "./lib/keep_alive_manager.js"; export { StreamManager } from "./lib/stream_manager/index.js"; diff --git a/packages/core/src/lib/health_manager.ts b/packages/core/src/lib/health_manager.ts new file mode 100644 index 0000000000..c41cf7950f --- /dev/null +++ b/packages/core/src/lib/health_manager.ts @@ -0,0 +1,90 @@ +import { + HealthStatus, + type IHealthManager, + NodeHealth, + type ProtocolHealth, + Protocols +} from "@waku/interfaces"; + +class HealthManager implements IHealthManager { + public static instance: HealthManager; + private readonly health: NodeHealth; + + private constructor() { + this.health = { + overallStatus: HealthStatus.Unhealthy, + protocolStatuses: new Map() + }; + } + + public static getInstance(): HealthManager { + if (!HealthManager.instance) { + HealthManager.instance = new HealthManager(); + } + return HealthManager.instance; + } + + public getHealthStatus(): HealthStatus { + return this.health.overallStatus; + } + + public getProtocolStatus(protocol: Protocols): ProtocolHealth | undefined { + return this.health.protocolStatuses.get(protocol); + } + + public updateProtocolHealth( + multicodec: string, + connectedPeers: number + ): void { + const protocol = this.getNameFromMulticodec(multicodec); + + let status: HealthStatus = HealthStatus.Unhealthy; + if (connectedPeers == 1) { + status = HealthStatus.MinimallyHealthy; + } else if (connectedPeers >= 2) { + status = HealthStatus.SufficientlyHealthy; + } + + this.health.protocolStatuses.set(protocol, { + name: protocol, + status: status, + lastUpdate: new Date() + }); + + this.updateOverallHealth(); + } + + private getNameFromMulticodec(multicodec: string): Protocols { + let name: Protocols; + if (multicodec.includes("filter")) { + name = Protocols.Filter; + } else if (multicodec.includes("lightpush")) { + name = Protocols.LightPush; + } else if (multicodec.includes("store")) { + name = Protocols.Store; + } else { + throw new Error(`Unknown protocol: ${multicodec}`); + } + return name; + } + + private updateOverallHealth(): void { + const relevantProtocols = [Protocols.LightPush, Protocols.Filter]; + const statuses = relevantProtocols.map( + (p) => this.getProtocolStatus(p)?.status + ); + + if (statuses.some((status) => status === HealthStatus.Unhealthy)) { + this.health.overallStatus = HealthStatus.Unhealthy; + } else if ( + statuses.some((status) => status === HealthStatus.MinimallyHealthy) + ) { + this.health.overallStatus = HealthStatus.MinimallyHealthy; + } else { + this.health.overallStatus = HealthStatus.SufficientlyHealthy; + } + } +} + +export const getHealthManager = (): HealthManager => + HealthManager.getInstance(); diff --git a/packages/interfaces/src/health_manager.ts b/packages/interfaces/src/health_manager.ts new file mode 100644 index 0000000000..d26354cae2 --- /dev/null +++ b/packages/interfaces/src/health_manager.ts @@ -0,0 +1,26 @@ +import { Protocols } from "./protocols"; + +export enum HealthStatus { + Unhealthy = "Unhealthy", + MinimallyHealthy = "MinimallyHealthy", + SufficientlyHealthy = "SufficientlyHealthy" +} + +export interface IHealthManager { + getHealthStatus: () => HealthStatus; + getProtocolStatus: (protocol: Protocols) => ProtocolHealth | undefined; + updateProtocolHealth: (multicodec: string, connectedPeers: number) => void; +} + +export type NodeHealth = { + overallStatus: HealthStatus; + protocolStatuses: ProtocolsHealthStatus; +}; + +export type ProtocolHealth = { + name: Protocols; + status: HealthStatus; + lastUpdate: Date; +}; + +export type ProtocolsHealthStatus = Map; diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 1d98ed24d4..44db3dcc62 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -17,3 +17,4 @@ export * from "./dns_discovery.js"; export * from "./metadata.js"; export * from "./constants.js"; export * from "./local_storage.js"; +export * from "./health_manager.js"; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 0c02a378ec..a9bdeda2e1 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -25,8 +25,8 @@ export type IBaseProtocolCore = { }; export type IBaseProtocolSDK = { - renewPeer: (peerToDisconnect: PeerId) => Promise; readonly connectedPeers: Peer[]; + renewPeer: (peerToDisconnect: PeerId) => Promise; readonly numPeersToUse: number; }; diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 5b54b0080a..c1eb1c3a0e 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -3,6 +3,7 @@ import type { MultiaddrInput } from "@multiformats/multiaddr"; import { IConnectionManager } from "./connection_manager.js"; import type { IFilterSDK } from "./filter.js"; +import { IHealthManager } from "./health_manager.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPushSDK } from "./light_push.js"; import { Protocols } from "./protocols.js"; @@ -27,6 +28,8 @@ export interface Waku { isStarted(): boolean; isConnected(): boolean; + + health: IHealthManager; } export interface LightNode extends Waku { diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 98b72327d0..8b6343a895 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,7 +1,11 @@ import type { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager } from "@waku/core"; +import { ConnectionManager, getHealthManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces"; +import { + IBaseProtocolSDK, + IHealthManager, + ProtocolUseOptions +} from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; interface Options { @@ -14,6 +18,7 @@ const DEFAULT_NUM_PEERS_TO_USE = 3; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { + private healthManager: IHealthManager; public readonly numPeersToUse: number; private peers: Peer[] = []; private maintainPeersIntervalId: ReturnType< @@ -32,6 +37,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { options: Options ) { this.log = new Logger(`sdk:${core.multicodec}`); + + this.healthManager = getHealthManager(); + this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; @@ -60,7 +68,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ); } - this.peers = this.peers.filter((peer) => !peer.id.equals(peerToDisconnect)); + const updatedPeers = this.peers.filter( + (peer) => !peer.id.equals(peerToDisconnect) + ); + this.updatePeers(updatedPeers); + this.log.info( `Peer ${peerToDisconnect} disconnected and removed from the peer list` ); @@ -192,7 +204,9 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { await Promise.all(dials); - this.peers = [...this.peers, ...additionalPeers]; + const updatedPeers = [...this.peers, ...additionalPeers]; + this.updatePeers(updatedPeers); + this.log.info( `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` ); @@ -232,6 +246,14 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { throw error; } } + + private updatePeers(peers: Peer[]): void { + this.peers = peers; + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.length + ); + } } class RenewPeerLocker { diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index bfd124f906..95977419f4 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -1,9 +1,10 @@ import type { Stream } from "@libp2p/interface"; import { isPeerId, PeerId } from "@libp2p/interface"; import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr"; -import { ConnectionManager } from "@waku/core"; +import { ConnectionManager, getHealthManager } from "@waku/core"; import type { IFilterSDK, + IHealthManager, ILightPushSDK, IRelay, IStoreSDK, @@ -68,6 +69,7 @@ export class WakuNode implements Waku { public lightPush?: ILightPushSDK; public connectionManager: ConnectionManager; public readonly pubsubTopics: PubsubTopic[]; + public readonly health: IHealthManager; public constructor( options: WakuOptions, @@ -105,6 +107,8 @@ export class WakuNode implements Waku { this.relay ); + this.health = getHealthManager(); + if (protocolsEnabled.store) { const store = wakuStore(this.connectionManager, options); this.store = store(libp2p); diff --git a/packages/tests/src/utils/index.ts b/packages/tests/src/utils/index.ts index 1024c0d884..1347e2d7e9 100644 --- a/packages/tests/src/utils/index.ts +++ b/packages/tests/src/utils/index.ts @@ -7,3 +7,4 @@ export * from "./base64_utf8.js"; export * from "./waitForConnections.js"; export * from "./custom_mocha_hooks.js"; export * from "./waku_versions_utils.js"; +export * from "./nodes.js"; diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts new file mode 100644 index 0000000000..7d11ef8e12 --- /dev/null +++ b/packages/tests/src/utils/nodes.ts @@ -0,0 +1,115 @@ +import { waitForRemotePeer } from "@waku/core"; +import { + LightNode, + ProtocolCreateOptions, + Protocols, + ShardingParams, + Waku +} from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { isDefined, shardInfoToPubsubTopics } from "@waku/utils"; +import { Context } from "mocha"; +import pRetry from "p-retry"; + +import { DefaultTestPubsubTopic, NOISE_KEY_1 } from "../constants"; +import { ServiceNodesFleet } from "../lib"; +import { Args } from "../types"; + +import { waitForConnections } from "./waitForConnections"; + +export async function runMultipleNodes( + context: Context, + shardInfo?: ShardingParams, + customArgs?: Args, + strictChecking: boolean = false, + numServiceNodes = 3, + withoutFilter = false +): Promise<[ServiceNodesFleet, LightNode]> { + const pubsubTopics = shardInfo + ? shardInfoToPubsubTopics(shardInfo) + : [DefaultTestPubsubTopic]; + // create numServiceNodes nodes + const serviceNodes = await ServiceNodesFleet.createAndRun( + context, + pubsubTopics, + numServiceNodes, + strictChecking, + shardInfo, + customArgs, + withoutFilter + ); + + const wakuOptions: ProtocolCreateOptions = { + staticNoiseKey: NOISE_KEY_1, + libp2p: { + addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } + } + }; + + if (shardInfo) { + wakuOptions.shardInfo = shardInfo; + } else { + wakuOptions.pubsubTopics = pubsubTopics; + } + + const waku = await createLightNode(wakuOptions); + await waku.start(); + + if (!waku) { + throw new Error("Failed to initialize waku"); + } + + for (const node of serviceNodes.nodes) { + await waku.dial(await node.getMultiaddrWithId()); + await waitForRemotePeer( + waku, + [ + !customArgs?.filter ? undefined : Protocols.Filter, + !customArgs?.lightpush ? undefined : Protocols.LightPush + ].filter(isDefined) + ); + await node.ensureSubscriptions(pubsubTopics); + + const wakuConnections = waku.libp2p.getConnections(); + const nodePeers = await node.peers(); + + if (wakuConnections.length < 1 || nodePeers.length < 1) { + throw new Error( + `Expected at least 1 peer in each node. Got waku connections: ${wakuConnections.length} and service nodes: ${nodePeers.length}` + ); + } + } + + await waitForConnections(numServiceNodes, waku); + + return [serviceNodes, waku]; +} + +export async function teardownNodesWithRedundancy( + serviceNodes: ServiceNodesFleet, + wakuNodes: Waku | Waku[] +): Promise { + const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes]; + + const stopNwakuNodes = serviceNodes.nodes.map(async (node) => { + await pRetry( + async () => { + await node.stop(); + }, + { retries: 3 } + ); + }); + + const stopWakuNodes = wNodes.map(async (waku) => { + if (waku) { + await pRetry( + async () => { + await waku.stop(); + }, + { retries: 3 } + ); + } + }); + + await Promise.all([...stopNwakuNodes, ...stopWakuNodes]); +} diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 0e0963bf3d..f78d83f69e 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -18,13 +18,11 @@ import { beforeEachCustom, DefaultTestPubsubTopic, DefaultTestShardInfo, - ServiceNode, - ServiceNodesFleet -} from "../../src/index.js"; -import { runMultipleNodes, + ServiceNode, + ServiceNodesFleet, teardownNodesWithRedundancy -} from "../filter/utils.js"; +} from "../../src/index.js"; describe("Waku Filter: Peer Management: E2E", function () { this.timeout(15000); @@ -46,6 +44,7 @@ describe("Waku Filter: Peer Management: E2E", function () { this.ctx, DefaultTestShardInfo, undefined, + undefined, 5 ); const { error, subscription: sub } = await waku.filter.createSubscription( @@ -186,6 +185,7 @@ describe("Waku Filter: Peer Management: E2E", function () { this.ctx, DefaultTestShardInfo, undefined, + undefined, 2 ); const serviceNodesPeerIdStr = await Promise.all( diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 68287b54de..c6e3e11a2b 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -5,12 +5,12 @@ import { expect } from "chai"; import { afterEachCustom, beforeEachCustom, - ServiceNodesFleet + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy } from "../../src/index.js"; import { - runMultipleNodes, - teardownNodesWithRedundancy, TestContentTopic, TestDecoder, TestEncoder, diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 4d78c4530c..0b41585ff9 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -7,15 +7,15 @@ import { afterEachCustom, beforeEachCustom, delay, + runMultipleNodes, ServiceNodesFleet, + teardownNodesWithRedundancy, TEST_STRING, TEST_TIMESTAMPS } from "../../src/index.js"; import { messageText, - runMultipleNodes, - teardownNodesWithRedundancy, TestContentTopic, TestDecoder, TestEncoder, diff --git a/packages/tests/tests/filter/subscribe.node.spec.ts b/packages/tests/tests/filter/subscribe.node.spec.ts index fe6e024d2f..b350643670 100644 --- a/packages/tests/tests/filter/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/subscribe.node.spec.ts @@ -15,15 +15,15 @@ import { beforeEachCustom, delay, generateTestData, + runMultipleNodes, ServiceNodesFleet, + teardownNodesWithRedundancy, TEST_STRING } from "../../src/index.js"; import { messagePayload, messageText, - runMultipleNodes, - teardownNodesWithRedundancy, TestContentTopic, TestDecoder, TestEncoder, @@ -42,6 +42,7 @@ const runTests = (strictCheckNodes: boolean): void => { [serviceNodes, waku] = await runMultipleNodes( this.ctx, TestShardInfo, + undefined, strictCheckNodes ); const { error, subscription: _subscription } = diff --git a/packages/tests/tests/filter/unsubscribe.node.spec.ts b/packages/tests/tests/filter/unsubscribe.node.spec.ts index 9dc362bb08..7f3a74bf11 100644 --- a/packages/tests/tests/filter/unsubscribe.node.spec.ts +++ b/packages/tests/tests/filter/unsubscribe.node.spec.ts @@ -7,15 +7,15 @@ import { afterEachCustom, beforeEachCustom, generateTestData, - ServiceNodesFleet + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy } from "../../src/index.js"; import { ClusterId, messagePayload, messageText, - runMultipleNodes, - teardownNodesWithRedundancy, TestContentTopic, TestDecoder, TestEncoder, diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 05f093dcc4..930f8b4afb 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -22,7 +22,7 @@ import { NOISE_KEY_1, ServiceNodesFleet, waitForConnections -} from "../../src/index.js"; +} from "../../src"; // Constants for test configuration. export const log = new Logger("test:filter"); diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts new file mode 100644 index 0000000000..85ef6e3e67 --- /dev/null +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -0,0 +1,146 @@ +import { HealthStatus, LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + runMultipleNodes, + ServiceNode, + ServiceNodesFleet +} from "../../src"; + +import { messagePayload, TestEncoder, TestShardInfo } from "./utils"; + +describe("Node Health Status Matrix Tests", function () { + let waku: LightNode; + let serviceNodes: ServiceNode[]; + + afterEachCustom(this, async function () { + if (waku) { + await waku.stop(); + } + if (serviceNodes) { + await Promise.all(serviceNodes.map((node) => node.stop())); + } + }); + + const peerCounts = [0, 1, 2, 3]; + + peerCounts.forEach((lightPushPeers) => { + peerCounts.forEach((filterPeers) => { + const expectedHealth = getExpectedNodeHealth(lightPushPeers, filterPeers); + it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers - Expected: ${expectedHealth}`, async function () { + this.timeout(10_000); + + [waku, serviceNodes] = await setupTestEnvironment( + this.ctx, + lightPushPeers, + filterPeers + ); + + if (lightPushPeers > 0) { + await waku.lightPush.send(TestEncoder, messagePayload, { + forceUseAllPeers: true + }); + } + + if (filterPeers > 0) { + await waku.filter.createSubscription(TestShardInfo); + } + + const lightPushHealth = waku.health.getProtocolStatus( + Protocols.LightPush + ); + const filterHealth = waku.health.getProtocolStatus(Protocols.Filter); + + expect(lightPushHealth?.status).to.equal( + getExpectedProtocolStatus(lightPushPeers) + ); + expect(filterHealth?.status).to.equal( + getExpectedProtocolStatus(filterPeers) + ); + + const nodeHealth = waku.health.getHealthStatus(); + expect(nodeHealth).to.equal(expectedHealth); + }); + }); + }); +}); + +function getExpectedProtocolStatus(peerCount: number): HealthStatus { + if (peerCount === 0) return HealthStatus.Unhealthy; + if (peerCount === 1) return HealthStatus.MinimallyHealthy; + return HealthStatus.SufficientlyHealthy; +} + +function getExpectedNodeHealth( + lightPushPeers: number, + filterPeers: number +): HealthStatus { + if (lightPushPeers === 0 || filterPeers === 0) { + return HealthStatus.Unhealthy; + } else if (lightPushPeers === 1 || filterPeers === 1) { + return HealthStatus.MinimallyHealthy; + } else { + return HealthStatus.SufficientlyHealthy; + } +} + +async function runNodeWithProtocols( + lightPush: boolean, + filter: boolean +): Promise { + const serviceNode = new ServiceNode(`node-${Date.now()}`); + await serviceNode.start({ + lightpush: lightPush, + filter: filter, + relay: true + }); + return serviceNode; +} + +async function setupTestEnvironment( + context: Mocha.Context, + lightPushPeers: number, + filterPeers: number +): Promise<[LightNode, ServiceNode[]]> { + let commonPeers: number; + if (lightPushPeers === 0 || filterPeers === 0) { + commonPeers = Math.max(lightPushPeers, filterPeers); + } else { + commonPeers = Math.min(lightPushPeers, filterPeers); + } + + let waku: LightNode; + const serviceNodes: ServiceNode[] = []; + let serviceNodesFleet: ServiceNodesFleet; + + if (commonPeers > 0) { + [serviceNodesFleet, waku] = await runMultipleNodes( + context, + TestShardInfo, + { filter: true, lightpush: true }, + undefined, + commonPeers + ); + serviceNodes.push(...serviceNodesFleet.nodes); + } else { + waku = await createLightNode({ shardInfo: TestShardInfo }); + } + + // Create additional LightPush nodes if needed + for (let i = commonPeers; i < lightPushPeers; i++) { + const node = await runNodeWithProtocols(true, false); + serviceNodes.push(node); + await waku.dial(await node.getMultiaddrWithId()); + } + + // Create additional Filter nodes if needed + for (let i = commonPeers; i < filterPeers; i++) { + const node = await runNodeWithProtocols(false, true); + serviceNodes.push(node); + await waku.dial(await node.getMultiaddrWithId()); + } + + return [waku, serviceNodes]; +} diff --git a/packages/tests/tests/health-manager/protocols.spec.ts b/packages/tests/tests/health-manager/protocols.spec.ts new file mode 100644 index 0000000000..965753f420 --- /dev/null +++ b/packages/tests/tests/health-manager/protocols.spec.ts @@ -0,0 +1,95 @@ +import { HealthStatus, type LightNode, Protocols } from "@waku/sdk"; +import { expect } from "chai"; + +import { + afterEachCustom, + runMultipleNodes, + ServiceNodesFleet, + teardownNodesWithRedundancy +} from "../../src/index.js"; + +import { + messagePayload, + TestDecoder, + TestEncoder, + TestShardInfo +} from "./utils.js"; + +const NUM_NODES = [0, 1, 2, 3]; + +describe("Health Manager", function () { + this.timeout(10_000); + + let waku: LightNode; + let serviceNodes: ServiceNodesFleet; + + afterEachCustom(this, async () => { + await teardownNodesWithRedundancy(serviceNodes, waku); + }); + + describe("Should update the health status for protocols", () => { + this.timeout(10_000); + + NUM_NODES.map((num) => { + it(`LightPush with ${num} connections`, async function () { + this.timeout(10_000); + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + undefined, + undefined, + num + ); + + await waku.lightPush.send(TestEncoder, messagePayload); + + const health = waku.health.getProtocolStatus(Protocols.LightPush); + if (!health) { + expect(health).to.not.equal(undefined); + } + + if (num === 0) { + expect(health?.status).to.equal(HealthStatus.Unhealthy); + } else if (num < 2) { + expect(health?.status).to.equal(HealthStatus.MinimallyHealthy); + } else if (num >= 2) { + expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy); + } else { + throw new Error("Invalid number of connections"); + } + }); + it(`Filter with ${num} connections`, async function () { + [serviceNodes, waku] = await runMultipleNodes( + this.ctx, + TestShardInfo, + undefined, + undefined, + num + ); + + const { error, subscription } = + await waku.filter.createSubscription(TestShardInfo); + if (error) { + expect(error).to.not.equal(undefined); + } + + await subscription?.subscribe([TestDecoder], () => {}); + + const health = waku.health.getProtocolStatus(Protocols.Filter); + if (!health) { + expect(health).to.not.equal(undefined); + } + + if (num === 0) { + expect(health?.status).to.equal(HealthStatus.Unhealthy); + } else if (num < 2) { + expect(health?.status).to.equal(HealthStatus.MinimallyHealthy); + } else if (num >= 2) { + expect(health?.status).to.equal(HealthStatus.SufficientlyHealthy); + } else { + throw new Error("Invalid number of connections"); + } + }); + }); + }); +}); diff --git a/packages/tests/tests/health-manager/utils.ts b/packages/tests/tests/health-manager/utils.ts new file mode 100644 index 0000000000..564e4cfdf4 --- /dev/null +++ b/packages/tests/tests/health-manager/utils.ts @@ -0,0 +1,21 @@ +import { createDecoder, createEncoder } from "@waku/core"; +import { utf8ToBytes } from "@waku/sdk"; +import { contentTopicToPubsubTopic } from "@waku/utils"; + +export const TestContentTopic = "/test/1/waku-filter/default"; +export const ClusterId = 2; +export const TestShardInfo = { + contentTopics: [TestContentTopic], + clusterId: ClusterId +}; +export const TestPubsubTopic = contentTopicToPubsubTopic( + TestContentTopic, + ClusterId +); +export const TestEncoder = createEncoder({ + contentTopic: TestContentTopic, + pubsubTopic: TestPubsubTopic +}); +export const TestDecoder = createDecoder(TestContentTopic, TestPubsubTopic); +export const messageText = "Filtering works!"; +export const messagePayload = { payload: utf8ToBytes(messageText) }; diff --git a/packages/tests/tests/light-push/index.node.spec.ts b/packages/tests/tests/light-push/index.node.spec.ts index ae12db79a4..43f6d10970 100644 --- a/packages/tests/tests/light-push/index.node.spec.ts +++ b/packages/tests/tests/light-push/index.node.spec.ts @@ -7,13 +7,11 @@ import { afterEachCustom, beforeEachCustom, generateRandomUint8Array, + runMultipleNodes, ServiceNodesFleet, + teardownNodesWithRedundancy, TEST_STRING } from "../../src"; -import { - runMultipleNodes, - teardownNodesWithRedundancy -} from "../filter/utils.js"; import { messagePayload, @@ -36,6 +34,7 @@ const runTests = (strictNodeCheck: boolean): void => { [serviceNodes, waku] = await runMultipleNodes( this.ctx, TestShardInfo, + undefined, strictNodeCheck, numServiceNodes, true diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 59388c1f7f..1c7fa59d44 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -8,13 +8,11 @@ import { beforeEachCustom, DefaultTestShardInfo, DefaultTestSingleShardInfo, - ServiceNodesFleet -} from "../../src/index.js"; -import { runMultipleNodes, - teardownNodesWithRedundancy, - TestContentTopic -} from "../filter/utils.js"; + ServiceNodesFleet, + teardownNodesWithRedundancy +} from "../../src/index.js"; +import { TestContentTopic } from "../filter/utils.js"; describe("Waku Light Push: Peer Management: E2E", function () { this.timeout(15000); @@ -26,6 +24,7 @@ describe("Waku Light Push: Peer Management: E2E", function () { this.ctx, DefaultTestShardInfo, undefined, + undefined, 5 ); });