chore: refactor `ConnectionManager` for readability & maintainability (#1658)

* restructure shouldDialPeer for readability & maintainability:

* fix return & add logs

* fix: check on dialPeer instead of attemptDial

* rm: console log
This commit is contained in:
Danish Arora 2023-10-13 18:08:37 +05:30 committed by GitHub
parent e6527e9ab1
commit b8d006bee7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 70 additions and 48 deletions

View File

@ -314,13 +314,13 @@ export class ConnectionManager
} }
private async attemptDial(peerId: PeerId): Promise<void> { private async attemptDial(peerId: PeerId): Promise<void> {
if (!(await this.shouldDialPeer(peerId))) return;
if (this.currentActiveDialCount >= this.options.maxParallelDials) { if (this.currentActiveDialCount >= this.options.maxParallelDials) {
this.pendingPeerDialQueue.push(peerId); this.pendingPeerDialQueue.push(peerId);
return; return;
} }
if (!(await this.shouldDialPeer(peerId))) return;
this.dialPeer(peerId).catch((err) => { this.dialPeer(peerId).catch((err) => {
log(`Error dialing peer ${peerId.toString()} : ${err}`); log(`Error dialing peer ${peerId.toString()} : ${err}`);
}); });
@ -331,34 +331,7 @@ export class ConnectionManager
void (async () => { void (async () => {
const { id: peerId } = evt.detail; const { id: peerId } = evt.detail;
if (!(await this.isPeerTopicConfigured(peerId))) { await this.dispatchDiscoveryEvent(peerId);
const shardInfo = await this.getPeerShardInfo(
peerId,
this.libp2p.peerStore
);
log(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubSubTopics
}).
Not dialing.`
);
return;
}
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);
this.dispatchEvent(
new CustomEvent<PeerId>(
isBootstrap
? EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP
: EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
{
detail: peerId
}
)
);
try { try {
await this.attemptDial(peerId); await this.attemptDial(peerId);
@ -420,16 +393,55 @@ export class ConnectionManager
} }
}; };
/**
* Checks if the peer should be dialed based on the following conditions:
* 1. If the peer is already connected, don't dial
* 2. If the peer is not part of any of the configured pubsub topics, don't dial
* 3. If the peer is not dialable based on bootstrap status, don't dial
* @returns true if the peer should be dialed, false otherwise
*/
private async shouldDialPeer(peerId: PeerId): Promise<boolean> {
// if we're already connected to the peer, don't dial
const isConnected = this.libp2p.getConnections(peerId).length > 0;
if (isConnected) {
log(`Already connected to peer ${peerId.toString()}. Not dialing.`);
return false;
}
// if the peer is not part of any of the configured pubsub topics, don't dial
if (!(await this.isPeerTopicConfigured(peerId))) {
const shardInfo = await this.getPeerShardInfo(
peerId,
this.libp2p.peerStore
);
log(
`Discovered peer ${peerId.toString()} with ShardInfo ${shardInfo} is not part of any of the configured pubsub topics (${
this.configuredPubSubTopics
}).
Not dialing.`
);
return false;
}
// if the peer is not dialable based on bootstrap status, don't dial
if (!(await this.isPeerDialableBasedOnBootstrapStatus(peerId))) {
log(
`Peer ${peerId.toString()} is not dialable based on bootstrap status. Not dialing.`
);
return false;
}
return true;
}
/** /**
* Checks if the peer is dialable based on the following conditions: * Checks if the peer is dialable based on the following conditions:
* 1. If the peer is a bootstrap peer, it is only dialable if the number of current bootstrap connections is less than the max allowed. * 1. If the peer is a bootstrap peer, it is only dialable if the number of current bootstrap connections is less than the max allowed.
* 2. If the peer is not a bootstrap peer * 2. If the peer is not a bootstrap peer
*/ */
private async shouldDialPeer(peerId: PeerId): Promise<boolean> { private async isPeerDialableBasedOnBootstrapStatus(
const isConnected = this.libp2p.getConnections(peerId).length > 0; peerId: PeerId
): Promise<boolean> {
if (isConnected) return false;
const tagNames = await this.getTagNamesForPeer(peerId); const tagNames = await this.getTagNamesForPeer(peerId);
const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP); const isBootstrap = tagNames.some((tagName) => tagName === Tags.BOOTSTRAP);
@ -449,6 +461,23 @@ export class ConnectionManager
return false; return false;
} }
private async dispatchDiscoveryEvent(peerId: PeerId): Promise<void> {
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);
this.dispatchEvent(
new CustomEvent<PeerId>(
isBootstrap
? EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP
: EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
{
detail: peerId
}
)
);
}
/** /**
* Fetches the tag names for a given peer * Fetches the tag names for a given peer
*/ */

View File

@ -20,7 +20,7 @@ describe("Static Sharding: Peer Management", function () {
let nwaku2: NimGoNode; let nwaku2: NimGoNode;
let nwaku3: NimGoNode; let nwaku3: NimGoNode;
let attemptDialSpy: SinonSpy; let dialPeerSpy: SinonSpy;
beforeEach(async function () { beforeEach(async function () {
this.timeout(15000); this.timeout(15000);
@ -32,7 +32,7 @@ describe("Static Sharding: Peer Management", function () {
afterEach(async function () { afterEach(async function () {
this.timeout(15000); this.timeout(15000);
await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); await tearDownNodes([nwaku1, nwaku2, nwaku3], waku);
attemptDialSpy && attemptDialSpy.restore(); dialPeerSpy && dialPeerSpy.restore();
}); });
it("all px service nodes subscribed to the shard topic should be dialed", async function () { it("all px service nodes subscribed to the shard topic should be dialed", async function () {
@ -80,10 +80,7 @@ describe("Static Sharding: Peer Management", function () {
await waku.start(); await waku.start();
attemptDialSpy = Sinon.spy( dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
(waku as any).connectionManager,
"attemptDial"
);
const pxPeersDiscovered = new Set<PeerId>(); const pxPeersDiscovered = new Set<PeerId>();
@ -105,7 +102,7 @@ describe("Static Sharding: Peer Management", function () {
await delay(1000); await delay(1000);
expect(attemptDialSpy.callCount).to.equal(3); expect(dialPeerSpy.callCount).to.equal(3);
}); });
it("px service nodes not subscribed to the shard should not be dialed", async function () { it("px service nodes not subscribed to the shard should not be dialed", async function () {
@ -151,10 +148,7 @@ describe("Static Sharding: Peer Management", function () {
} }
}); });
attemptDialSpy = Sinon.spy( dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer");
(waku as any).connectionManager,
"attemptDial"
);
await waku.start(); await waku.start();
@ -177,8 +171,7 @@ describe("Static Sharding: Peer Management", function () {
}); });
await delay(1000); await delay(1000);
expect(dialPeerSpy.callCount).to.equal(2);
expect(attemptDialSpy.callCount).to.equal(2);
}); });
}); });
}); });