From 1892f5093da540530d7ee5640178ebaa46cf769f Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Fri, 20 Oct 2023 18:16:48 +0530 Subject: [PATCH] fix: don't dial discovered peers if have already been attempted dial (#1657) * don't dial peers if they exist in PeerStore already * fix(tests): bugs & add types to dispatch event * fix: more tests * fix: dial validation * update doc & reduce delay * refactor test * use -1 instead of Infinity * fix comment * fix rebase --- packages/core/src/lib/connection_manager.ts | 24 +++- .../tests/tests/connection_manager.spec.ts | 118 ++++++++++++------ packages/tests/tests/multiaddr.node.spec.ts | 69 +++++++++- 3 files changed, 162 insertions(+), 49 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index debe9c5096..bd71345b90 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -38,7 +38,7 @@ export class ConnectionManager private dialAttemptsForPeer: Map = new Map(); private dialErrorsForPeer: Map = new Map(); - private currentActiveDialCount = 0; + private currentActiveParallelDialCount = 0; private pendingPeerDialQueue: Array = []; public static create( @@ -183,7 +183,7 @@ export class ConnectionManager } private async dialPeer(peerId: PeerId): Promise { - this.currentActiveDialCount += 1; + this.currentActiveParallelDialCount += 1; let dialAttempt = 0; while (dialAttempt < this.options.maxDialAttemptsForPeer) { try { @@ -199,7 +199,10 @@ export class ConnectionManager conn.tags = Array.from(new Set([...conn.tags, ...tags])); }); - this.dialAttemptsForPeer.delete(peerId.toString()); + // instead of deleting the peer from the peer store, we set the dial attempt to -1 + // this helps us keep track of peers that have been dialed before + this.dialAttemptsForPeer.set(peerId.toString(), -1); + // Dialing succeeded, break the loop break; } catch (error) { @@ -224,7 +227,7 @@ export class ConnectionManager } // Always decrease the active dial count and process the dial queue - this.currentActiveDialCount--; + this.currentActiveParallelDialCount--; this.processDialQueue(); // If max dial attempts reached and dialing failed, delete the peer @@ -276,7 +279,7 @@ export class ConnectionManager private processDialQueue(): void { if ( this.pendingPeerDialQueue.length > 0 && - this.currentActiveDialCount < this.options.maxParallelDials + this.currentActiveParallelDialCount < this.options.maxParallelDials ) { const peerId = this.pendingPeerDialQueue.shift(); if (!peerId) return; @@ -322,7 +325,7 @@ export class ConnectionManager private async attemptDial(peerId: PeerId): Promise { if (!(await this.shouldDialPeer(peerId))) return; - if (this.currentActiveDialCount >= this.options.maxParallelDials) { + if (this.currentActiveParallelDialCount >= this.options.maxParallelDials) { this.pendingPeerDialQueue.push(peerId); return; } @@ -404,6 +407,7 @@ export class ConnectionManager * 1. If the peer is already connected, don't dial * 2. If the peer is not part of any of the configured pubsub topics, don't dial * 3. If the peer is not dialable based on bootstrap status, don't dial + * 4. If the peer is already has an active dial attempt, or has been dialed before, don't dial it * @returns true if the peer should be dialed, false otherwise */ private async shouldDialPeer(peerId: PeerId): Promise { @@ -437,6 +441,14 @@ export class ConnectionManager return false; } + // If the peer is already already has an active dial attempt, or has been dialed before, don't dial it + if (this.dialAttemptsForPeer.has(peerId.toString())) { + log.warn( + `Peer ${peerId.toString()} has already been attempted dial before, or already has a dial attempt in progress, skipping dial` + ); + return false; + } + return true; } diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index f3e479b28c..ff777e6b82 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -1,3 +1,5 @@ +import type { PeerId } from "@libp2p/interface/peer-id"; +import type { PeerInfo } from "@libp2p/interface/peer-info"; import { CustomEvent } from "@libp2p/interfaces/events"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces"; @@ -49,7 +51,13 @@ describe("ConnectionManager", function () { }); waku.libp2p.dispatchEvent( - new CustomEvent("peer", { detail: await createSecp256k1PeerId() }) + new CustomEvent("peer:discovery", { + detail: { + id: peerIdBootstrap, + multiaddrs: [], + protocols: [] + } + }) ); expect(await peerDiscoveryBootstrap).to.eq(true); @@ -77,7 +85,13 @@ describe("ConnectionManager", function () { }); waku.libp2p.dispatchEvent( - new CustomEvent("peer", { detail: peerIdPx }) + new CustomEvent("peer:discovery", { + detail: { + id: peerIdPx, + multiaddrs: [], + protocols: [] + } + }) ); expect(await peerDiscoveryPeerExchange).to.eq(true); @@ -109,7 +123,7 @@ describe("ConnectionManager", function () { }); waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) ); expect(await peerConnectedBootstrap).to.eq(true); @@ -136,7 +150,7 @@ describe("ConnectionManager", function () { }); waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx }) + new CustomEvent("peer:connect", { detail: peerIdPx }) ); expect(await peerConnectedPeerExchange).to.eq(true); @@ -182,27 +196,34 @@ describe("ConnectionManager", function () { attemptDialSpy.restore(); }); - it("should be called on all `peer:discovery` events", async function () { + it("should be called at least once on all `peer:discovery` events", async function () { this.timeout(TEST_TIMEOUT); const totalPeerIds = 5; for (let i = 1; i <= totalPeerIds; i++) { waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: `peer-id-${i}` }) + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [], + protocols: [] + } + }) ); } - // add delay to allow async function calls within attemptDial to finish await delay(100); - expect(attemptDialSpy.callCount).to.equal( + expect(attemptDialSpy.callCount).to.be.greaterThanOrEqual( totalPeerIds, - "attemptDial should be called once for each peer:discovery event" + "attemptDial should be called at least once for each peer:discovery event" ); }); }); describe("dialPeer method", function () { + let peerStoreHasStub: SinonStub; + let dialAttemptsForPeerHasStub: SinonStub; beforeEach(function () { getConnectionsStub = sinon.stub( (waku.connectionManager as any).libp2p, @@ -213,29 +234,44 @@ describe("ConnectionManager", function () { "getTagNamesForPeer" ); dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer"); + peerStoreHasStub = sinon.stub(waku.libp2p.peerStore, "has"); + dialAttemptsForPeerHasStub = sinon.stub( + (waku.connectionManager as any).dialAttemptsForPeer, + "has" + ); + + // simulate that the peer is not connected + getConnectionsStub.returns([]); + + // simulate that the peer is a bootstrap peer + getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); + + // simulate that the peer is not in the peerStore + peerStoreHasStub.returns(false); + + // simulate that the peer has not been dialed before + dialAttemptsForPeerHasStub.returns(false); }); afterEach(function () { dialPeerStub.restore(); getTagNamesForPeerStub.restore(); getConnectionsStub.restore(); + peerStoreHasStub.restore(); + dialAttemptsForPeerHasStub.restore(); }); describe("For bootstrap peers", function () { it("should be called for bootstrap peers", async function () { this.timeout(TEST_TIMEOUT); - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer is a bootstrap peer - getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); - const bootstrapPeer = await createSecp256k1PeerId(); // emit a peer:discovery event waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: bootstrapPeer }) + new CustomEvent("peer:discovery", { + detail: { id: bootstrapPeer, multiaddrs: [], protocols: [] } + }) ); // wait for the async function calls within attemptDial to finish @@ -251,15 +287,15 @@ describe("ConnectionManager", function () { it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { this.timeout(TEST_TIMEOUT); - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer is a bootstrap peer - getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); - // emit first peer:discovery event waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: "bootstrap-peer" }) + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [], + protocols: [] + } + }) ); await delay(500); @@ -271,8 +307,12 @@ describe("ConnectionManager", function () { for (let i = 1; i <= totalBootstrapPeers; i++) { await delay(500); waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: await createSecp256k1PeerId() + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [], + protocols: [] + } }) ); } @@ -289,17 +329,17 @@ describe("ConnectionManager", function () { it("should be called for peers with PEER_EXCHANGE tags", async function () { this.timeout(TEST_TIMEOUT); - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer has a PEER_EXCHANGE tag - getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); - const pxPeer = await createSecp256k1PeerId(); // emit a peer:discovery event waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: pxPeer }) + new CustomEvent("peer:discovery", { + detail: { + id: pxPeer, + multiaddrs: [], + protocols: [] + } + }) ); // wait for the async function calls within attemptDial to finish @@ -315,18 +355,16 @@ describe("ConnectionManager", function () { it("should be called for every peer with PEER_EXCHANGE tags", async function () { this.timeout(TEST_TIMEOUT); - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer has a PEER_EXCHANGE tag - getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); - // emit multiple peer:discovery events const totalPxPeers = 5; for (let i = 0; i < totalPxPeers; i++) { waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: await createSecp256k1PeerId() + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [], + protocols: [] + } }) ); await delay(500); diff --git a/packages/tests/tests/multiaddr.node.spec.ts b/packages/tests/tests/multiaddr.node.spec.ts index 6b9dd34415..a457d62211 100644 --- a/packages/tests/tests/multiaddr.node.spec.ts +++ b/packages/tests/tests/multiaddr.node.spec.ts @@ -1,20 +1,32 @@ +import { CustomEvent } from "@libp2p/interface/events"; +import type { PeerId } from "@libp2p/interface/peer-id"; +import type { PeerInfo } from "@libp2p/interface/peer-info"; import { multiaddr } from "@multiformats/multiaddr"; +import type { Multiaddr } from "@multiformats/multiaddr"; import type { Waku } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; +import Sinon, { SinonSpy, SinonStub } from "sinon"; -import { NimGoNode, tearDownNodes } from "../src/index.js"; +import { + delay, + makeLogFileName, + NimGoNode, + tearDownNodes +} from "../src/index.js"; -describe("dials multiaddr", function () { +describe("multiaddr: dialing", function () { let waku: Waku; let nwaku: NimGoNode; + let dialPeerSpy: SinonSpy; + let isPeerTopicConfigured: SinonStub; afterEach(async function () { this.timeout(15000); await tearDownNodes(nwaku, waku); }); - it("TLS", async function () { + it("can dial TLS multiaddrs", async function () { this.timeout(20_000); let tlsWorks = true; @@ -36,4 +48,55 @@ describe("dials multiaddr", function () { expect(tlsWorks).to.eq(true); }); + + describe("does not attempt the same peer discovered multiple times more than once", function () { + const PEER_DISCOVERY_COUNT = 3; + let peerId: PeerId; + let multiaddr: Multiaddr; + + beforeEach(async function () { + this.timeout(10_000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start(); + + waku = await createLightNode(); + + peerId = await nwaku.getPeerId(); + multiaddr = await nwaku.getMultiaddrWithId(); + + isPeerTopicConfigured = Sinon.stub( + waku.connectionManager as any, + "isPeerTopicConfigured" + ); + isPeerTopicConfigured.resolves(true); + dialPeerSpy = Sinon.spy(waku.connectionManager as any, "dialPeer"); + }); + + afterEach(function () { + dialPeerSpy.restore(); + }); + + it("through manual discovery", async function () { + this.timeout(20_000); + + const discoverPeer = (): void => { + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: peerId, + protocols: [], + multiaddrs: [multiaddr] + } + }) + ); + }; + + for (let i = 0; i < PEER_DISCOVERY_COUNT; i++) { + discoverPeer(); + await delay(100); + } + + expect(dialPeerSpy.callCount).to.eq(1); + }); + }); });