mirror of https://github.com/waku-org/js-waku.git
feat: peer manager
chore: track peers in a hashmap instead of array chore: peer mgmt responds to conenction/disconnection and improve logging fix: hasPeers management chore: add modularity to getting connected peers chore: renewal doesnt disconnect, only removes
This commit is contained in:
parent
3e821591c9
commit
d38ba9cf1c
|
@ -5,12 +5,8 @@ import type {
|
|||
Libp2pComponents,
|
||||
PubsubTopic
|
||||
} from "@waku/interfaces";
|
||||
import { Logger, pubsubTopicsToShardInfo } from "@waku/utils";
|
||||
import {
|
||||
getConnectedPeersForProtocolAndShard,
|
||||
getPeersForProtocol,
|
||||
sortPeersByLatency
|
||||
} from "@waku/utils/libp2p";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";
|
||||
|
||||
import { filterPeersByDiscovery } from "./filterPeers.js";
|
||||
import { StreamManager } from "./stream_manager/index.js";
|
||||
|
@ -63,7 +59,7 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||
return getPeersForProtocol(this.peerStore, [this.multicodec]);
|
||||
}
|
||||
|
||||
public async connectedPeers(): Promise<Peer[]> {
|
||||
public async connectedPeers(withOpenStreams = false): Promise<Peer[]> {
|
||||
const peers = await this.allPeers();
|
||||
return peers.filter((peer) => {
|
||||
return (
|
||||
|
@ -77,9 +73,8 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||
*
|
||||
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
|
||||
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
|
||||
|
||||
* @returns A list of peers that support the protocol sorted by latency.
|
||||
*/
|
||||
* @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap.
|
||||
*/
|
||||
public async getPeers(
|
||||
{
|
||||
numPeers,
|
||||
|
@ -88,7 +83,7 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||
numPeers: number;
|
||||
maxBootstrapPeers: number;
|
||||
} = {
|
||||
maxBootstrapPeers: 1,
|
||||
maxBootstrapPeers: 0,
|
||||
numPeers: 0
|
||||
}
|
||||
): Promise<Peer[]> {
|
||||
|
@ -103,7 +98,7 @@ export class BaseProtocol implements IBaseProtocolCore {
|
|||
|
||||
// Filter the peers based on discovery & number of peers requested
|
||||
const filteredPeers = filterPeersByDiscovery(
|
||||
connectedPeersForProtocolAndShard,
|
||||
allAvailableConnectedPeers,
|
||||
numPeers,
|
||||
maxBootstrapPeers
|
||||
);
|
||||
|
|
|
@ -18,7 +18,7 @@ export type IBaseProtocolCore = {
|
|||
multicodec: string;
|
||||
peerStore: PeerStore;
|
||||
allPeers: () => Promise<Peer[]>;
|
||||
connectedPeers: () => Promise<Peer[]>;
|
||||
connectedPeers: (withOpenStreams?: boolean) => Promise<Peer[]>;
|
||||
addLibp2pEventListener: Libp2p["addEventListener"];
|
||||
removeLibp2pEventListener: Libp2p["removeEventListener"];
|
||||
};
|
||||
|
@ -36,10 +36,6 @@ export type NetworkConfig = StaticSharding | AutoSharding;
|
|||
* Options for using LightPush and Filter
|
||||
*/
|
||||
export type ProtocolUseOptions = {
|
||||
/**
|
||||
* Optional flag to enable auto-retry with exponential backoff
|
||||
*/
|
||||
autoRetry?: boolean;
|
||||
/**
|
||||
* Optional flag to force using all available peers
|
||||
*/
|
||||
|
@ -48,14 +44,6 @@ export type ProtocolUseOptions = {
|
|||
* Optional maximum number of attempts for exponential backoff
|
||||
*/
|
||||
maxAttempts?: number;
|
||||
/**
|
||||
* Optional initial delay in milliseconds for exponential backoff
|
||||
*/
|
||||
initialDelay?: number;
|
||||
/**
|
||||
* Optional maximum delay in milliseconds for exponential backoff
|
||||
*/
|
||||
maxDelay?: number;
|
||||
};
|
||||
|
||||
export type ProtocolCreateOptions = {
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
import type { Peer, PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager, getHealthManager } from "@waku/core";
|
||||
import { ConnectionManager } from "@waku/core";
|
||||
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
||||
import {
|
||||
IBaseProtocolSDK,
|
||||
IHealthManager,
|
||||
PeerIdStr,
|
||||
ProtocolUseOptions
|
||||
} from "@waku/interfaces";
|
||||
import { delay, Logger } from "@waku/utils";
|
||||
|
@ -12,15 +13,15 @@ interface Options {
|
|||
numPeersToUse?: number;
|
||||
maintainPeersInterval?: number;
|
||||
}
|
||||
///TODO: update HealthManager
|
||||
|
||||
const RENEW_TIME_LOCK_DURATION = 30 * 1000;
|
||||
const DEFAULT_NUM_PEERS_TO_USE = 2;
|
||||
const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000;
|
||||
|
||||
export class BaseProtocolSDK implements IBaseProtocolSDK {
|
||||
private healthManager: IHealthManager;
|
||||
private peerManager: PeerManager;
|
||||
public readonly numPeersToUse: number;
|
||||
private peers: Peer[] = [];
|
||||
private peers: Map<PeerIdStr, Peer> = new Map();
|
||||
private maintainPeersIntervalId: ReturnType<
|
||||
typeof window.setInterval
|
||||
> | null = null;
|
||||
|
@ -38,17 +39,18 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
) {
|
||||
this.log = new Logger(`sdk:${core.multicodec}`);
|
||||
|
||||
this.healthManager = getHealthManager();
|
||||
this.peerManager = new PeerManager(connectionManager, core, this.log);
|
||||
|
||||
this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE;
|
||||
const maintainPeersInterval =
|
||||
options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL;
|
||||
|
||||
void this.setupEventListeners();
|
||||
void this.startMaintainPeersInterval(maintainPeersInterval);
|
||||
}
|
||||
|
||||
public get connectedPeers(): Peer[] {
|
||||
return this.peers;
|
||||
return Array.from(this.peers.values());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -93,30 +95,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
}
|
||||
}
|
||||
|
||||
private setupEventListeners(): void {
|
||||
this.core.addLibp2pEventListener(
|
||||
"peer:connect",
|
||||
() => void this.confirmPeers()
|
||||
);
|
||||
this.core.addLibp2pEventListener(
|
||||
"peer:disconnect",
|
||||
() => void this.confirmPeers()
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if there are peers to send a message to.
|
||||
* If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`.
|
||||
* If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager.
|
||||
* If `autoRetry` is `false`, returns `false` if no peers are found.
|
||||
* If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff.
|
||||
* Returns `true` if peers are found, `false` otherwise.
|
||||
* Checks if there are sufficient peers to send a message to.
|
||||
* If `forceUseAllPeers` is `false` (default), returns `true` if there are any connected peers.
|
||||
* If `forceUseAllPeers` is `true`, attempts to connect to `numPeersToUse` peers.
|
||||
* @param options Optional options object
|
||||
* @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false)
|
||||
* @param options.forceUseAllPeers Optional flag to force using all available peers (default: false)
|
||||
* @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10)
|
||||
* @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3)
|
||||
* @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100)
|
||||
* @param options.forceUseAllPeers Optional flag to force connecting to `numPeersToUse` peers (default: false)
|
||||
* @param options.maxAttempts Optional maximum number of attempts to reach the required number of peers (default: 3)
|
||||
* @returns `true` if the required number of peers are connected, `false` otherwise
|
||||
*/
|
||||
protected hasPeers = async (
|
||||
protected async hasPeers(
|
||||
options: Partial<ProtocolUseOptions> = {}
|
||||
): Promise<boolean> => {
|
||||
const {
|
||||
autoRetry = false,
|
||||
forceUseAllPeers = false,
|
||||
initialDelay = 10,
|
||||
maxAttempts = 3,
|
||||
maxDelay = 100
|
||||
} = options;
|
||||
): Promise<boolean> {
|
||||
const { forceUseAllPeers = false, maxAttempts = 3 } = options;
|
||||
|
||||
if (!forceUseAllPeers && this.connectedPeers.length > 0) return true;
|
||||
|
||||
|
@ -124,24 +126,34 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
while (attempts < maxAttempts) {
|
||||
attempts++;
|
||||
if (await this.maintainPeers()) {
|
||||
if (this.peers.length < this.numPeersToUse) {
|
||||
if (this.peers.size < this.numPeersToUse) {
|
||||
this.log.warn(
|
||||
`Found only ${this.peers.length} peers, expected ${this.numPeersToUse}`
|
||||
`Found only ${this.peers.size} peers, expected ${this.numPeersToUse}`
|
||||
);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (!autoRetry) return false;
|
||||
const delayMs = Math.min(
|
||||
initialDelay * Math.pow(2, attempts - 1),
|
||||
maxDelay
|
||||
);
|
||||
await delay(delayMs);
|
||||
if (!autoRetry) {
|
||||
return false;
|
||||
}
|
||||
//TODO: handle autoRetry
|
||||
}
|
||||
|
||||
this.log.error("Failed to find peers to send message to");
|
||||
for (let attempts = 0; attempts < maxAttempts; attempts++) {
|
||||
await this.maintainPeers();
|
||||
|
||||
if (this.connectedPeers.length >= this.numPeersToUse) {
|
||||
return true;
|
||||
}
|
||||
|
||||
this.log.warn(
|
||||
`Found only ${this.connectedPeers.length} peers, expected ${this.numPeersToUse}. Retrying...`
|
||||
);
|
||||
}
|
||||
|
||||
this.log.error("Failed to find required number of peers");
|
||||
return false;
|
||||
};
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts an interval to maintain the peers list to `numPeersToUse`.
|
||||
|
@ -150,7 +162,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
private async startMaintainPeersInterval(interval: number): Promise<void> {
|
||||
this.log.info("Starting maintain peers interval");
|
||||
try {
|
||||
await this.maintainPeers();
|
||||
// await this.maintainPeers();
|
||||
this.maintainPeersIntervalId = setInterval(() => {
|
||||
this.maintainPeers().catch((error) => {
|
||||
this.log.error("Error during maintain peers interval:", error);
|
||||
|
@ -176,18 +188,36 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
this.maintainPeersLock = true;
|
||||
this.log.info(`Maintaining peers, current count: ${this.peers.length}`);
|
||||
try {
|
||||
const numPeersToAdd = this.numPeersToUse - this.peers.length;
|
||||
await this.confirmPeers();
|
||||
this.log.info(`Maintaining peers, current count: ${this.peers.size}`);
|
||||
|
||||
const numPeersToAdd = this.numPeersToUse - this.peers.size;
|
||||
if (numPeersToAdd > 0) {
|
||||
await this.findAndAddPeers(numPeersToAdd);
|
||||
await this.peerManager.findAndAddPeers(numPeersToAdd);
|
||||
} else {
|
||||
await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd));
|
||||
}
|
||||
|
||||
this.log.info(
|
||||
`Peer maintenance completed, current count: ${this.peers.length}`
|
||||
`Peer maintenance completed, current count: ${this.peers.size}`
|
||||
);
|
||||
this.renewPeersLocker.cleanUnlocked();
|
||||
return true;
|
||||
} catch (error) {
|
||||
if (error instanceof Error) {
|
||||
this.log.error("Error during peer maintenance", {
|
||||
error: error.message,
|
||||
stack: error.stack
|
||||
});
|
||||
} else {
|
||||
this.log.error("Error during peer maintenance", {
|
||||
error: String(error)
|
||||
});
|
||||
}
|
||||
return false;
|
||||
} finally {
|
||||
this.maintainPeersLock = false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -204,11 +234,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
|
||||
await Promise.all(dials);
|
||||
|
||||
const updatedPeers = [...this.peers, ...additionalPeers];
|
||||
this.updatePeers(updatedPeers);
|
||||
additionalPeers.forEach((peer) =>
|
||||
this.peers.set(peer.id.toString(), peer)
|
||||
);
|
||||
this.updatePeers(this.peers);
|
||||
|
||||
this.log.info(
|
||||
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}`
|
||||
`Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}`
|
||||
);
|
||||
return additionalPeers;
|
||||
} catch (error) {
|
||||
|
@ -234,9 +266,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
}
|
||||
|
||||
newPeers = newPeers
|
||||
.filter(
|
||||
(peer) => this.peers.some((p) => p.id.equals(peer.id)) === false
|
||||
)
|
||||
.filter((peer) => !this.peers.has(peer.id.toString()))
|
||||
.filter((peer) => !this.renewPeersLocker.isLocked(peer.id))
|
||||
.slice(0, numPeers);
|
||||
|
||||
|
@ -247,11 +277,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK {
|
|||
}
|
||||
}
|
||||
|
||||
private updatePeers(peers: Peer[]): void {
|
||||
private updatePeers(peers: Map<PeerIdStr, Peer>): void {
|
||||
this.peers = peers;
|
||||
this.healthManager.updateProtocolHealth(
|
||||
this.core.multicodec,
|
||||
this.peers.length
|
||||
this.peers.size
|
||||
);
|
||||
}
|
||||
}
|
||||
|
@ -276,7 +306,7 @@ class RenewPeerLocker {
|
|||
}
|
||||
|
||||
public cleanUnlocked(): void {
|
||||
Object.entries(this.peers).forEach(([id, lock]) => {
|
||||
Array.from(this.peers.entries()).forEach(([id, lock]) => {
|
||||
if (this.isTimeUnlocked(lock)) {
|
||||
this.peers.delete(id.toString());
|
||||
}
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
import { Peer, PeerId } from "@libp2p/interface";
|
||||
import { ConnectionManager } from "@waku/core";
|
||||
import { BaseProtocol } from "@waku/core/lib/base_protocol";
|
||||
import { Logger } from "@waku/utils";
|
||||
import { Mutex } from "async-mutex";
|
||||
|
||||
export class PeerManager {
|
||||
private peers: Map<string, Peer> = new Map();
|
||||
private readMutex = new Mutex();
|
||||
private writeMutex = new Mutex();
|
||||
private writeLockHolder: string | null = null;
|
||||
|
||||
public constructor(
|
||||
private readonly connectionManager: ConnectionManager,
|
||||
private readonly core: BaseProtocol,
|
||||
private readonly log: Logger
|
||||
) {}
|
||||
|
||||
public getWriteLockHolder(): string | null {
|
||||
return this.writeLockHolder;
|
||||
}
|
||||
|
||||
public getPeers(): Peer[] {
|
||||
return Array.from(this.peers.values());
|
||||
}
|
||||
|
||||
public async addPeer(peer: Peer): Promise<void> {
|
||||
return this.writeMutex.runExclusive(async () => {
|
||||
this.writeLockHolder = `addPeer: ${peer.id.toString()}`;
|
||||
await this.connectionManager.attemptDial(peer.id);
|
||||
this.peers.set(peer.id.toString(), peer);
|
||||
this.log.info(`Added and dialed peer: ${peer.id.toString()}`);
|
||||
this.writeLockHolder = null;
|
||||
});
|
||||
}
|
||||
|
||||
public async removePeer(peerId: PeerId): Promise<void> {
|
||||
return this.writeMutex.runExclusive(() => {
|
||||
this.writeLockHolder = `removePeer: ${peerId.toString()}`;
|
||||
this.peers.delete(peerId.toString());
|
||||
this.log.info(`Removed peer: ${peerId.toString()}`);
|
||||
this.writeLockHolder = null;
|
||||
});
|
||||
}
|
||||
|
||||
public async getPeerCount(): Promise<number> {
|
||||
return this.readMutex.runExclusive(() => this.peers.size);
|
||||
}
|
||||
|
||||
public async hasPeers(): Promise<boolean> {
|
||||
return this.readMutex.runExclusive(() => this.peers.size > 0);
|
||||
}
|
||||
|
||||
public async removeExcessPeers(excessPeers: number): Promise<void> {
|
||||
this.log.info(`Removing ${excessPeers} excess peer(s)`);
|
||||
const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers);
|
||||
for (const peer of peersToRemove) {
|
||||
await this.removePeer(peer.id);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds and adds new peers to the peers list.
|
||||
* @param numPeers The number of peers to find and add.
|
||||
*/
|
||||
public async findAndAddPeers(numPeers: number): Promise<Peer[]> {
|
||||
const additionalPeers = await this.findPeers(numPeers);
|
||||
if (additionalPeers.length === 0) {
|
||||
this.log.warn("No additional peers found");
|
||||
return [];
|
||||
}
|
||||
return this.addMultiplePeers(additionalPeers);
|
||||
}
|
||||
|
||||
/**
|
||||
* Finds additional peers.
|
||||
* @param numPeers The number of peers to find.
|
||||
*/
|
||||
public async findPeers(numPeers: number): Promise<Peer[]> {
|
||||
const connectedPeers = await this.core.getPeers();
|
||||
|
||||
return this.readMutex.runExclusive(async () => {
|
||||
const newPeers = connectedPeers
|
||||
.filter((peer) => !this.peers.has(peer.id.toString()))
|
||||
.slice(0, numPeers);
|
||||
|
||||
return newPeers;
|
||||
});
|
||||
}
|
||||
|
||||
public async addMultiplePeers(peers: Peer[]): Promise<Peer[]> {
|
||||
const addedPeers: Peer[] = [];
|
||||
for (const peer of peers) {
|
||||
await this.addPeer(peer);
|
||||
addedPeers.push(peer);
|
||||
}
|
||||
return addedPeers;
|
||||
}
|
||||
}
|
|
@ -54,9 +54,13 @@ describe("Waku Light Push: Peer Management: E2E", function () {
|
|||
|
||||
it("Failed peers are renewed", async function () {
|
||||
// send a lightpush request -- should have all successes
|
||||
const response1 = await waku.lightPush.send(encoder, {
|
||||
payload: utf8ToBytes("Hello_World")
|
||||
});
|
||||
const response1 = await waku.lightPush.send(
|
||||
encoder,
|
||||
{
|
||||
payload: utf8ToBytes("Hello_World")
|
||||
},
|
||||
{ forceUseAllPeers: true }
|
||||
);
|
||||
|
||||
expect(response1.successes.length).to.be.equal(
|
||||
waku.lightPush.numPeersToUse
|
||||
|
|
|
@ -1,8 +1,6 @@
|
|||
import type { Connection, Peer, PeerStore } from "@libp2p/interface";
|
||||
import { ShardInfo } from "@waku/interfaces";
|
||||
import type { Peer, PeerStore } from "@libp2p/interface";
|
||||
|
||||
import { bytesToUtf8 } from "../bytes/index.js";
|
||||
import { decodeRelayShard } from "../common/relay_shard_codec.js";
|
||||
|
||||
/**
|
||||
* Returns a pseudo-random peer that supports the given protocol.
|
||||
|
@ -69,39 +67,3 @@ export async function getPeersForProtocol(
|
|||
});
|
||||
return peers;
|
||||
}
|
||||
|
||||
export async function getConnectedPeersForProtocolAndShard(
|
||||
connections: Connection[],
|
||||
peerStore: PeerStore,
|
||||
protocols: string[],
|
||||
shardInfo?: ShardInfo
|
||||
): Promise<Peer[]> {
|
||||
const openConnections = connections.filter(
|
||||
(connection) => connection.status === "open"
|
||||
);
|
||||
|
||||
const peerPromises = openConnections.map(async (connection) => {
|
||||
const peer = await peerStore.get(connection.remotePeer);
|
||||
const supportsProtocol = protocols.some((protocol) =>
|
||||
peer.protocols.includes(protocol)
|
||||
);
|
||||
|
||||
if (supportsProtocol) {
|
||||
if (shardInfo) {
|
||||
const encodedPeerShardInfo = peer.metadata.get("shardInfo");
|
||||
const peerShardInfo =
|
||||
encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo);
|
||||
|
||||
if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) {
|
||||
return peer;
|
||||
}
|
||||
} else {
|
||||
return peer;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
});
|
||||
|
||||
const peersWithNulls = await Promise.all(peerPromises);
|
||||
return peersWithNulls.filter((peer): peer is Peer => peer !== null);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue