diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 8ce6162a50..3ebbc3c443 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -5,12 +5,8 @@ import type { Libp2pComponents, PubsubTopic } from "@waku/interfaces"; -import { Logger, pubsubTopicsToShardInfo } from "@waku/utils"; -import { - getConnectedPeersForProtocolAndShard, - getPeersForProtocol, - sortPeersByLatency -} from "@waku/utils/libp2p"; +import { Logger } from "@waku/utils"; +import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; @@ -63,7 +59,7 @@ export class BaseProtocol implements IBaseProtocolCore { return getPeersForProtocol(this.peerStore, [this.multicodec]); } - public async connectedPeers(): Promise { + public async connectedPeers(withOpenStreams = false): Promise { const peers = await this.allPeers(); return peers.filter((peer) => { return ( @@ -77,9 +73,8 @@ export class BaseProtocol implements IBaseProtocolCore { * * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - - * @returns A list of peers that support the protocol sorted by latency. - */ + * @returns A list of peers that support the protocol sorted by latency. By default, returns all peers available, including bootstrap. + */ public async getPeers( { numPeers, @@ -88,7 +83,7 @@ export class BaseProtocol implements IBaseProtocolCore { numPeers: number; maxBootstrapPeers: number; } = { - maxBootstrapPeers: 1, + maxBootstrapPeers: 0, numPeers: 0 } ): Promise { @@ -103,7 +98,7 @@ export class BaseProtocol implements IBaseProtocolCore { // Filter the peers based on discovery & number of peers requested const filteredPeers = filterPeersByDiscovery( - connectedPeersForProtocolAndShard, + allAvailableConnectedPeers, numPeers, maxBootstrapPeers ); diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5b5e9ea919..fe7282909e 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -18,7 +18,7 @@ export type IBaseProtocolCore = { multicodec: string; peerStore: PeerStore; allPeers: () => Promise; - connectedPeers: () => Promise; + connectedPeers: (withOpenStreams?: boolean) => Promise; addLibp2pEventListener: Libp2p["addEventListener"]; removeLibp2pEventListener: Libp2p["removeEventListener"]; }; @@ -36,10 +36,6 @@ export type NetworkConfig = StaticSharding | AutoSharding; * Options for using LightPush and Filter */ export type ProtocolUseOptions = { - /** - * Optional flag to enable auto-retry with exponential backoff - */ - autoRetry?: boolean; /** * Optional flag to force using all available peers */ @@ -48,14 +44,6 @@ export type ProtocolUseOptions = { * Optional maximum number of attempts for exponential backoff */ maxAttempts?: number; - /** - * Optional initial delay in milliseconds for exponential backoff - */ - initialDelay?: number; - /** - * Optional maximum delay in milliseconds for exponential backoff - */ - maxDelay?: number; }; export type ProtocolCreateOptions = { diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 52009aa05c..161ccd2a01 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -1,9 +1,10 @@ import type { Peer, PeerId } from "@libp2p/interface"; -import { ConnectionManager, getHealthManager } from "@waku/core"; +import { ConnectionManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { IBaseProtocolSDK, IHealthManager, + PeerIdStr, ProtocolUseOptions } from "@waku/interfaces"; import { delay, Logger } from "@waku/utils"; @@ -12,15 +13,15 @@ interface Options { numPeersToUse?: number; maintainPeersInterval?: number; } +///TODO: update HealthManager -const RENEW_TIME_LOCK_DURATION = 30 * 1000; const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { - private healthManager: IHealthManager; + private peerManager: PeerManager; public readonly numPeersToUse: number; - private peers: Peer[] = []; + private peers: Map = new Map(); private maintainPeersIntervalId: ReturnType< typeof window.setInterval > | null = null; @@ -38,17 +39,18 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { ) { this.log = new Logger(`sdk:${core.multicodec}`); - this.healthManager = getHealthManager(); + this.peerManager = new PeerManager(connectionManager, core, this.log); this.numPeersToUse = options?.numPeersToUse ?? DEFAULT_NUM_PEERS_TO_USE; const maintainPeersInterval = options?.maintainPeersInterval ?? DEFAULT_MAINTAIN_PEERS_INTERVAL; + void this.setupEventListeners(); void this.startMaintainPeersInterval(maintainPeersInterval); } public get connectedPeers(): Peer[] { - return this.peers; + return Array.from(this.peers.values()); } /** @@ -93,30 +95,30 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } + private setupEventListeners(): void { + this.core.addLibp2pEventListener( + "peer:connect", + () => void this.confirmPeers() + ); + this.core.addLibp2pEventListener( + "peer:disconnect", + () => void this.confirmPeers() + ); + } + /** - * Checks if there are peers to send a message to. - * If `forceUseAllPeers` is `false` (default) and there are connected peers, returns `true`. - * If `forceUseAllPeers` is `true` or there are no connected peers, tries to find new peers from the ConnectionManager. - * If `autoRetry` is `false`, returns `false` if no peers are found. - * If `autoRetry` is `true`, tries to find new peers from the ConnectionManager with exponential backoff. - * Returns `true` if peers are found, `false` otherwise. + * Checks if there are sufficient peers to send a message to. + * If `forceUseAllPeers` is `false` (default), returns `true` if there are any connected peers. + * If `forceUseAllPeers` is `true`, attempts to connect to `numPeersToUse` peers. * @param options Optional options object - * @param options.autoRetry Optional flag to enable auto-retry with exponential backoff (default: false) - * @param options.forceUseAllPeers Optional flag to force using all available peers (default: false) - * @param options.initialDelay Optional initial delay in milliseconds for exponential backoff (default: 10) - * @param options.maxAttempts Optional maximum number of attempts for exponential backoff (default: 3) - * @param options.maxDelay Optional maximum delay in milliseconds for exponential backoff (default: 100) + * @param options.forceUseAllPeers Optional flag to force connecting to `numPeersToUse` peers (default: false) + * @param options.maxAttempts Optional maximum number of attempts to reach the required number of peers (default: 3) + * @returns `true` if the required number of peers are connected, `false` otherwise */ - protected hasPeers = async ( + protected async hasPeers( options: Partial = {} - ): Promise => { - const { - autoRetry = false, - forceUseAllPeers = false, - initialDelay = 10, - maxAttempts = 3, - maxDelay = 100 - } = options; + ): Promise { + const { forceUseAllPeers = false, maxAttempts = 3 } = options; if (!forceUseAllPeers && this.connectedPeers.length > 0) return true; @@ -124,24 +126,34 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { while (attempts < maxAttempts) { attempts++; if (await this.maintainPeers()) { - if (this.peers.length < this.numPeersToUse) { + if (this.peers.size < this.numPeersToUse) { this.log.warn( - `Found only ${this.peers.length} peers, expected ${this.numPeersToUse}` + `Found only ${this.peers.size} peers, expected ${this.numPeersToUse}` ); } return true; } - if (!autoRetry) return false; - const delayMs = Math.min( - initialDelay * Math.pow(2, attempts - 1), - maxDelay - ); - await delay(delayMs); + if (!autoRetry) { + return false; + } + //TODO: handle autoRetry } - this.log.error("Failed to find peers to send message to"); + for (let attempts = 0; attempts < maxAttempts; attempts++) { + await this.maintainPeers(); + + if (this.connectedPeers.length >= this.numPeersToUse) { + return true; + } + + this.log.warn( + `Found only ${this.connectedPeers.length} peers, expected ${this.numPeersToUse}. Retrying...` + ); + } + + this.log.error("Failed to find required number of peers"); return false; - }; + } /** * Starts an interval to maintain the peers list to `numPeersToUse`. @@ -150,7 +162,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { private async startMaintainPeersInterval(interval: number): Promise { this.log.info("Starting maintain peers interval"); try { - await this.maintainPeers(); + // await this.maintainPeers(); this.maintainPeersIntervalId = setInterval(() => { this.maintainPeers().catch((error) => { this.log.error("Error during maintain peers interval:", error); @@ -176,18 +188,36 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { this.maintainPeersLock = true; this.log.info(`Maintaining peers, current count: ${this.peers.length}`); try { - const numPeersToAdd = this.numPeersToUse - this.peers.length; + await this.confirmPeers(); + this.log.info(`Maintaining peers, current count: ${this.peers.size}`); + + const numPeersToAdd = this.numPeersToUse - this.peers.size; if (numPeersToAdd > 0) { - await this.findAndAddPeers(numPeersToAdd); + await this.peerManager.findAndAddPeers(numPeersToAdd); + } else { + await this.peerManager.removeExcessPeers(Math.abs(numPeersToAdd)); } + this.log.info( - `Peer maintenance completed, current count: ${this.peers.length}` + `Peer maintenance completed, current count: ${this.peers.size}` ); this.renewPeersLocker.cleanUnlocked(); + return true; + } catch (error) { + if (error instanceof Error) { + this.log.error("Error during peer maintenance", { + error: error.message, + stack: error.stack + }); + } else { + this.log.error("Error during peer maintenance", { + error: String(error) + }); + } + return false; } finally { this.maintainPeersLock = false; } - return true; } /** @@ -204,11 +234,13 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { await Promise.all(dials); - const updatedPeers = [...this.peers, ...additionalPeers]; - this.updatePeers(updatedPeers); + additionalPeers.forEach((peer) => + this.peers.set(peer.id.toString(), peer) + ); + this.updatePeers(this.peers); this.log.info( - `Added ${additionalPeers.length} new peers, total peers: ${this.peers.length}` + `Added ${additionalPeers.length} new peers, total peers: ${this.peers.size}` ); return additionalPeers; } catch (error) { @@ -234,9 +266,7 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } newPeers = newPeers - .filter( - (peer) => this.peers.some((p) => p.id.equals(peer.id)) === false - ) + .filter((peer) => !this.peers.has(peer.id.toString())) .filter((peer) => !this.renewPeersLocker.isLocked(peer.id)) .slice(0, numPeers); @@ -247,11 +277,11 @@ export class BaseProtocolSDK implements IBaseProtocolSDK { } } - private updatePeers(peers: Peer[]): void { + private updatePeers(peers: Map): void { this.peers = peers; this.healthManager.updateProtocolHealth( this.core.multicodec, - this.peers.length + this.peers.size ); } } @@ -276,7 +306,7 @@ class RenewPeerLocker { } public cleanUnlocked(): void { - Object.entries(this.peers).forEach(([id, lock]) => { + Array.from(this.peers.entries()).forEach(([id, lock]) => { if (this.isTimeUnlocked(lock)) { this.peers.delete(id.toString()); } diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts new file mode 100644 index 0000000000..f1613a8021 --- /dev/null +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -0,0 +1,99 @@ +import { Peer, PeerId } from "@libp2p/interface"; +import { ConnectionManager } from "@waku/core"; +import { BaseProtocol } from "@waku/core/lib/base_protocol"; +import { Logger } from "@waku/utils"; +import { Mutex } from "async-mutex"; + +export class PeerManager { + private peers: Map = new Map(); + private readMutex = new Mutex(); + private writeMutex = new Mutex(); + private writeLockHolder: string | null = null; + + public constructor( + private readonly connectionManager: ConnectionManager, + private readonly core: BaseProtocol, + private readonly log: Logger + ) {} + + public getWriteLockHolder(): string | null { + return this.writeLockHolder; + } + + public getPeers(): Peer[] { + return Array.from(this.peers.values()); + } + + public async addPeer(peer: Peer): Promise { + return this.writeMutex.runExclusive(async () => { + this.writeLockHolder = `addPeer: ${peer.id.toString()}`; + await this.connectionManager.attemptDial(peer.id); + this.peers.set(peer.id.toString(), peer); + this.log.info(`Added and dialed peer: ${peer.id.toString()}`); + this.writeLockHolder = null; + }); + } + + public async removePeer(peerId: PeerId): Promise { + return this.writeMutex.runExclusive(() => { + this.writeLockHolder = `removePeer: ${peerId.toString()}`; + this.peers.delete(peerId.toString()); + this.log.info(`Removed peer: ${peerId.toString()}`); + this.writeLockHolder = null; + }); + } + + public async getPeerCount(): Promise { + return this.readMutex.runExclusive(() => this.peers.size); + } + + public async hasPeers(): Promise { + return this.readMutex.runExclusive(() => this.peers.size > 0); + } + + public async removeExcessPeers(excessPeers: number): Promise { + this.log.info(`Removing ${excessPeers} excess peer(s)`); + const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers); + for (const peer of peersToRemove) { + await this.removePeer(peer.id); + } + } + + /** + * Finds and adds new peers to the peers list. + * @param numPeers The number of peers to find and add. + */ + public async findAndAddPeers(numPeers: number): Promise { + const additionalPeers = await this.findPeers(numPeers); + if (additionalPeers.length === 0) { + this.log.warn("No additional peers found"); + return []; + } + return this.addMultiplePeers(additionalPeers); + } + + /** + * Finds additional peers. + * @param numPeers The number of peers to find. + */ + public async findPeers(numPeers: number): Promise { + const connectedPeers = await this.core.getPeers(); + + return this.readMutex.runExclusive(async () => { + const newPeers = connectedPeers + .filter((peer) => !this.peers.has(peer.id.toString())) + .slice(0, numPeers); + + return newPeers; + }); + } + + public async addMultiplePeers(peers: Peer[]): Promise { + const addedPeers: Peer[] = []; + for (const peer of peers) { + await this.addPeer(peer); + addedPeers.push(peer); + } + return addedPeers; + } +} diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index a275f4e970..2718c9b29f 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -54,9 +54,13 @@ describe("Waku Light Push: Peer Management: E2E", function () { it("Failed peers are renewed", async function () { // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); + const response1 = await waku.lightPush.send( + encoder, + { + payload: utf8ToBytes("Hello_World") + }, + { forceUseAllPeers: true } + ); expect(response1.successes.length).to.be.equal( waku.lightPush.numPeersToUse diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index fa9b97c85d..80feaec6b3 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,8 +1,6 @@ -import type { Connection, Peer, PeerStore } from "@libp2p/interface"; -import { ShardInfo } from "@waku/interfaces"; +import type { Peer, PeerStore } from "@libp2p/interface"; import { bytesToUtf8 } from "../bytes/index.js"; -import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -69,39 +67,3 @@ export async function getPeersForProtocol( }); return peers; } - -export async function getConnectedPeersForProtocolAndShard( - connections: Connection[], - peerStore: PeerStore, - protocols: string[], - shardInfo?: ShardInfo -): Promise { - const openConnections = connections.filter( - (connection) => connection.status === "open" - ); - - const peerPromises = openConnections.map(async (connection) => { - const peer = await peerStore.get(connection.remotePeer); - const supportsProtocol = protocols.some((protocol) => - peer.protocols.includes(protocol) - ); - - if (supportsProtocol) { - if (shardInfo) { - const encodedPeerShardInfo = peer.metadata.get("shardInfo"); - const peerShardInfo = - encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); - - if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { - return peer; - } - } else { - return peer; - } - } - return null; - }); - - const peersWithNulls = await Promise.all(peerPromises); - return peersWithNulls.filter((peer): peer is Peer => peer !== null); -}