mirror of
https://github.com/waku-org/js-waku.git
synced 2025-02-22 09:08:19 +00:00
feat: peer manager
This commit is contained in:
parent
558fb6ae2e
commit
82baa26841
@ -5,12 +5,8 @@ import type {
|
|||||||
Libp2pComponents,
|
Libp2pComponents,
|
||||||
PubsubTopic
|
PubsubTopic
|
||||||
} from "@waku/interfaces";
|
} from "@waku/interfaces";
|
||||||
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
|
import { Logger } from "@waku/utils";
|
||||||
import {
|
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
|
||||||
getConnectedPeersForProtocolAndShard,
|
|
||||||
getPeersForProtocol,
|
|
||||||
sortPeersByLatency
|
|
||||||
} from "@waku/utils/libp2p";
|
|
||||||
|
|
||||||
import { filterPeersByDiscovery } from "./filterPeers.js";
|
import { filterPeersByDiscovery } from "./filterPeers.js";
|
||||||
import { StreamManager } from "./stream_manager/index.js";
|
import { StreamManager } from "./stream_manager/index.js";
|
||||||
@ -77,9 +73,8 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||||||
*
|
*
|
||||||
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
|
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
|
||||||
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
|
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
|
||||||
|
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
|
||||||
* @returns A list of peers that support the protocol sorted by latency.
|
*/
|
||||||
*/
|
|
||||||
public async getPeers(
|
public async getPeers(
|
||||||
{
|
{
|
||||||
numPeers,
|
numPeers,
|
||||||
@ -88,22 +83,16 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||||||
numPeers: number;
|
numPeers: number;
|
||||||
maxBootstrapPeers: number;
|
maxBootstrapPeers: number;
|
||||||
} = {
|
} = {
|
||||||
maxBootstrapPeers: 1,
|
maxBootstrapPeers: 0,
|
||||||
numPeers: 0
|
numPeers: 0
|
||||||
}
|
}
|
||||||
): Promise<Peer[]> {
|
): Promise<Peer[]> {
|
||||||
// Retrieve all connected peers that support the protocol & shard (if configured)
|
// Retrieve all connected peers that support the protocol & shard (if configured)
|
||||||
const connectedPeersForProtocolAndShard =
|
const allAvailableConnectedPeers = await this.connectedPeers();
|
||||||
await getConnectedPeersForProtocolAndShard(
|
|
||||||
this.components.connectionManager.getConnections(),
|
|
||||||
this.components.peerStore,
|
|
||||||
[this.multicodec],
|
|
||||||
pubsubTopicsToShardInfo(this.pubsubTopics)
|
|
||||||
);
|
|
||||||
|
|
||||||
// Filter the peers based on discovery & number of peers requested
|
// Filter the peers based on discovery & number of peers requested
|
||||||
const filteredPeers = filterPeersByDiscovery(
|
const filteredPeers = filterPeersByDiscovery(
|
||||||
connectedPeersForProtocolAndShard,
|
allAvailableConnectedPeers,
|
||||||
numPeers,
|
numPeers,
|
||||||
maxBootstrapPeers
|
maxBootstrapPeers
|
||||||
);
|
);
|
||||||
|
@ -1,39 +1,28 @@
|
|||||||
import type { Peer, PeerId } from "@libp2p/interface";
|
import type { Peer, PeerId } from "@libp2p/interface";
|
||||||
import { ConnectionManager, getHealthManager } from "@waku/core";
|
import { ConnectionManager } from "@waku/core";
|
||||||
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
||||||
import {
|
import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces";
|
||||||
IBaseProtocolSDK,
|
import { Logger } from "@waku/utils";
|
||||||
IHealthManager,
|
|
||||||
PeerIdStr,
|
import { PeerManager } from "./peer_manager.js";
|
||||||
ProtocolUseOptions
|
|
||||||
} from "@waku/interfaces";
|
|
||||||
import { delay, Logger } from "@waku/utils";
|
|
||||||
import { Mutex } from "async-mutex";
|
|
||||||
|
|
||||||
interface Options {
|
interface Options {
|
||||||
numPeersToUse?: number;
|
numPeersToUse?: number;
|
||||||
maintainPeersInterval?: number;
|
maintainPeersInterval?: number;
|
||||||
}
|
}
|
||||||
|
///TODO: update HealthManager
|
||||||
|
|
||||||
const RENEW_TIME_LOCK_DURATION = 30 * 1000;
|
const DEFAULT_NUM_PEERS_TO_USE = 2;
|
||||||
export const DEFAULT_NUM_PEERS_TO_USE = 2;
|
|
||||||
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
|
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
|
||||||
|
|
||||||
export class BaseProtocolSDK implements IBaseProtocolSDK {
|
export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||||
protected healthManager: IHealthManager;
|
private peerManager: PeerManager;
|
||||||
public readonly numPeersToUse: number;
|
public readonly numPeersToUse: number;
|
||||||
private peers: Map<PeerIdStr, Peer> = new Map();
|
|
||||||
private maintainPeersIntervalId: ReturnType<
|
private maintainPeersIntervalId: ReturnType<
|
||||||
typeof window.setInterval
|
typeof window.setInterval
|
||||||
> | null = null;
|
> | null = null;
|
||||||
private log: Logger;
|
private log: Logger;
|
||||||
|
|
||||||
private readonly renewPeersLocker = new RenewPeerLocker(
|
|
||||||
RENEW_TIME_LOCK_DURATION
|
|
||||||
);
|
|
||||||
|
|
||||||
private peersMutex = new Mutex();
|
|
||||||
|
|
||||||
public constructor(
|
public constructor(
|
||||||
protected core: BaseProtocol,
|
protected core: BaseProtocol,
|
||||||
protected connectionManager: ConnectionManager,
|
protected connectionManager: ConnectionManager,
|
||||||
@ -41,7 +30,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
) {
|
) {
|
||||||
this.log = new Logger(`sdk:${core.multicodec}`);
|
this.log = new Logger(`sdk:${core.multicodec}`);
|
||||||
|
|
||||||
this.healthManager = getHealthManager();
|
this.peerManager = new PeerManager(connectionManager, core, this.log);
|
||||||
|
|
||||||
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
|
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
|
||||||
const maintainPeersInterval =
|
const maintainPeersInterval =
|
||||||
@ -52,7 +41,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public get connectedPeers(): Peer[] {
|
public get connectedPeers(): Peer[] {
|
||||||
return Array.from(this.peers.values());
|
return this.peerManager.getPeers();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -61,30 +50,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
* @returns The new peer that was found and connected to.
|
* @returns The new peer that was found and connected to.
|
||||||
*/
|
*/
|
||||||
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
|
public async renewPeer(peerToDisconnect: PeerId): Promise<Peer | undefined> {
|
||||||
return this.peersMutex.runExclusive(async () => {
|
this.log.info(`Renewing peer ${peerToDisconnect}`);
|
||||||
this.log.info(`Renewing peer ${peerToDisconnect}`);
|
|
||||||
|
|
||||||
await this.connectionManager.dropConnection(peerToDisconnect);
|
const success = await this.peerManager.disconnectPeer(peerToDisconnect);
|
||||||
|
if (!success) return undefined;
|
||||||
|
|
||||||
this.peers.delete(peerToDisconnect.toString());
|
const newPeer = await this.peerManager.findAndAddPeers(1);
|
||||||
this.updatePeers(new Map(this.peers));
|
if (newPeer.length === 0) {
|
||||||
|
this.log.error(
|
||||||
this.log.info(
|
"Failed to find a new peer to replace the disconnected one."
|
||||||
`Peer ${peerToDisconnect} disconnected and removed from the peer list`
|
|
||||||
);
|
);
|
||||||
|
return undefined;
|
||||||
|
}
|
||||||
|
|
||||||
const newPeer = await this.findAndAddPeers(1);
|
return newPeer[0];
|
||||||
if (newPeer.length === 0) {
|
|
||||||
this.log.error(
|
|
||||||
"Failed to find a new peer to replace the disconnected one."
|
|
||||||
);
|
|
||||||
return undefined;
|
|
||||||
}
|
|
||||||
|
|
||||||
this.renewPeersLocker.lock(peerToDisconnect);
|
|
||||||
|
|
||||||
return newPeer[0];
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -101,11 +80,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
// private setupEventListeners(): void {
|
// private setupEventListeners(): void {
|
||||||
// this.core.addLibp2pEventListener(
|
// this.core.addLibp2pEventListener(
|
||||||
// "peer:connect",
|
// "peer:connect",
|
||||||
// () => void this.confirmPeers()
|
// () => void this.maintainPeers()
|
||||||
// );
|
// );
|
||||||
// this.core.addLibp2pEventListener(
|
// this.core.addLibp2pEventListener(
|
||||||
// "peer:disconnect",
|
// "peer:disconnect",
|
||||||
// () => void this.confirmPeers()
|
// () => void this.maintainPeers()
|
||||||
// );
|
// );
|
||||||
// }
|
// }
|
||||||
|
|
||||||
@ -123,57 +102,38 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
|
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
|
||||||
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
|
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
|
||||||
*/
|
*/
|
||||||
protected hasPeers = async (
|
protected async hasPeers(
|
||||||
options: Partial<ProtocolUseOptions> = {}
|
options: Partial<ProtocolUseOptions> = {}
|
||||||
): Promise<boolean> => {
|
): Promise<boolean> {
|
||||||
const {
|
const {
|
||||||
autoRetry = false,
|
autoRetry = false,
|
||||||
forceUseAllPeers = false,
|
forceUseAllPeers = false,
|
||||||
initialDelay = 10,
|
maxAttempts = 3
|
||||||
maxAttempts = 3,
|
|
||||||
maxDelay = 100
|
|
||||||
} = options;
|
} = options;
|
||||||
|
|
||||||
let needsMaintenance: boolean;
|
if (!forceUseAllPeers && this.connectedPeers.length > 0) {
|
||||||
let currentPeerCount: number;
|
return true;
|
||||||
|
|
||||||
const release = await this.peersMutex.acquire();
|
|
||||||
try {
|
|
||||||
currentPeerCount = this.connectedPeers.length;
|
|
||||||
needsMaintenance = forceUseAllPeers || currentPeerCount === 0;
|
|
||||||
} finally {
|
|
||||||
release();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!needsMaintenance) return true;
|
for (let attempts = 0; attempts < maxAttempts; attempts++) {
|
||||||
|
const success = await this.maintainPeers();
|
||||||
let attempts = 0;
|
if (success) {
|
||||||
while (attempts < maxAttempts) {
|
if (this.connectedPeers.length < this.numPeersToUse) {
|
||||||
attempts++;
|
this.log.warn(
|
||||||
if (await this.maintainPeers()) {
|
`Found only ${this.connectedPeers.length} peers, expected ${this.numPeersToUse}`
|
||||||
const finalRelease = await this.peersMutex.acquire();
|
);
|
||||||
try {
|
|
||||||
if (this.peers.size < this.numPeersToUse) {
|
|
||||||
this.log.warn(
|
|
||||||
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
|
|
||||||
);
|
|
||||||
}
|
|
||||||
return true;
|
|
||||||
} finally {
|
|
||||||
finalRelease();
|
|
||||||
}
|
}
|
||||||
|
return true;
|
||||||
}
|
}
|
||||||
if (!autoRetry) return false;
|
if (!autoRetry) {
|
||||||
const delayMs = Math.min(
|
return false;
|
||||||
initialDelay * Math.pow(2, attempts - 1),
|
}
|
||||||
maxDelay
|
//TODO: handle autoRetry
|
||||||
);
|
|
||||||
await delay(delayMs);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
this.log.error("Failed to find peers to send message to");
|
this.log.error("Failed to find peers to send message to");
|
||||||
return false;
|
return false;
|
||||||
};
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Starts an interval to maintain the peers list to `numPeersToUse`.
|
* Starts an interval to maintain the peers list to `numPeersToUse`.
|
||||||
@ -182,7 +142,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
private async startMaintainPeersInterval(interval: number): Promise<void> {
|
private async startMaintainPeersInterval(interval: number): Promise<void> {
|
||||||
this.log.info("Starting maintain peers interval");
|
this.log.info("Starting maintain peers interval");
|
||||||
try {
|
try {
|
||||||
await this.maintainPeers();
|
// await this.maintainPeers();
|
||||||
this.maintainPeersIntervalId = setInterval(() => {
|
this.maintainPeersIntervalId = setInterval(() => {
|
||||||
this.maintainPeers().catch((error) => {
|
this.maintainPeers().catch((error) => {
|
||||||
this.log.error("Error during maintain peers interval:", error);
|
this.log.error("Error during maintain peers interval:", error);
|
||||||
@ -202,169 +162,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||||||
*/
|
*/
|
||||||
private async maintainPeers(): Promise<boolean> {
|
private async maintainPeers(): Promise<boolean> {
|
||||||
try {
|
try {
|
||||||
await this.confirmPeers();
|
const currentPeerCount = await this.peerManager.getPeerCount();
|
||||||
|
const numPeersToAdd = this.numPeersToUse - currentPeerCount;
|
||||||
|
|
||||||
const numPeersToAdd = await this.peersMutex.runExclusive(() => {
|
if (numPeersToAdd === 0) {
|
||||||
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
|
this.log.info("No maintenance required, peer count is sufficient");
|
||||||
return this.numPeersToUse - this.peers.size;
|
return true;
|
||||||
});
|
}
|
||||||
|
|
||||||
|
this.log.info(`Maintaining peers, current count: ${currentPeerCount}`);
|
||||||
|
|
||||||
if (numPeersToAdd > 0) {
|
if (numPeersToAdd > 0) {
|
||||||
await this.findAndAddPeers(numPeersToAdd);
|
await this.peerManager.findAndAddPeers(numPeersToAdd);
|
||||||
|
} else {
|
||||||
|
await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (numPeersToAdd < 0) {
|
const finalPeerCount = await this.peerManager.getPeerCount();
|
||||||
this.log.warn(`
|
this.log.info(
|
||||||
Peer maintenance completed, but there are more than ${this.numPeersToUse} peers.
|
`Peer maintenance completed, current count: ${finalPeerCount}`
|
||||||
This should not happen.
|
);
|
||||||
`);
|
return finalPeerCount >= this.numPeersToUse;
|
||||||
}
|
|
||||||
|
|
||||||
await this.peersMutex.runExclusive(() => {
|
|
||||||
this.log.info(
|
|
||||||
`Peer maintenance completed, current count: ${this.peers.size}`
|
|
||||||
);
|
|
||||||
this.renewPeersLocker.cleanUnlocked();
|
|
||||||
});
|
|
||||||
|
|
||||||
return true;
|
|
||||||
} catch (error) {
|
} catch (error) {
|
||||||
this.log.error("Error during peer maintenance", error);
|
this.log.error("Error during peer maintenance", { error });
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private async confirmPeers(): Promise<void> {
|
|
||||||
const connectedPeers = await this.core.connectedPeers();
|
|
||||||
const currentPeerIds = new Set(this.peers.keys());
|
|
||||||
|
|
||||||
// Peers to add (connected but not in our list)
|
|
||||||
const peersToAdd = connectedPeers.filter(
|
|
||||||
(p) => !currentPeerIds.has(p.id.toString())
|
|
||||||
);
|
|
||||||
|
|
||||||
// Peers to remove (in our list but not connected)
|
|
||||||
const peersToRemove = Array.from(this.peers.values()).filter(
|
|
||||||
(p) => !connectedPeers.some((cp) => cp.id.equals(p.id))
|
|
||||||
);
|
|
||||||
|
|
||||||
await this.peersMutex.runExclusive(async () => {
|
|
||||||
// Add new peers
|
|
||||||
for (const peer of peersToAdd) {
|
|
||||||
this.peers.set(peer.id.toString(), peer);
|
|
||||||
this.log.info(`Added new peer: ${peer.id.toString()}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove disconnected peers
|
|
||||||
for (const peer of peersToRemove) {
|
|
||||||
this.peers.delete(peer.id.toString());
|
|
||||||
this.log.info(`Removed disconnected peer: ${peer.id.toString()}`);
|
|
||||||
}
|
|
||||||
|
|
||||||
this.updatePeers(new Map(this.peers));
|
|
||||||
this.log.info(`Peers confirmed. Current count: ${this.peers.size}`);
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Finds and adds new peers to the peers list.
|
|
||||||
* @param numPeers The number of peers to find and add.
|
|
||||||
*/
|
|
||||||
private async findAndAddPeers(numPeers: number): Promise<Peer[]> {
|
|
||||||
let newPeers: Peer[];
|
|
||||||
const release = await this.peersMutex.acquire();
|
|
||||||
try {
|
|
||||||
this.log.info(`Finding and adding ${numPeers} new peers`);
|
|
||||||
newPeers = await this.findAdditionalPeers(numPeers);
|
|
||||||
} finally {
|
|
||||||
release();
|
|
||||||
}
|
|
||||||
|
|
||||||
const dials = await Promise.all(
|
|
||||||
newPeers.map((peer) => this.connectionManager.attemptDial(peer.id))
|
|
||||||
);
|
|
||||||
|
|
||||||
const finalRelease = await this.peersMutex.acquire();
|
|
||||||
try {
|
|
||||||
const successfulPeers = newPeers.filter((_, index) => dials[index]);
|
|
||||||
successfulPeers.forEach((peer) =>
|
|
||||||
this.peers.set(peer.id.toString(), peer)
|
|
||||||
);
|
|
||||||
this.updatePeers(new Map(this.peers));
|
|
||||||
this.log.info(
|
|
||||||
`Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}`
|
|
||||||
);
|
|
||||||
return successfulPeers;
|
|
||||||
} finally {
|
|
||||||
finalRelease();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Finds additional peers.
|
|
||||||
* Attempts to find peers without using bootstrap peers first,
|
|
||||||
* If no peers are found,
|
|
||||||
* tries with bootstrap peers.
|
|
||||||
* @param numPeers The number of peers to find.
|
|
||||||
*/
|
|
||||||
private async findAdditionalPeers(numPeers: number): Promise<Peer[]> {
|
|
||||||
this.log.info(`Finding ${numPeers} additional peers`);
|
|
||||||
try {
|
|
||||||
let newPeers = await this.core.allPeers();
|
|
||||||
|
|
||||||
if (newPeers.length === 0) {
|
|
||||||
this.log.warn("No new peers found.");
|
|
||||||
}
|
|
||||||
|
|
||||||
newPeers = newPeers
|
|
||||||
.filter((peer) => !this.peers.has(peer.id.toString()))
|
|
||||||
.filter((peer) => !this.renewPeersLocker.isLocked(peer.id))
|
|
||||||
.slice(0, numPeers);
|
|
||||||
|
|
||||||
return newPeers;
|
|
||||||
} catch (error) {
|
|
||||||
this.log.error("Error finding additional peers:", error);
|
|
||||||
throw error;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private updatePeers(peers: Map<PeerIdStr, Peer>): void {
|
|
||||||
this.peers = peers;
|
|
||||||
this.healthManager.updateProtocolHealth(
|
|
||||||
this.core.multicodec,
|
|
||||||
this.peers.size
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class RenewPeerLocker {
|
|
||||||
private readonly peers: Map<string, number> = new Map();
|
|
||||||
|
|
||||||
public constructor(private lockDuration: number) {}
|
|
||||||
|
|
||||||
public lock(id: PeerId): void {
|
|
||||||
this.peers.set(id.toString(), Date.now());
|
|
||||||
}
|
|
||||||
|
|
||||||
public isLocked(id: PeerId): boolean {
|
|
||||||
const time = this.peers.get(id.toString());
|
|
||||||
|
|
||||||
if (time && !this.isTimeUnlocked(time)) {
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
public cleanUnlocked(): void {
|
|
||||||
Array.from(this.peers.entries()).forEach(([id, lock]) => {
|
|
||||||
if (this.isTimeUnlocked(lock)) {
|
|
||||||
this.peers.delete(id.toString());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
private isTimeUnlocked(time: number): boolean {
|
|
||||||
return Date.now() - time >= this.lockDuration;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
115
packages/sdk/src/protocols/peer_manager.ts
Normal file
115
packages/sdk/src/protocols/peer_manager.ts
Normal file
@ -0,0 +1,115 @@
|
|||||||
|
import { Peer, PeerId } from "@libp2p/interface";
|
||||||
|
import { ConnectionManager } from "@waku/core";
|
||||||
|
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
||||||
|
import { Logger } from "@waku/utils";
|
||||||
|
import { Mutex } from "async-mutex";
|
||||||
|
|
||||||
|
export class PeerManager {
|
||||||
|
private peers: Map<string, Peer> = new Map();
|
||||||
|
private readMutex = new Mutex();
|
||||||
|
private writeMutex = new Mutex();
|
||||||
|
private writeLockHolder: string | null = null;
|
||||||
|
|
||||||
|
public constructor(
|
||||||
|
private readonly connectionManager: ConnectionManager,
|
||||||
|
private readonly core: BaseProtocol,
|
||||||
|
private readonly log: Logger
|
||||||
|
) {}
|
||||||
|
|
||||||
|
public getWriteLockHolder(): string | null {
|
||||||
|
return this.writeLockHolder;
|
||||||
|
}
|
||||||
|
|
||||||
|
public getPeers(): Peer[] {
|
||||||
|
return Array.from(this.peers.values());
|
||||||
|
}
|
||||||
|
|
||||||
|
public async addPeer(peer: Peer): Promise<void> {
|
||||||
|
return this.writeMutex.runExclusive(async () => {
|
||||||
|
this.writeLockHolder = `addPeer: ${peer.id.toString()}`;
|
||||||
|
await this.connectionManager.attemptDial(peer.id);
|
||||||
|
this.peers.set(peer.id.toString(), peer);
|
||||||
|
this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
|
||||||
|
this.writeLockHolder = null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async removePeer(peerId: PeerId): Promise<void> {
|
||||||
|
return this.writeMutex.runExclusive(() => {
|
||||||
|
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
|
||||||
|
this.peers.delete(peerId.toString());
|
||||||
|
this.log.info(`Removed peer: ${peerId.toString()}`);
|
||||||
|
this.writeLockHolder = null;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
public async getPeerCount(): Promise<number> {
|
||||||
|
return this.readMutex.runExclusive(() => this.peers.size);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async hasPeers(): Promise<boolean> {
|
||||||
|
return this.readMutex.runExclusive(() => this.peers.size > 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public async removeExcessPeers(excessPeers: number): Promise<void> {
|
||||||
|
this.log.info(`Removing ${excessPeers} excess peer(s)`);
|
||||||
|
const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers);
|
||||||
|
for (const peer of peersToRemove) {
|
||||||
|
await this.removePeer(peer.id);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public async disconnectPeer(peerId: PeerId): Promise<boolean> {
|
||||||
|
try {
|
||||||
|
this.writeLockHolder = `disconnectPeer: ${peerId.toString()}`;
|
||||||
|
this.log.info(`Disconnecting peer: ${peerId}`);
|
||||||
|
await this.connectionManager.dropConnection(peerId);
|
||||||
|
await this.removePeer(peerId);
|
||||||
|
this.log.info(`Disconnected peer: ${peerId}`);
|
||||||
|
this.writeLockHolder = null;
|
||||||
|
return true;
|
||||||
|
} catch (error) {
|
||||||
|
this.log.error("Error disconnecting peer:", error);
|
||||||
|
this.writeLockHolder = null;
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds and adds new peers to the peers list.
|
||||||
|
* @param numPeers The number of peers to find and add.
|
||||||
|
*/
|
||||||
|
public async findAndAddPeers(numPeers: number): Promise<Peer[]> {
|
||||||
|
const additionalPeers = await this.findPeers(numPeers);
|
||||||
|
if (additionalPeers.length === 0) {
|
||||||
|
this.log.warn("No additional peers found");
|
||||||
|
return [];
|
||||||
|
}
|
||||||
|
return this.addMultiplePeers(additionalPeers);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Finds additional peers.
|
||||||
|
* @param numPeers The number of peers to find.
|
||||||
|
*/
|
||||||
|
private async findPeers(numPeers: number): Promise<Peer[]> {
|
||||||
|
const connectedPeers = await this.core.getPeers();
|
||||||
|
|
||||||
|
return this.readMutex.runExclusive(async () => {
|
||||||
|
const newPeers = connectedPeers
|
||||||
|
.filter((peer) => !this.peers.has(peer.id.toString()))
|
||||||
|
.slice(0, numPeers);
|
||||||
|
|
||||||
|
return newPeers;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private async addMultiplePeers(peers: Peer[]): Promise<Peer[]> {
|
||||||
|
const addedPeers: Peer[] = [];
|
||||||
|
for (const peer of peers) {
|
||||||
|
await this.addPeer(peer);
|
||||||
|
addedPeers.push(peer);
|
||||||
|
}
|
||||||
|
return addedPeers;
|
||||||
|
}
|
||||||
|
}
|
@ -47,11 +47,15 @@ describe("Waku Light Push: Connection Management: E2E", function () {
|
|||||||
expect(failures?.length || 0).to.equal(0);
|
expect(failures?.length || 0).to.equal(0);
|
||||||
});
|
});
|
||||||
|
|
||||||
it.only("Failed peers are renewed", async function () {
|
it("Failed peers are renewed", async function () {
|
||||||
// send a lightpush request -- should have all successes
|
// send a lightpush request -- should have all successes
|
||||||
const response1 = await waku.lightPush.send(encoder, {
|
const response1 = await waku.lightPush.send(
|
||||||
payload: utf8ToBytes("Hello_World")
|
encoder,
|
||||||
});
|
{
|
||||||
|
payload: utf8ToBytes("Hello_World")
|
||||||
|
},
|
||||||
|
{ forceUseAllPeers: true }
|
||||||
|
);
|
||||||
|
|
||||||
expect(response1.successes.length).to.be.equal(
|
expect(response1.successes.length).to.be.equal(
|
||||||
waku.lightPush.numPeersToUse
|
waku.lightPush.numPeersToUse
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
|
import type { Peer, PeerStore } from "@libp2p/interface";
|
||||||
import { ShardInfo } from "@waku/interfaces";
|
|
||||||
|
|
||||||
import { bytesToUtf8 } from "../bytes/index.js";
|
import { bytesToUtf8 } from "../bytes/index.js";
|
||||||
import { decodeRelayShard } from "../common/relay_shard_codec.js";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns a pseudo-random peer that supports the given protocol.
|
* Returns a pseudo-random peer that supports the given protocol.
|
||||||
@ -69,39 +67,3 @@ export async function getPeersForProtocol(
|
|||||||
});
|
});
|
||||||
return peers;
|
return peers;
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function getConnectedPeersForProtocolAndShard(
|
|
||||||
connections: Connection[],
|
|
||||||
peerStore: PeerStore,
|
|
||||||
protocols: string[],
|
|
||||||
shardInfo?: ShardInfo
|
|
||||||
): Promise<Peer[]> {
|
|
||||||
const openConnections = connections.filter(
|
|
||||||
(connection) => connection.status === "open"
|
|
||||||
);
|
|
||||||
|
|
||||||
const peerPromises = openConnections.map(async (connection) => {
|
|
||||||
const peer = await peerStore.get(connection.remotePeer);
|
|
||||||
const supportsProtocol = protocols.some((protocol) =>
|
|
||||||
peer.protocols.includes(protocol)
|
|
||||||
);
|
|
||||||
|
|
||||||
if (supportsProtocol) {
|
|
||||||
if (shardInfo) {
|
|
||||||
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
|
|
||||||
const peerShardInfo =
|
|
||||||
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);
|
|
||||||
|
|
||||||
if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
|
|
||||||
return peer;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
return peer;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
});
|
|
||||||
|
|
||||||
const peersWithNulls = await Promise.all(peerPromises);
|
|
||||||
return peersWithNulls.filter((peer): peer is Peer => peer !== null);
|
|
||||||
}
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user