diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 5bfeace6c5..98a5ec3d1d 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -10,7 +10,6 @@ interface Options { numPeersToUse?: number; maintainPeersInterval?: number; } -///TODO: update HealthManager const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index db773b1dc3..3912e27bb6 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -1,11 +1,14 @@ import { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager } from "@waku/core"; +import { ConnectionManager, getHealthManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { IHealthManager } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Mutex } from "async-mutex"; export class PeerManager { private peers: Map = new Map(); + private healthManager: IHealthManager; + private readMutex = new Mutex(); private writeMutex = new Mutex(); private writeLockHolder: string | null = null; @@ -14,7 +17,9 @@ export class PeerManager { private readonly connectionManager: ConnectionManager, private readonly core: BaseProtocol, private readonly log: Logger - ) {} + ) { + this.healthManager = getHealthManager(); + } public getWriteLockHolder(): string | null { return this.writeLockHolder; @@ -30,6 +35,10 @@ export class PeerManager { await this.connectionManager.attemptDial(peer.id); this.peers.set(peer.id.toString(), peer); this.log.info(`Added and dialed peer: ${peer.id.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); this.writeLockHolder = null; }); } @@ -39,6 +48,10 @@ export class PeerManager { this.writeLockHolder = `removePeer: ${peerId.toString()}`; this.peers.delete(peerId.toString()); this.log.info(`Removed peer: ${peerId.toString()}`); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + this.peers.size + ); this.writeLockHolder = null; }); } diff --git a/packages/tests/tests/filter/ping.node.spec.ts b/packages/tests/tests/filter/ping.node.spec.ts index 7008b9bda8..d4a7b2f52d 100644 --- a/packages/tests/tests/filter/ping.node.spec.ts +++ b/packages/tests/tests/filter/ping.node.spec.ts @@ -40,7 +40,7 @@ const runTests = (strictCheckNodes: boolean): void => { await teardownNodesWithRedundancy(serviceNodes, waku); }); - it("Ping on subscribed peer", async function () { + it.only("Ping on subscribed peer", async function () { const { error, subscription } = await waku.filter.subscribe( [TestDecoder], serviceNodes.messageCollector.callback