diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 21118e0e13..4a43c8081c 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -12,6 +12,7 @@ const log = debug("waku:connection-manager"); export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; +export const DEFAULT_MAX_PARALLEL_DIALS = 3; export class ConnectionManager { private static instances = new Map(); @@ -21,6 +22,9 @@ export class ConnectionManager { private dialAttemptsForPeer: Map = new Map(); private dialErrorsForPeer: Map = new Map(); + private currentActiveDialCount = 0; + private pendingPeerDialQueue: Array = []; + public static create( peerId: string, libp2p: Libp2p, @@ -52,6 +56,7 @@ export class ConnectionManager { this.options = { maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, maxBootstrapPeersAllowed: DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED, + maxParallelDials: DEFAULT_MAX_PARALLEL_DIALS, ...options, }; @@ -60,6 +65,31 @@ export class ConnectionManager { this.run() .then(() => log(`Connection Manager is now running`)) .catch((error) => log(`Unexpected error while running service`, error)); + + // libp2p emits `peer:discovery` events during its initialization + // which means that before the ConnectionManager is initialized, some peers may have been discovered + // we will dial the peers in peerStore ONCE before we start to listen to the `peer:discovery` events within the ConnectionManager + this.dialPeerStorePeers(); + } + + private async dialPeerStorePeers(): Promise { + const peerInfos = await this.libp2pComponents.peerStore.all(); + const dialPromises = []; + for (const peerInfo of peerInfos) { + if ( + this.libp2pComponents + .getConnections() + .find((c) => c.remotePeer === peerInfo.id) + ) + continue; + + dialPromises.push(this.attemptDial(peerInfo.id)); + } + try { + await Promise.all(dialPromises); + } catch (error) { + log(`Unexpected error while dialing peer store peers`, error); + } } private async run(): Promise { @@ -86,6 +116,7 @@ export class ConnectionManager { } private async dialPeer(peerId: PeerId): Promise { + this.currentActiveDialCount += 1; let dialAttempt = 0; while (dialAttempt <= this.options.maxDialAttemptsForPeer) { try { @@ -105,6 +136,7 @@ export class ConnectionManager { return; } catch (e) { const error = e as AggregateError; + this.dialErrorsForPeer.set(peerId.toString(), error); log(`Error dialing peer ${peerId.toString()} - ${error.errors}`); @@ -128,6 +160,33 @@ export class ConnectionManager { return await this.libp2pComponents.peerStore.delete(peerId); } catch (error) { throw `Error deleting undialable peer ${peerId.toString()} from peer store - ${error}`; + } finally { + this.currentActiveDialCount -= 1; + this.processDialQueue(); + } + } + + async dropConnection(peerId: PeerId): Promise { + try { + await this.libp2pComponents.hangUp(peerId); + log(`Dropped connection with peer ${peerId.toString()}`); + } catch (error) { + log( + `Error dropping connection with peer ${peerId.toString()} - ${error}` + ); + } + } + + private async processDialQueue(): Promise { + if ( + this.pendingPeerDialQueue.length > 0 && + this.currentActiveDialCount < this.options.maxParallelDials + ) { + const peerId = this.pendingPeerDialQueue.shift(); + if (!peerId) return; + this.attemptDial(peerId).catch((error) => { + log(error); + }); } } @@ -164,21 +223,50 @@ export class ConnectionManager { ); } + private async attemptDial(peerId: PeerId): Promise { + if (this.currentActiveDialCount >= this.options.maxParallelDials) { + this.pendingPeerDialQueue.push(peerId); + return; + } + + if (!(await this.shouldDialPeer(peerId))) return; + + this.dialPeer(peerId).catch((err) => { + throw `Error dialing peer ${peerId.toString()} : ${err}`; + }); + } + private onEventHandlers = { "peer:discovery": async (evt: CustomEvent): Promise => { const { id: peerId } = evt.detail; - if (!(await this.shouldDialPeer(peerId))) return; - this.dialPeer(peerId).catch((err) => + this.attemptDial(peerId).catch((err) => log(`Error dialing peer ${peerId.toString()} : ${err}`) ); }, - "peer:connect": (evt: CustomEvent): void => { - { - this.keepAliveManager.start( - evt.detail.remotePeer, - this.libp2pComponents.ping.bind(this) - ); + "peer:connect": async (evt: CustomEvent): Promise => { + const { remotePeer: peerId } = evt.detail; + + this.keepAliveManager.start( + peerId, + this.libp2pComponents.ping.bind(this) + ); + + const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( + Tags.BOOTSTRAP + ); + + if (isBootstrap) { + const bootstrapConnections = this.libp2pComponents + .getConnections() + .filter((conn) => conn.tags.includes(Tags.BOOTSTRAP)); + + // If we have too many bootstrap connections, drop one + if ( + bootstrapConnections.length > this.options.maxBootstrapPeersAllowed + ) { + await this.dropConnection(peerId); + } } }, "peer:disconnect": () => { diff --git a/packages/dns-discovery/src/index.ts b/packages/dns-discovery/src/index.ts index db4a15969b..fd68d4c327 100644 --- a/packages/dns-discovery/src/index.ts +++ b/packages/dns-discovery/src/index.ts @@ -32,7 +32,7 @@ export interface Options { /** * ENR URL to use for DNS discovery */ - enrUrl: string; + enrUrls: string | string[]; /** * Specifies what type of nodes are wanted from the discovery process */ @@ -71,8 +71,8 @@ export class PeerDiscoveryDns this._components = components; this._options = options; - const { enrUrl } = options; - log("Use following EIP-1459 ENR Tree URL: ", enrUrl); + const { enrUrls } = options; + log("Use following EIP-1459 ENR Tree URLs: ", enrUrls); } /** @@ -84,12 +84,15 @@ export class PeerDiscoveryDns this._started = true; if (this.nextPeer === undefined) { - const { enrUrl, wantedNodeCapabilityCount } = this._options; + let { enrUrls } = this._options; + if (!Array.isArray(enrUrls)) enrUrls = [enrUrls]; + + const { wantedNodeCapabilityCount } = this._options; const dns = await DnsNodeDiscovery.dnsOverHttp(); this.nextPeer = dns.getNextPeer.bind( dns, - [enrUrl], + enrUrls, wantedNodeCapabilityCount ); } @@ -138,11 +141,11 @@ export class PeerDiscoveryDns } export function wakuDnsDiscovery( - enrUrl: string, + enrUrls: string[], wantedNodeCapabilityCount: Partial ): (components: DnsDiscoveryComponents) => PeerDiscoveryDns { return (components: DnsDiscoveryComponents) => - new PeerDiscoveryDns(components, { enrUrl, wantedNodeCapabilityCount }); + new PeerDiscoveryDns(components, { enrUrls, wantedNodeCapabilityCount }); } export { DnsNodeDiscovery, SearchContext, DnsClient } from "./dns.js"; diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 4ddce1ccd3..7424621cd4 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -14,4 +14,8 @@ export interface ConnectionManagerOptions { * This is used to increase intention of dialing non-bootstrap peers, found using other discovery mechanisms (like Peer Exchange) */ maxBootstrapPeersAllowed: number; + /** + * Max number of parallel dials allowed + */ + maxParallelDials: number; } diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index 22722ad625..318f60d416 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -171,7 +171,7 @@ export async function createFullNode( export function defaultPeerDiscovery(): ( components: Libp2pComponents ) => PeerDiscovery { - return wakuDnsDiscovery(enrTree["PROD"], DEFAULT_NODE_REQUIREMENTS); + return wakuDnsDiscovery([enrTree["PROD"]], DEFAULT_NODE_REQUIREMENTS); } export async function defaultLibp2p( diff --git a/packages/tests/tests/dns-peer-discovery.spec.ts b/packages/tests/tests/dns-peer-discovery.spec.ts index edfa9f3f75..2080c2eab3 100644 --- a/packages/tests/tests/dns-peer-discovery.spec.ts +++ b/packages/tests/tests/dns-peer-discovery.spec.ts @@ -13,6 +13,8 @@ import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; import { MemoryDatastore } from "datastore-core"; +import { delay } from "../src/delay.js"; + const maxQuantity = 3; describe("DNS Discovery: Compliance Test", async function () { @@ -28,7 +30,7 @@ describe("DNS Discovery: Compliance Test", async function () { }); return new PeerDiscoveryDns(components, { - enrUrl: enrTree["PROD"], + enrUrls: [enrTree["PROD"]], wantedNodeCapabilityCount: { filter: 1, }, @@ -60,7 +62,7 @@ describe("DNS Node Discovery [live data]", function () { const waku = await createLightNode({ libp2p: { - peerDiscovery: [wakuDnsDiscovery(enrTree["PROD"], nodeRequirements)], + peerDiscovery: [wakuDnsDiscovery([enrTree["PROD"]], nodeRequirements)], }, }); @@ -110,4 +112,28 @@ describe("DNS Node Discovery [live data]", function () { seen.push(ma!.toString()); } }); + it("passes more than one ENR URLs and attempts connection", async function () { + if (process.env.CI) this.skip(); + this.timeout(30_000); + + const nodesToConnect = 2; + + const waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + wakuDnsDiscovery([enrTree["PROD"], enrTree["TEST"]], { + filter: nodesToConnect, + }), + ], + }, + }); + + await waku.start(); + + const allPeers = await waku.libp2p.peerStore.all(); + while (allPeers.length < nodesToConnect) { + await delay(2000); + } + expect(allPeers.length).to.be.eq(nodesToConnect); + }); });