fix: don't dial discovered peers if have already been attempted dial (#1657)

* don't dial peers if they exist in PeerStore already

* fix(tests): bugs & add types to dispatch event

* fix: more tests

* fix: dial validation

* update doc & reduce delay

* refactor test

* use -1 instead of Infinity

* fix comment

* fix rebase
This commit is contained in:
Danish Arora 2023-10-20 18:16:48 +05:30 committed by GitHub
parent 0f7d63ef93
commit 1892f5093d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 162 additions and 49 deletions

View File

@ -38,7 +38,7 @@ export class ConnectionManager
private dialAttemptsForPeer: Map<string, number> = new Map();
private dialErrorsForPeer: Map<string, any> = new Map();
private currentActiveDialCount = 0;
private currentActiveParallelDialCount = 0;
private pendingPeerDialQueue: Array<PeerId> = [];
public static create(
@ -183,7 +183,7 @@ export class ConnectionManager
}
private async dialPeer(peerId: PeerId): Promise<void> {
this.currentActiveDialCount += 1;
this.currentActiveParallelDialCount += 1;
let dialAttempt = 0;
while (dialAttempt < this.options.maxDialAttemptsForPeer) {
try {
@ -199,7 +199,10 @@ export class ConnectionManager
conn.tags = Array.from(new Set([...conn.tags, ...tags]));
});
this.dialAttemptsForPeer.delete(peerId.toString());
// instead of deleting the peer from the peer store, we set the dial attempt to -1
// this helps us keep track of peers that have been dialed before
this.dialAttemptsForPeer.set(peerId.toString(), -1);
// Dialing succeeded, break the loop
break;
} catch (error) {
@ -224,7 +227,7 @@ export class ConnectionManager
}
// Always decrease the active dial count and process the dial queue
this.currentActiveDialCount--;
this.currentActiveParallelDialCount--;
this.processDialQueue();
// If max dial attempts reached and dialing failed, delete the peer
@ -276,7 +279,7 @@ export class ConnectionManager
private processDialQueue(): void {
if (
this.pendingPeerDialQueue.length > 0 &&
this.currentActiveDialCount < this.options.maxParallelDials
this.currentActiveParallelDialCount < this.options.maxParallelDials
) {
const peerId = this.pendingPeerDialQueue.shift();
if (!peerId) return;
@ -322,7 +325,7 @@ export class ConnectionManager
private async attemptDial(peerId: PeerId): Promise<void> {
if (!(await this.shouldDialPeer(peerId))) return;
if (this.currentActiveDialCount >= this.options.maxParallelDials) {
if (this.currentActiveParallelDialCount >= this.options.maxParallelDials) {
this.pendingPeerDialQueue.push(peerId);
return;
}
@ -404,6 +407,7 @@ export class ConnectionManager
* 1. If the peer is already connected, don't dial
* 2. If the peer is not part of any of the configured pubsub topics, don't dial
* 3. If the peer is not dialable based on bootstrap status, don't dial
* 4. If the peer is already has an active dial attempt, or has been dialed before, don't dial it
* @returns true if the peer should be dialed, false otherwise
*/
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
@ -437,6 +441,14 @@ export class ConnectionManager
return false;
}
// If the peer is already already has an active dial attempt, or has been dialed before, don't dial it
if (this.dialAttemptsForPeer.has(peerId.toString())) {
log.warn(
`Peer ${peerId.toString()} has already been attempted dial before, or already has a dial attempt in progress, skipping dial`
);
return false;
}
return true;
}

View File

@ -1,3 +1,5 @@
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerInfo } from "@libp2p/interface/peer-info";
import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
@ -49,7 +51,13 @@ describe("ConnectionManager", function () {
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: await createSecp256k1PeerId() })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: peerIdBootstrap,
multiaddrs: [],
protocols: []
}
})
);
expect(await peerDiscoveryBootstrap).to.eq(true);
@ -77,7 +85,13 @@ describe("ConnectionManager", function () {
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer", { detail: peerIdPx })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: peerIdPx,
multiaddrs: [],
protocols: []
}
})
);
expect(await peerDiscoveryPeerExchange).to.eq(true);
@ -109,7 +123,7 @@ describe("ConnectionManager", function () {
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdBootstrap })
new CustomEvent<PeerId>("peer:connect", { detail: peerIdBootstrap })
);
expect(await peerConnectedBootstrap).to.eq(true);
@ -136,7 +150,7 @@ describe("ConnectionManager", function () {
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdPx })
new CustomEvent<PeerId>("peer:connect", { detail: peerIdPx })
);
expect(await peerConnectedPeerExchange).to.eq(true);
@ -182,27 +196,34 @@ describe("ConnectionManager", function () {
attemptDialSpy.restore();
});
it("should be called on all `peer:discovery` events", async function () {
it("should be called at least once on all `peer:discovery` events", async function () {
this.timeout(TEST_TIMEOUT);
const totalPeerIds = 5;
for (let i = 1; i <= totalPeerIds; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `peer-id-${i}` })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
}
// add delay to allow async function calls within attemptDial to finish
await delay(100);
expect(attemptDialSpy.callCount).to.equal(
expect(attemptDialSpy.callCount).to.be.greaterThanOrEqual(
totalPeerIds,
"attemptDial should be called once for each peer:discovery event"
"attemptDial should be called at least once for each peer:discovery event"
);
});
});
describe("dialPeer method", function () {
let peerStoreHasStub: SinonStub;
let dialAttemptsForPeerHasStub: SinonStub;
beforeEach(function () {
getConnectionsStub = sinon.stub(
(waku.connectionManager as any).libp2p,
@ -213,29 +234,44 @@ describe("ConnectionManager", function () {
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer");
peerStoreHasStub = sinon.stub(waku.libp2p.peerStore, "has");
dialAttemptsForPeerHasStub = sinon.stub(
(waku.connectionManager as any).dialAttemptsForPeer,
"has"
);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// simulate that the peer is not in the peerStore
peerStoreHasStub.returns(false);
// simulate that the peer has not been dialed before
dialAttemptsForPeerHasStub.returns(false);
});
afterEach(function () {
dialPeerStub.restore();
getTagNamesForPeerStub.restore();
getConnectionsStub.restore();
peerStoreHasStub.restore();
dialAttemptsForPeerHasStub.restore();
});
describe("For bootstrap peers", function () {
it("should be called for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
const bootstrapPeer = await createSecp256k1PeerId();
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: bootstrapPeer })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: { id: bootstrapPeer, multiaddrs: [], protocols: [] }
})
);
// wait for the async function calls within attemptDial to finish
@ -251,15 +287,15 @@ describe("ConnectionManager", function () {
it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit first peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
await delay(500);
@ -271,8 +307,12 @@ describe("ConnectionManager", function () {
for (let i = 1; i <= totalBootstrapPeers; i++) {
await delay(500);
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId()
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
}
@ -289,17 +329,17 @@ describe("ConnectionManager", function () {
it("should be called for peers with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
const pxPeer = await createSecp256k1PeerId();
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: pxPeer })
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: pxPeer,
multiaddrs: [],
protocols: []
}
})
);
// wait for the async function calls within attemptDial to finish
@ -315,18 +355,16 @@ describe("ConnectionManager", function () {
it("should be called for every peer with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
// emit multiple peer:discovery events
const totalPxPeers = 5;
for (let i = 0; i < totalPxPeers; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId()
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: await createSecp256k1PeerId(),
multiaddrs: [],
protocols: []
}
})
);
await delay(500);

View File

@ -1,20 +1,32 @@
import { CustomEvent } from "@libp2p/interface/events";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { PeerInfo } from "@libp2p/interface/peer-info";
import { multiaddr } from "@multiformats/multiaddr";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { Waku } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk";
import { expect } from "chai";
import Sinon, { SinonSpy, SinonStub } from "sinon";
import { NimGoNode, tearDownNodes } from "../src/index.js";
import {
delay,
makeLogFileName,
NimGoNode,
tearDownNodes
} from "../src/index.js";
describe("dials multiaddr", function () {
describe("multiaddr: dialing", function () {
let waku: Waku;
let nwaku: NimGoNode;
let dialPeerSpy: SinonSpy;
let isPeerTopicConfigured: SinonStub;
afterEach(async function () {
this.timeout(15000);
await tearDownNodes(nwaku, waku);
});
it("TLS", async function () {
it("can dial TLS multiaddrs", async function () {
this.timeout(20_000);
let tlsWorks = true;
@ -36,4 +48,55 @@ describe("dials multiaddr", function () {
expect(tlsWorks).to.eq(true);
});
describe("does not attempt the same peer discovered multiple times more than once", function () {
const PEER_DISCOVERY_COUNT = 3;
let peerId: PeerId;
let multiaddr: Multiaddr;
beforeEach(async function () {
this.timeout(10_000);
nwaku = new NimGoNode(makeLogFileName(this));
await nwaku.start();
waku = await createLightNode();
peerId = await nwaku.getPeerId();
multiaddr = await nwaku.getMultiaddrWithId();
isPeerTopicConfigured = Sinon.stub(
waku.connectionManager as any,
"isPeerTopicConfigured"
);
isPeerTopicConfigured.resolves(true);
dialPeerSpy = Sinon.spy(waku.connectionManager as any, "dialPeer");
});
afterEach(function () {
dialPeerSpy.restore();
});
it("through manual discovery", async function () {
this.timeout(20_000);
const discoverPeer = (): void => {
waku.libp2p.dispatchEvent(
new CustomEvent<PeerInfo>("peer:discovery", {
detail: {
id: peerId,
protocols: [],
multiaddrs: [multiaddr]
}
})
);
};
for (let i = 0; i < PEER_DISCOVERY_COUNT; i++) {
discoverPeer();
await delay(100);
}
expect(dialPeerSpy.callCount).to.eq(1);
});
});
});