diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index 64dd435d7a..e9d21ae79c 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -10,12 +10,19 @@ import { ensureShardingConfigured, Logger } from "@waku/utils"; import { getConnectedPeersForProtocolAndShard, getPeersForProtocol, - sortPeersByLatency + sortPeersByLatency, + sortPeersByLeastActiveConnections } from "@waku/utils/libp2p"; import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager/index.js"; +type GetPeersOptions = { + prioritizeLatency?: boolean; + numPeers: number; + maxBootstrapPeers: number; +}; + /** * A class with predefined helpers, to be used as a base to implement Waku * Protocols. @@ -83,21 +90,19 @@ export class BaseProtocol implements IBaseProtocolCore { * @returns A list of peers that support the protocol sorted by latency. */ public async getPeers( - { - numPeers, - maxBootstrapPeers - }: { - numPeers: number; - maxBootstrapPeers: number; - } = { + { prioritizeLatency, numPeers, maxBootstrapPeers }: GetPeersOptions = { + prioritizeLatency: true, maxBootstrapPeers: 1, numPeers: 0 } ): Promise { + const activeConnections = + this.components.connectionManager.getConnections(); + // Retrieve all connected peers that support the protocol & shard (if configured) const connectedPeersForProtocolAndShard = await getConnectedPeersForProtocolAndShard( - this.components.connectionManager.getConnections(), + activeConnections, this.peerStore, [this.multicodec], this.options?.shardInfo @@ -112,24 +117,32 @@ export class BaseProtocol implements IBaseProtocolCore { maxBootstrapPeers ); - // Sort the peers by latency - const sortedFilteredPeers = await sortPeersByLatency( - this.peerStore, - filteredPeers - ); + let filteredAndSortedPeers: Peer[]; - if (sortedFilteredPeers.length === 0) { + if (prioritizeLatency) { + filteredAndSortedPeers = await sortPeersByLatency( + this.peerStore, + filteredPeers + ); + } else { + filteredAndSortedPeers = sortPeersByLeastActiveConnections( + filteredPeers, + activeConnections + ); + } + + if (filteredAndSortedPeers.length === 0) { this.log.warn( "No peers found. Ensure you have a connection to the network." ); } - if (sortedFilteredPeers.length < numPeers) { + if (filteredAndSortedPeers.length < numPeers) { this.log.warn( - `Only ${sortedFilteredPeers.length} peers found. Requested ${numPeers}.` + `Only ${filteredAndSortedPeers.length} peers found. Requested ${numPeers}.` ); } - return sortedFilteredPeers; + return filteredAndSortedPeers; } } diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index fa9b97c85d..a8f925d6a9 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -2,6 +2,7 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import { ShardInfo } from "@waku/interfaces"; import { bytesToUtf8 } from "../bytes/index.js"; +import { isDefined } from "../common/is_defined.js"; import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** @@ -51,6 +52,34 @@ export async function sortPeersByLatency( .map((result) => result.peer); } +/** + * Sorts the list of peers based on the number of active connections. + * @param peers A list of all available peers, that support the protocol and shard. + * @param connections A list of all active connections. + * @returns A list of peers sorted by the number of active connections. + */ +export function sortPeersByLeastActiveConnections( + peers: Peer[], + connections: Connection[] +): Peer[] { + const activePeers = connections + .filter((conn) => + conn.streams.map((stream) => stream.protocol).filter(isDefined) + ) + .map((conn) => conn.remotePeer); + + return peers.sort((a, b) => { + const aConnections = activePeers.filter((peerId) => + peerId.equals(a.id) + ).length; + const bConnections = activePeers.filter((peerId) => + peerId.equals(b.id) + ).length; + + return bConnections - aConnections; + }); +} + /** * Returns the list of peers that supports the given protocol. */