diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 1401a967be..7d8e3d2845 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -314,13 +314,13 @@ export class ConnectionManager } private async attemptDial(peerId: PeerId): Promise { + if (!(await this.shouldDialPeer(peerId))) return; + if (this.currentActiveDialCount >= this.options.maxParallelDials) { this.pendingPeerDialQueue.push(peerId); return; } - if (!(await this.shouldDialPeer(peerId))) return; - this.dialPeer(peerId).catch((err) => { log(`Error dialing peer ${peerId.toString()} : ${err}`); }); @@ -331,34 +331,7 @@ export class ConnectionManager void (async () => { const { id: peerId } = evt.detail; - if (!(await this.isPeerTopicConfigured(peerId))) { - const shardInfo = await this.getPeerShardInfo( - peerId, - this.libp2p.peerStore - ); - log( - `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ - this.configuredPubSubTopics - }). - Not dialing.` - ); - return; - } - - const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( - Tags.BOOTSTRAP - ); - - this.dispatchEvent( - new CustomEvent( - isBootstrap - ? EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP - : EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, - { - detail: peerId - } - ) - ); + await this.dispatchDiscoveryEvent(peerId); try { await this.attemptDial(peerId); @@ -420,16 +393,55 @@ export class ConnectionManager } }; + /** + * Checks if the peer should be dialed based on the following conditions: + * 1. If the peer is already connected, don't dial + * 2. If the peer is not part of any of the configured pubsub topics, don't dial + * 3. If the peer is not dialable based on bootstrap status, don't dial + * @returns true if the peer should be dialed, false otherwise + */ + private async shouldDialPeer(peerId: PeerId): Promise { + // if we're already connected to the peer, don't dial + const isConnected = this.libp2p.getConnections(peerId).length > 0; + if (isConnected) { + log(`Already connected to peer ${peerId.toString()}. Not dialing.`); + return false; + } + + // if the peer is not part of any of the configured pubsub topics, don't dial + if (!(await this.isPeerTopicConfigured(peerId))) { + const shardInfo = await this.getPeerShardInfo( + peerId, + this.libp2p.peerStore + ); + log( + `Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${ + this.configuredPubSubTopics + }). + Not dialing.` + ); + return false; + } + + // if the peer is not dialable based on bootstrap status, don't dial + if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) { + log( + `Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.` + ); + return false; + } + + return true; + } + /** * Checks if the peer is dialable based on the following conditions: * 1. If the peer is a bootstrap peer, it is only dialable if the number of current bootstrap connections is less than the max allowed. * 2. If the peer is not a bootstrap peer */ - private async shouldDialPeer(peerId: PeerId): Promise { - const isConnected = this.libp2p.getConnections(peerId).length > 0; - - if (isConnected) return false; - + private async isPeerDialableBasedOnBootstrapStatus( + peerId: PeerId + ): Promise { const tagNames = await this.getTagNamesForPeer(peerId); const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP); @@ -449,6 +461,23 @@ export class ConnectionManager return false; } + private async dispatchDiscoveryEvent(peerId: PeerId): Promise { + const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( + Tags.BOOTSTRAP + ); + + this.dispatchEvent( + new CustomEvent( + isBootstrap + ? EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP + : EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, + { + detail: peerId + } + ) + ); + } + /** * Fetches the tag names for a given peer */ diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index 36152fe3c7..be4c7a5210 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -20,7 +20,7 @@ describe("Static Sharding: Peer Management", function () { let nwaku2: NimGoNode; let nwaku3: NimGoNode; - let attemptDialSpy: SinonSpy; + let dialPeerSpy: SinonSpy; beforeEach(async function () { this.timeout(15000); @@ -32,7 +32,7 @@ describe("Static Sharding: Peer Management", function () { afterEach(async function () { this.timeout(15000); await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); - attemptDialSpy && attemptDialSpy.restore(); + dialPeerSpy && dialPeerSpy.restore(); }); it("all px service nodes subscribed to the shard topic should be dialed", async function () { @@ -80,10 +80,7 @@ describe("Static Sharding: Peer Management", function () { await waku.start(); - attemptDialSpy = Sinon.spy( - (waku as any).connectionManager, - "attemptDial" - ); + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); const pxPeersDiscovered = new Set(); @@ -105,7 +102,7 @@ describe("Static Sharding: Peer Management", function () { await delay(1000); - expect(attemptDialSpy.callCount).to.equal(3); + expect(dialPeerSpy.callCount).to.equal(3); }); it("px service nodes not subscribed to the shard should not be dialed", async function () { @@ -151,10 +148,7 @@ describe("Static Sharding: Peer Management", function () { } }); - attemptDialSpy = Sinon.spy( - (waku as any).connectionManager, - "attemptDial" - ); + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); await waku.start(); @@ -177,8 +171,7 @@ describe("Static Sharding: Peer Management", function () { }); await delay(1000); - - expect(attemptDialSpy.callCount).to.equal(2); + expect(dialPeerSpy.callCount).to.equal(2); }); }); });