diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 9c1bc031b4..2a86c01494 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -5,12 +5,8 @@ import type { Libp2pComponents, PubsubTopic } from "@waku/interfaces"; -import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; -import { - getConnectedPeersForProtocolAndShard, - getPeersForProtocol, - sortPeersByLatency -} from "@waku/utils/libp2p"; +import { Logger } from "@waku/utils"; +import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; @@ -77,9 +73,8 @@ export class BaseProtocol implements IBaseProtocolCore { * * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - - * @returns A list of peers that support the protocol sorted by latency. - */ + * @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap. + */ public async getPeers( { numPeers, @@ -88,22 +83,16 @@ export class BaseProtocol implements IBaseProtocolCore { numPeers: number; maxBootstrapPeers: number; } = { - maxBootstrapPeers: 1, + maxBootstrapPeers: 0, numPeers: 0 } ): Promise { // Retrieve all connected peers that support the protocol & shard (if configured) - const connectedPeersForProtocolAndShard = - await getConnectedPeersForProtocolAndShard( - this.components.connectionManager.getConnections(), - this.components.peerStore, - [this.multicodec], - pubsubTopicsToShardInfo(this.pubsubTopics) - ); + const allAvailableConnectedPeers = await this.connectedPeers(); // Filter the peers based on discovery & number of peers requested const filteredPeers = filterPeersByDiscovery( - connectedPeersForProtocolAndShard, + allAvailableConnectedPeers, numPeers, maxBootstrapPeers ); diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 64650ffab2..5bfeace6c5 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,39 +1,28 @@ import type { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager, getHealthManager } from "@waku/core"; +import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; -import { - IBaseProtocolSDK, - IHealthManager, - PeerIdStr, - ProtocolUseOptions -} from "@waku/interfaces"; -import { delay, Logger } from "@waku/utils"; -import { Mutex } from "async-mutex"; +import { IBaseProtocolSDK, ProtocolUseOptions } from "@waku/interfaces"; +import { Logger } from "@waku/utils"; + +import { PeerManager } from "./peer_manager.js"; interface Options { numPeersToUse?: number; maintainPeersInterval?: number; } +///TODO: update HealthManager -const RENEW_TIME_LOCK_DURATION = 30 * 1000; -export const DEFAULT_NUM_PEERS_TO_USE = 2; +const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { - protected healthManager: IHealthManager; + private peerManager: PeerManager; public readonly numPeersToUse: number; - private peers: Map = new Map(); private maintainPeersIntervalId: ReturnType< typeof window.setInterval > | null = null; private log: Logger; - private readonly renewPeersLocker = new RenewPeerLocker( - RENEW_TIME_LOCK_DURATION - ); - - private peersMutex = new Mutex(); - public constructor( protected core: BaseProtocol, protected connectionManager: ConnectionManager, @@ -41,7 +30,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ) { this.log = new Logger(`sdk:${core.multicodec}`); - this.healthManager = getHealthManager(); + this.peerManager = new PeerManager(connectionManager, core, this.log); this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; const maintainPeersInterval = @@ -52,7 +41,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } public get connectedPeers(): Peer[] { - return Array.from(this.peers.values()); + return this.peerManager.getPeers(); } /** @@ -61,30 +50,20 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @returns The new peer that was found and connected to. */ public async renewPeer(peerToDisconnect: PeerId): Promise { - return this.peersMutex.runExclusive(async () => { - this.log.info(`Renewing peer ${peerToDisconnect}`); + this.log.info(`Renewing peer ${peerToDisconnect}`); - await this.connectionManager.dropConnection(peerToDisconnect); + const success = await this.peerManager.disconnectPeer(peerToDisconnect); + if (!success) return undefined; - this.peers.delete(peerToDisconnect.toString()); - this.updatePeers(new Map(this.peers)); - - this.log.info( - `Peer ${peerToDisconnect} disconnected and removed from the peer list` + const newPeer = await this.peerManager.findAndAddPeers(1); + if (newPeer.length === 0) { + this.log.error( + "Failed to find a new peer to replace the disconnected one." ); + return undefined; + } - const newPeer = await this.findAndAddPeers(1); - if (newPeer.length === 0) { - this.log.error( - "Failed to find a new peer to replace the disconnected one." - ); - return undefined; - } - - this.renewPeersLocker.lock(peerToDisconnect); - - return newPeer[0]; - }); + return newPeer[0]; } /** @@ -101,11 +80,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { // private setupEventListeners(): void { // this.core.addLibp2pEventListener( // "peer:connect", - // () => void this.confirmPeers() + // () => void this.maintainPeers() // ); // this.core.addLibp2pEventListener( // "peer:disconnect", - // () => void this.confirmPeers() + // () => void this.maintainPeers() // ); // } @@ -123,57 +102,38 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { * @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3) * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) */ - protected hasPeers = async ( + protected async hasPeers( options: Partial = {} - ): Promise => { + ): Promise { const { autoRetry = false, forceUseAllPeers = false, - initialDelay = 10, - maxAttempts = 3, - maxDelay = 100 + maxAttempts = 3 } = options; - let needsMaintenance: boolean; - let currentPeerCount: number; - - const release = await this.peersMutex.acquire(); - try { - currentPeerCount = this.connectedPeers.length; - needsMaintenance = forceUseAllPeers || currentPeerCount === 0; - } finally { - release(); + if (!forceUseAllPeers && this.connectedPeers.length > 0) { + return true; } - if (!needsMaintenance) return true; - - let attempts = 0; - while (attempts < maxAttempts) { - attempts++; - if (await this.maintainPeers()) { - const finalRelease = await this.peersMutex.acquire(); - try { - if (this.peers.size < this.numPeersToUse) { - this.log.warn( - `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` - ); - } - return true; - } finally { - finalRelease(); + for (let attempts = 0; attempts < maxAttempts; attempts++) { + const success = await this.maintainPeers(); + if (success) { + if (this.connectedPeers.length < this.numPeersToUse) { + this.log.warn( + `Found only ${this.connectedPeers.length} peers, expected ${this.numPeersToUse}` + ); } + return true; } - if (!autoRetry) return false; - const delayMs = Math.min( - initialDelay * Math.pow(2, attempts - 1), - maxDelay - ); - await delay(delayMs); + if (!autoRetry) { + return false; + } + //TODO: handle autoRetry } this.log.error("Failed to find peers to send message to"); return false; - }; + } /** * Starts an interval to maintain the peers list to `numPeersToUse`. @@ -182,7 +142,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { private async startMaintainPeersInterval(interval: number): Promise { this.log.info("Starting maintain peers interval"); try { - await this.maintainPeers(); + // await this.maintainPeers(); this.maintainPeersIntervalId = setInterval(() => { this.maintainPeers().catch((error) => { this.log.error("Error during maintain peers interval:", error); @@ -202,169 +162,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { */ private async maintainPeers(): Promise { try { - await this.confirmPeers(); + const currentPeerCount = await this.peerManager.getPeerCount(); + const numPeersToAdd = this.numPeersToUse - currentPeerCount; - const numPeersToAdd = await this.peersMutex.runExclusive(() => { - this.log.info(`Maintaining peers, current count: ${this.peers.size}`); - return this.numPeersToUse - this.peers.size; - }); + if (numPeersToAdd === 0) { + this.log.info("No maintenance required, peer count is sufficient"); + return true; + } + + this.log.info(`Maintaining peers, current count: ${currentPeerCount}`); if (numPeersToAdd > 0) { - await this.findAndAddPeers(numPeersToAdd); + await this.peerManager.findAndAddPeers(numPeersToAdd); + } else { + await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd)); } - if (numPeersToAdd < 0) { - this.log.warn(` - Peer maintenance completed, but there are more than ${this.numPeersToUse} peers. - This should not happen. - `); - } - - await this.peersMutex.runExclusive(() => { - this.log.info( - `Peer maintenance completed, current count: ${this.peers.size}` - ); - this.renewPeersLocker.cleanUnlocked(); - }); - - return true; + const finalPeerCount = await this.peerManager.getPeerCount(); + this.log.info( + `Peer maintenance completed, current count: ${finalPeerCount}` + ); + return finalPeerCount >= this.numPeersToUse; } catch (error) { - this.log.error("Error during peer maintenance", error); + this.log.error("Error during peer maintenance", { error }); return false; } } - - private async confirmPeers(): Promise { - const connectedPeers = await this.core.connectedPeers(); - const currentPeerIds = new Set(this.peers.keys()); - - // Peers to add (connected but not in our list) - const peersToAdd = connectedPeers.filter( - (p) => !currentPeerIds.has(p.id.toString()) - ); - - // Peers to remove (in our list but not connected) - const peersToRemove = Array.from(this.peers.values()).filter( - (p) => !connectedPeers.some((cp) => cp.id.equals(p.id)) - ); - - await this.peersMutex.runExclusive(async () => { - // Add new peers - for (const peer of peersToAdd) { - this.peers.set(peer.id.toString(), peer); - this.log.info(`Added new peer: ${peer.id.toString()}`); - } - - // Remove disconnected peers - for (const peer of peersToRemove) { - this.peers.delete(peer.id.toString()); - this.log.info(`Removed disconnected peer: ${peer.id.toString()}`); - } - - this.updatePeers(new Map(this.peers)); - this.log.info(`Peers confirmed. Current count: ${this.peers.size}`); - }); - } - - /** - * Finds and adds new peers to the peers list. - * @param numPeers The number of peers to find and add. - */ - private async findAndAddPeers(numPeers: number): Promise { - let newPeers: Peer[]; - const release = await this.peersMutex.acquire(); - try { - this.log.info(`Finding and adding ${numPeers} new peers`); - newPeers = await this.findAdditionalPeers(numPeers); - } finally { - release(); - } - - const dials = await Promise.all( - newPeers.map((peer) => this.connectionManager.attemptDial(peer.id)) - ); - - const finalRelease = await this.peersMutex.acquire(); - try { - const successfulPeers = newPeers.filter((_, index) => dials[index]); - successfulPeers.forEach((peer) => - this.peers.set(peer.id.toString(), peer) - ); - this.updatePeers(new Map(this.peers)); - this.log.info( - `Added ${successfulPeers.length} new peers, total peers: ${this.peers.size}` - ); - return successfulPeers; - } finally { - finalRelease(); - } - } - - /** - * Finds additional peers. - * Attempts to find peers without using bootstrap peers first, - * If no peers are found, - * tries with bootstrap peers. - * @param numPeers The number of peers to find. - */ - private async findAdditionalPeers(numPeers: number): Promise { - this.log.info(`Finding ${numPeers} additional peers`); - try { - let newPeers = await this.core.allPeers(); - - if (newPeers.length === 0) { - this.log.warn("No new peers found."); - } - - newPeers = newPeers - .filter((peer) => !this.peers.has(peer.id.toString())) - .filter((peer) => !this.renewPeersLocker.isLocked(peer.id)) - .slice(0, numPeers); - - return newPeers; - } catch (error) { - this.log.error("Error finding additional peers:", error); - throw error; - } - } - - private updatePeers(peers: Map): void { - this.peers = peers; - this.healthManager.updateProtocolHealth( - this.core.multicodec, - this.peers.size - ); - } -} - -class RenewPeerLocker { - private readonly peers: Map = new Map(); - - public constructor(private lockDuration: number) {} - - public lock(id: PeerId): void { - this.peers.set(id.toString(), Date.now()); - } - - public isLocked(id: PeerId): boolean { - const time = this.peers.get(id.toString()); - - if (time && !this.isTimeUnlocked(time)) { - return true; - } - - return false; - } - - public cleanUnlocked(): void { - Array.from(this.peers.entries()).forEach(([id, lock]) => { - if (this.isTimeUnlocked(lock)) { - this.peers.delete(id.toString()); - } - }); - } - - private isTimeUnlocked(time: number): boolean { - return Date.now() - time >= this.lockDuration; - } } diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts new file mode 100644 index 0000000000..db773b1dc3 --- /dev/null +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -0,0 +1,115 @@ +import { Peer, PeerId } from "@libp2p/interface"; +import { ConnectionManager } from "@waku/core"; +import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { Logger } from "@waku/utils"; +import { Mutex } from "async-mutex"; + +export class PeerManager { + private peers: Map = new Map(); + private readMutex = new Mutex(); + private writeMutex = new Mutex(); + private writeLockHolder: string | null = null; + + public constructor( + private readonly connectionManager: ConnectionManager, + private readonly core: BaseProtocol, + private readonly log: Logger + ) {} + + public getWriteLockHolder(): string | null { + return this.writeLockHolder; + } + + public getPeers(): Peer[] { + return Array.from(this.peers.values()); + } + + public async addPeer(peer: Peer): Promise { + return this.writeMutex.runExclusive(async () => { + this.writeLockHolder = `addPeer: ${peer.id.toString()}`; + await this.connectionManager.attemptDial(peer.id); + this.peers.set(peer.id.toString(), peer); + this.log.info(`Added and dialed peer: ${peer.id.toString()}`); + this.writeLockHolder = null; + }); + } + + public async removePeer(peerId: PeerId): Promise { + return this.writeMutex.runExclusive(() => { + this.writeLockHolder = `removePeer: ${peerId.toString()}`; + this.peers.delete(peerId.toString()); + this.log.info(`Removed peer: ${peerId.toString()}`); + this.writeLockHolder = null; + }); + } + + public async getPeerCount(): Promise { + return this.readMutex.runExclusive(() => this.peers.size); + } + + public async hasPeers(): Promise { + return this.readMutex.runExclusive(() => this.peers.size > 0); + } + + public async removeExcessPeers(excessPeers: number): Promise { + this.log.info(`Removing ${excessPeers} excess peer(s)`); + const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers); + for (const peer of peersToRemove) { + await this.removePeer(peer.id); + } + } + + public async disconnectPeer(peerId: PeerId): Promise { + try { + this.writeLockHolder = `disconnectPeer: ${peerId.toString()}`; + this.log.info(`Disconnecting peer: ${peerId}`); + await this.connectionManager.dropConnection(peerId); + await this.removePeer(peerId); + this.log.info(`Disconnected peer: ${peerId}`); + this.writeLockHolder = null; + return true; + } catch (error) { + this.log.error("Error disconnecting peer:", error); + this.writeLockHolder = null; + return false; + } + } + + /** + * Finds and adds new peers to the peers list. + * @param numPeers The number of peers to find and add. + */ + public async findAndAddPeers(numPeers: number): Promise { + const additionalPeers = await this.findPeers(numPeers); + if (additionalPeers.length === 0) { + this.log.warn("No additional peers found"); + return []; + } + return this.addMultiplePeers(additionalPeers); + } + + /** + * Finds additional peers. + * @param numPeers The number of peers to find. + */ + private async findPeers(numPeers: number): Promise { + const connectedPeers = await this.core.getPeers(); + + return this.readMutex.runExclusive(async () => { + const newPeers = connectedPeers + .filter((peer) => !this.peers.has(peer.id.toString())) + .slice(0, numPeers); + + return newPeers; + }); + } + + private async addMultiplePeers(peers: Peer[]): Promise { + const addedPeers: Peer[] = []; + for (const peer of peers) { + await this.addPeer(peer); + addedPeers.push(peer); + } + return addedPeers; + } +} diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index 21ad42f407..d447aafbce 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -47,11 +47,15 @@ describe("Waku Light Push: Connection Management: E2E", function () { expect(failures?.length || 0).to.equal(0); }); - it.only("Failed peers are renewed", async function () { + it("Failed peers are renewed", async function () { // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); + const response1 = await waku.lightPush.send( + encoder, + { + payload: utf8ToBytes("Hello_World") + }, + { forceUseAllPeers: true } + ); expect(response1.successes.length).to.be.equal( waku.lightPush.numPeersToUse diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index fa9b97c85d..80feaec6b3 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,8 +1,6 @@ -import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { ShardInfo } from "@waku/interfaces"; +import type { Peer, PeerStore } from "@libp2p/interface"; import { bytesToUtf8 } from "../bytes/index.js"; -import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -69,39 +67,3 @@ export async function getPeersForProtocol( }); return peers; } - -export async function getConnectedPeersForProtocolAndShard( - connections: Connection[], - peerStore: PeerStore, - protocols: string[], - shardInfo?: ShardInfo -): Promise { - const openConnections = connections.filter( - (connection) => connection.status === "open" - ); - - const peerPromises = openConnections.map(async (connection) => { - const peer = await peerStore.get(connection.remotePeer); - const supportsProtocol = protocols.some((protocol) => - peer.protocols.includes(protocol) - ); - - if (supportsProtocol) { - if (shardInfo) { - const encodedPeerShardInfo = peer.metadata.get("shardInfo"); - const peerShardInfo = - encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); - - if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { - return peer; - } - } else { - return peer; - } - } - return null; - }); - - const peersWithNulls = await Promise.all(peerPromises); - return peersWithNulls.filter((peer): peer is Peer => peer !== null); -}