js-waku/packages/sdk/src/peer_manager/peer_manager.ts
Sasha f199d92d60
feat: migrate to latest LightPush version (#2281)
* create retry manager

* update tests

* add retry manager tests, update peer manager

* fix start & merge with master

* return send to many logic

* add new error handling

* add sections to protocol errors

* fix check

* up test

* add waku.start in test

* fix check and test

* improve name
2025-02-25 22:40:03 +01:00

133 lines
3.4 KiB
TypeScript

import { Connection, PeerId } from "@libp2p/interface";
import { Libp2p } from "@waku/interfaces";
import { Logger } from "@waku/utils";
const log = new Logger("peer-manager");
const DEFAULT_NUM_PEERS_TO_USE = 2;
const CONNECTION_LOCK_TAG = "peer-manager-lock";
type PeerManagerConfig = {
numPeersToUse?: number;
};
type PeerManagerParams = {
libp2p: Libp2p;
config?: PeerManagerConfig;
};
export class PeerManager {
private readonly numPeersToUse: number;
private readonly libp2p: Libp2p;
public constructor(params: PeerManagerParams) {
this.onConnected = this.onConnected.bind(this);
this.onDisconnected = this.onDisconnected.bind(this);
this.numPeersToUse =
params?.config?.numPeersToUse || DEFAULT_NUM_PEERS_TO_USE;
this.libp2p = params.libp2p;
}
public start(): void {
this.startConnectionListener();
}
public stop(): void {
this.stopConnectionListener();
}
public getPeers(): PeerId[] {
return this.getLockedConnections().map((c) => c.remotePeer);
}
public requestRenew(peerId: PeerId | string): PeerId | undefined {
const lockedConnections = this.getLockedConnections();
const neededPeers = this.numPeersToUse - lockedConnections.length;
if (neededPeers === 0) {
return;
}
const connections = this.getUnlockedConnections()
.filter((c) => !c.remotePeer.equals(peerId))
.slice(0, neededPeers)
.map((c) => this.lockConnection(c))
.map((c) => c.remotePeer);
const newPeerId = connections[0];
if (!newPeerId) {
log.warn(
`requestRenew: Couldn't renew peer ${peerId.toString()} - no peers.`
);
return;
}
log.info(
`requestRenew: Renewed peer ${peerId.toString()} to ${newPeerId.toString()}`
);
return newPeerId;
}
private startConnectionListener(): void {
this.libp2p.addEventListener("peer:connect", this.onConnected);
this.libp2p.addEventListener("peer:disconnect", this.onDisconnected);
}
private stopConnectionListener(): void {
this.libp2p.removeEventListener("peer:connect", this.onConnected);
this.libp2p.removeEventListener("peer:disconnect", this.onDisconnected);
}
private onConnected(event: CustomEvent<PeerId>): void {
const peerId = event.detail;
this.lockPeerIfNeeded(peerId);
}
private onDisconnected(event: CustomEvent<PeerId>): void {
const peerId = event.detail;
this.requestRenew(peerId);
}
private lockPeerIfNeeded(peerId: PeerId): void {
const lockedConnections = this.getLockedConnections();
const neededPeers = this.numPeersToUse - lockedConnections.length;
if (neededPeers === 0) {
return;
}
this.getUnlockedConnections()
.filter((c) => c.remotePeer.equals(peerId))
.map((c) => this.lockConnection(c));
}
private getLockedConnections(): Connection[] {
return this.libp2p
.getConnections()
.filter((c) => c.status === "open" && this.isConnectionLocked(c));
}
private getUnlockedConnections(): Connection[] {
return this.libp2p
.getConnections()
.filter((c) => c.status === "open" && !this.isConnectionLocked(c));
}
private lockConnection(c: Connection): Connection {
log.info(
`requestRenew: Locking connection for peerId=${c.remotePeer.toString()}`
);
c.tags.push(CONNECTION_LOCK_TAG);
return c;
}
private isConnectionLocked(c: Connection): boolean {
return c.tags.includes(CONNECTION_LOCK_TAG);
}
}