chore: update HealthManager updates

This commit is contained in:
Danish Arora 2024-09-24 19:12:57 +05:30
parent 8cb3c352ad
commit 1df0556e19
No known key found for this signature in database
GPG Key ID: 1C6EF37CDAE1426E
3 changed files with 16 additions and 4 deletions

View File

@ -10,7 +10,6 @@ interface Options {
numPeersToUse?: number; numPeersToUse?: number;
maintainPeersInterval?: number; maintainPeersInterval?: number;
} }
///TODO: update HealthManager
const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_NUM_PEERS_TO_USE = 2;
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;

View File

@ -1,11 +1,14 @@
import { Peer, PeerId } from "@libp2p/interface"; import { Peer, PeerId } from "@libp2p/interface";
import { ConnectionManager } from "@waku/core"; import { ConnectionManager, getHealthManager } from "@waku/core";
import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { BaseProtocol } from "@waku/core/lib/base_protocol";
import { IHealthManager } from "@waku/interfaces";
import { Logger } from "@waku/utils"; import { Logger } from "@waku/utils";
import { Mutex } from "async-mutex"; import { Mutex } from "async-mutex";
export class PeerManager { export class PeerManager {
private peers: Map<string, Peer> = new Map(); private peers: Map<string, Peer> = new Map();
private healthManager: IHealthManager;
private readMutex = new Mutex(); private readMutex = new Mutex();
private writeMutex = new Mutex(); private writeMutex = new Mutex();
private writeLockHolder: string | null = null; private writeLockHolder: string | null = null;
@ -14,7 +17,9 @@ export class PeerManager {
private readonly connectionManager: ConnectionManager, private readonly connectionManager: ConnectionManager,
private readonly core: BaseProtocol, private readonly core: BaseProtocol,
private readonly log: Logger private readonly log: Logger
) {} ) {
this.healthManager = getHealthManager();
}
public getWriteLockHolder(): string | null { public getWriteLockHolder(): string | null {
return this.writeLockHolder; return this.writeLockHolder;
@ -30,6 +35,10 @@ export class PeerManager {
await this.connectionManager.attemptDial(peer.id); await this.connectionManager.attemptDial(peer.id);
this.peers.set(peer.id.toString(), peer); this.peers.set(peer.id.toString(), peer);
this.log.info(`Added and dialed peer: ${peer.id.toString()}`); this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null; this.writeLockHolder = null;
}); });
} }
@ -39,6 +48,10 @@ export class PeerManager {
this.writeLockHolder = `removePeer: ${peerId.toString()}`; this.writeLockHolder = `removePeer: ${peerId.toString()}`;
this.peers.delete(peerId.toString()); this.peers.delete(peerId.toString());
this.log.info(`Removed peer: ${peerId.toString()}`); this.log.info(`Removed peer: ${peerId.toString()}`);
this.healthManager.updateProtocolHealth(
this.core.multicodec,
this.peers.size
);
this.writeLockHolder = null; this.writeLockHolder = null;
}); });
} }

View File

@ -40,7 +40,7 @@ const runTests = (strictCheckNodes: boolean): void => {
await teardownNodesWithRedundancy(serviceNodes, waku); await teardownNodesWithRedundancy(serviceNodes, waku);
}); });
it("Ping on subscribed peer", async function () { it.only("Ping on subscribed peer", async function () {
const { error, subscription } = await waku.filter.subscribe( const { error, subscription } = await waku.filter.subscribe(
[TestDecoder], [TestDecoder],
serviceNodes.messageCollector.callback serviceNodes.messageCollector.callback