feat: enable event emission for peer discovery/connection in ConnectionManager (#1438)

* disable libp2p autodial

* improve logs for peer-exchange

* add a function to fetch discovered and connected peers by discovery

* connection-manager: introduce event emissions by discovery

* write a spec test for events

* minor code improvement for peer-exchange

* rm: comment

* rename peer event result interface

* switch to using libp2p EventEmitter

* rename variables for readability

* reset peer-exchange spec file

* address review

* test: minor refactor

* fix: failing test

* increase peer IDs to test against for attemptDial

* improve structuring
This commit is contained in:
Danish Arora 2023-07-26 22:51:55 +05:30 committed by GitHub
parent 793abe7d22
commit 6ce898d771
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 393 additions and 126 deletions

View File

@ -1,6 +1,14 @@
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info"; import type { PeerInfo } from "@libp2p/interface-peer-info";
import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces"; import type { Peer } from "@libp2p/interface-peer-store";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import {
ConnectionManagerOptions,
EPeersByDiscoveryEvents,
IPeersByDiscoveryEvents,
IRelay,
PeersByDiscoveryResult,
} from "@waku/interfaces";
import { Libp2p, Tags } from "@waku/interfaces"; import { Libp2p, Tags } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
@ -12,7 +20,7 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1;
export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3;
export const DEFAULT_MAX_PARALLEL_DIALS = 3; export const DEFAULT_MAX_PARALLEL_DIALS = 3;
export class ConnectionManager { export class ConnectionManager extends EventEmitter<IPeersByDiscoveryEvents> {
private static instances = new Map<string, ConnectionManager>(); private static instances = new Map<string, ConnectionManager>();
private keepAliveManager: KeepAliveManager; private keepAliveManager: KeepAliveManager;
private options: ConnectionManagerOptions; private options: ConnectionManagerOptions;
@ -44,12 +52,57 @@ export class ConnectionManager {
return instance; return instance;
} }
public async getPeersByDiscovery(): Promise<PeersByDiscoveryResult> {
const peersDiscovered = await this.libp2p.peerStore.all();
const peersConnected = this.libp2p
.getConnections()
.map((conn) => conn.remotePeer);
const peersDiscoveredByBootstrap: Peer[] = [];
const peersDiscoveredByPeerExchange: Peer[] = [];
const peersConnectedByBootstrap: Peer[] = [];
const peersConnectedByPeerExchange: Peer[] = [];
for (const peer of peersDiscovered) {
const tags = await this.getTagNamesForPeer(peer.id);
if (tags.includes(Tags.BOOTSTRAP)) {
peersDiscoveredByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersDiscoveredByPeerExchange.push(peer);
}
}
for (const peerId of peersConnected) {
const peer = await this.libp2p.peerStore.get(peerId);
const tags = await this.getTagNamesForPeer(peerId);
if (tags.includes(Tags.BOOTSTRAP)) {
peersConnectedByBootstrap.push(peer);
} else if (tags.includes(Tags.PEER_EXCHANGE)) {
peersConnectedByPeerExchange.push(peer);
}
}
return {
DISCOVERED: {
[Tags.BOOTSTRAP]: peersDiscoveredByBootstrap,
[Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange,
},
CONNECTED: {
[Tags.BOOTSTRAP]: peersConnectedByBootstrap,
[Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange,
},
};
}
private constructor( private constructor(
libp2p: Libp2p, libp2p: Libp2p,
keepAliveOptions: KeepAliveOptions, keepAliveOptions: KeepAliveOptions,
relay?: IRelay, relay?: IRelay,
options?: Partial<ConnectionManagerOptions> options?: Partial<ConnectionManagerOptions>
) { ) {
super();
this.libp2p = libp2p; this.libp2p = libp2p;
this.options = { this.options = {
maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER,
@ -240,6 +293,30 @@ export class ConnectionManager {
void (async () => { void (async () => {
const { id: peerId } = evt.detail; const { id: peerId } = evt.detail;
const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes(
Tags.BOOTSTRAP
);
if (isBootstrap) {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
{
detail: peerId,
}
)
);
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
{
detail: peerId,
}
)
);
}
try { try {
await this.attemptDial(peerId); await this.attemptDial(peerId);
} catch (error) { } catch (error) {
@ -267,7 +344,25 @@ export class ConnectionManager {
bootstrapConnections.length > this.options.maxBootstrapPeersAllowed bootstrapConnections.length > this.options.maxBootstrapPeersAllowed
) { ) {
await this.dropConnection(peerId); await this.dropConnection(peerId);
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
{
detail: peerId,
}
)
);
} }
} else {
this.dispatchEvent(
new CustomEvent<PeerId>(
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
{
detail: peerId,
}
)
);
} }
})(); })();
}, },

View File

@ -1,3 +1,6 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
export enum Tags { export enum Tags {
BOOTSTRAP = "bootstrap", BOOTSTRAP = "bootstrap",
PEER_EXCHANGE = "peer-exchange", PEER_EXCHANGE = "peer-exchange",
@ -19,3 +22,28 @@ export interface ConnectionManagerOptions {
*/ */
maxParallelDials: number; maxParallelDials: number;
} }
export enum EPeersByDiscoveryEvents {
PEER_DISCOVERY_BOOTSTRAP = "peer:discovery:bootstrap",
PEER_DISCOVERY_PEER_EXCHANGE = "peer:discovery:peer-exchange",
PEER_CONNECT_BOOTSTRAP = "peer:connected:bootstrap",
PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange",
}
export interface IPeersByDiscoveryEvents {
[EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP]: CustomEvent<PeerId>;
[EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE]: CustomEvent<PeerId>;
}
export interface PeersByDiscoveryResult {
DISCOVERED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
};
CONNECTED: {
[Tags.BOOTSTRAP]: Peer[];
[Tags.PEER_EXCHANGE]: Peer[];
};
}

View File

@ -7,7 +7,7 @@ import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerInfo } from "@libp2p/interface-peer-info"; import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import type { Libp2pComponents } from "@waku/interfaces"; import { Libp2pComponents, Tags } from "@waku/interfaces";
import debug from "debug"; import debug from "debug";
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
@ -45,7 +45,7 @@ export interface Options {
maxRetries?: number; maxRetries?: number;
} }
export const DEFAULT_PEER_EXCHANGE_TAG_NAME = "peer-exchange"; export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE;
const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50; const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50;
const DEFAULT_PEER_EXCHANGE_TAG_TTL = 120000; const DEFAULT_PEER_EXCHANGE_TAG_TTL = 120000;
@ -134,6 +134,12 @@ export class PeerExchangeDiscovery
maxRetries = DEFAULT_MAX_RETRIES, maxRetries = DEFAULT_MAX_RETRIES,
} = this.options; } = this.options;
log(
`Querying peer: ${peerIdStr} (attempt ${
this.queryAttempts.get(peerIdStr) ?? 1
})`
);
await this.query(peerId); await this.query(peerId);
const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1; const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1;
@ -189,6 +195,8 @@ export class PeerExchangeDiscovery
}, },
}); });
log(`Discovered peer: ${peerId.toString()}`);
this.dispatchEvent( this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { new CustomEvent<PeerInfo>("peer", {
detail: { detail: {

View File

@ -1,6 +1,7 @@
import { CustomEvent } from "@libp2p/interfaces/events"; import { CustomEvent } from "@libp2p/interfaces/events";
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { ConnectionManager, KeepAliveOptions } from "@waku/core"; import { ConnectionManager, KeepAliveOptions } from "@waku/core";
import { LightNode, Tags } from "@waku/interfaces"; import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces";
import { createLightNode } from "@waku/sdk"; import { createLightNode } from "@waku/sdk";
import { expect } from "chai"; import { expect } from "chai";
import sinon, { SinonSpy, SinonStub } from "sinon"; import sinon, { SinonSpy, SinonStub } from "sinon";
@ -18,9 +19,6 @@ describe("ConnectionManager", function () {
let connectionManager: ConnectionManager | undefined; let connectionManager: ConnectionManager | undefined;
let waku: LightNode; let waku: LightNode;
let peerId: string; let peerId: string;
let getConnectionsStub: SinonStub;
let getTagNamesForPeerStub: SinonStub;
let dialPeerStub: SinonStub;
beforeEach(async function () { beforeEach(async function () {
waku = await createLightNode(); waku = await createLightNode();
@ -34,162 +32,300 @@ describe("ConnectionManager", function () {
afterEach(async () => { afterEach(async () => {
await waku.stop(); await waku.stop();
sinon.restore();
}); });
describe("attemptDial method", function () { describe("Events", () => {
let attemptDialSpy: SinonSpy; describe("peer:discovery", () => {
it("should emit `peer:discovery:bootstrap` event when a peer is discovered", async function () {
this.timeout(TEST_TIMEOUT);
beforeEach(function () { const peerIdBootstrap = await createSecp256k1PeerId();
attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial");
});
afterEach(function () { await waku.libp2p.peerStore.save(peerIdBootstrap, {
attemptDialSpy.restore(); tags: {
}); [Tags.BOOTSTRAP]: {
value: 50,
ttl: 1200000,
},
},
});
it("should be called on all `peer:discovery` events", async function () { const peerDiscoveryBootstrap = new Promise<boolean>((resolve) => {
this.timeout(TEST_TIMEOUT); connectionManager!.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
}
);
});
waku.libp2p.dispatchEvent(new CustomEvent("peer", { detail: peerId }));
expect(await peerDiscoveryBootstrap).to.eq(true);
});
it("should emit `peer:discovery:peer-exchange` event when a peer is discovered", async function () {
const peerIdPx = await createSecp256k1PeerId();
await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000,
},
},
});
const peerDiscoveryPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
}
);
});
const totalPeerIds = 1;
for (let i = 1; i <= totalPeerIds; i++) {
waku.libp2p.dispatchEvent( waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `peer-id-${i}` }) new CustomEvent("peer", { detail: peerIdPx })
); );
}
expect(attemptDialSpy.callCount).to.equal( expect(await peerDiscoveryPeerExchange).to.eq(true);
totalPeerIds, });
"attemptDial should be called once for each peer:discovery event" });
);
describe("peer:connect", () => {
it("should emit `peer:connected:bootstrap` event when a peer is connected", async function () {
this.timeout(TEST_TIMEOUT);
const peerIdBootstrap = await createSecp256k1PeerId();
await waku.libp2p.peerStore.save(peerIdBootstrap, {
tags: {
[Tags.BOOTSTRAP]: {
value: 50,
ttl: 1200000,
},
},
});
const peerConnectedBootstrap = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdBootstrap.toString());
}
);
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdBootstrap })
);
expect(await peerConnectedBootstrap).to.eq(true);
});
it("should emit `peer:connected:peer-exchange` event when a peer is connected", async function () {
const peerIdPx = await createSecp256k1PeerId();
await waku.libp2p.peerStore.save(peerIdPx, {
tags: {
[Tags.PEER_EXCHANGE]: {
value: 50,
ttl: 1200000,
},
},
});
const peerConnectedPeerExchange = new Promise<boolean>((resolve) => {
connectionManager!.addEventListener(
EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE,
({ detail: receivedPeerId }) => {
resolve(receivedPeerId.toString() === peerIdPx.toString());
}
);
});
waku.libp2p.dispatchEvent(
new CustomEvent("peer:connect", { detail: peerIdPx })
);
expect(await peerConnectedPeerExchange).to.eq(true);
});
}); });
}); });
describe("dialPeer method", function () { describe("Dials", () => {
beforeEach(function () { let dialPeerStub: SinonStub;
getConnectionsStub = sinon.stub( let getConnectionsStub: SinonStub;
(connectionManager as any).libp2p, let getTagNamesForPeerStub: SinonStub;
"getConnections"
); afterEach(() => {
getTagNamesForPeerStub = sinon.stub( sinon.restore();
connectionManager as any,
"getTagNamesForPeer"
);
dialPeerStub = sinon.stub(connectionManager as any, "dialPeer");
}); });
afterEach(function () { describe("attemptDial method", function () {
dialPeerStub.restore(); let attemptDialSpy: SinonSpy;
getTagNamesForPeerStub.restore();
getConnectionsStub.restore();
});
describe("For bootstrap peers", function () { beforeEach(function () {
it("should be called for bootstrap peers", async function () { attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial");
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
);
// wait for the async function calls within attemptDial to finish
await delay(DELAY_MS);
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for bootstrap peers"
);
}); });
it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { afterEach(function () {
attemptDialSpy.restore();
});
it("should be called on all `peer:discovery` events", async function () {
this.timeout(TEST_TIMEOUT); this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected const totalPeerIds = 5;
getConnectionsStub.returns([]); for (let i = 1; i <= totalPeerIds; i++) {
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit first peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
);
// simulate that the peer is connected
getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]);
// emit multiple peer:discovery events
const totalBootstrapPeers = 5;
for (let i = 1; i <= totalBootstrapPeers; i++) {
await delay(500);
waku.libp2p.dispatchEvent( waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { new CustomEvent("peer:discovery", { detail: `peer-id-${i}` })
detail: `bootstrap-peer-id-${i}`,
})
); );
} }
// check that dialPeer was called only once // add delay to allow async function calls within attemptDial to finish
expect(dialPeerStub.callCount).to.equal( await delay(100);
1,
"dialPeer should not be called more than once for bootstrap peers" expect(attemptDialSpy.callCount).to.equal(
totalPeerIds,
"attemptDial should be called once for each peer:discovery event"
); );
}); });
}); });
describe("For peer-exchange peers", function () { describe("dialPeer method", function () {
it("should be called for peers with PEER_EXCHANGE tags", async function () { beforeEach(function () {
this.timeout(TEST_TIMEOUT); getConnectionsStub = sinon.stub(
(connectionManager as any).libp2p,
// simulate that the peer is not connected "getConnections"
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "px-peer" })
); );
getTagNamesForPeerStub = sinon.stub(
// wait for the async function calls within attemptDial to finish connectionManager as any,
await delay(DELAY_MS); "getTagNamesForPeer"
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for peers with PEER_EXCHANGE tags"
); );
dialPeerStub = sinon.stub(connectionManager as any, "dialPeer");
}); });
it("should be called for every peer with PEER_EXCHANGE tags", async function () { afterEach(function () {
this.timeout(TEST_TIMEOUT); dialPeerStub.restore();
getTagNamesForPeerStub.restore();
getConnectionsStub.restore();
});
// simulate that the peer is not connected describe("For bootstrap peers", function () {
getConnectionsStub.returns([]); it("should be called for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer has a PEER_EXCHANGE tag // simulate that the peer is not connected
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); getConnectionsStub.returns([]);
// emit multiple peer:discovery events // simulate that the peer is a bootstrap peer
const totalPxPeers = 5; getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
for (let i = 0; i < totalPxPeers; i++) {
const bootstrapPeer = await createSecp256k1PeerId();
// emit a peer:discovery event
waku.libp2p.dispatchEvent( waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: `px-peer-id-${i}` }) new CustomEvent("peer:discovery", { detail: bootstrapPeer })
);
// wait for the async function calls within attemptDial to finish
await delay(DELAY_MS);
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for bootstrap peers"
);
});
it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer is a bootstrap peer
getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]);
// emit first peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: "bootstrap-peer" })
); );
await delay(500); await delay(500);
}
// check that dialPeer was called for each peer with PEER_EXCHANGE tags // simulate that the peer is connected
expect(dialPeerStub.callCount).to.equal(totalPxPeers); getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]);
// emit multiple peer:discovery events
const totalBootstrapPeers = 5;
for (let i = 1; i <= totalBootstrapPeers; i++) {
await delay(500);
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId(),
})
);
}
// check that dialPeer was called only once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should not be called more than once for bootstrap peers"
);
});
});
describe("For peer-exchange peers", function () {
it("should be called for peers with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
const pxPeer = await createSecp256k1PeerId();
// emit a peer:discovery event
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", { detail: pxPeer })
);
// wait for the async function calls within attemptDial to finish
await delay(DELAY_MS);
// check that dialPeer was called once
expect(dialPeerStub.callCount).to.equal(
1,
"dialPeer should be called for peers with PEER_EXCHANGE tags"
);
});
it("should be called for every peer with PEER_EXCHANGE tags", async function () {
this.timeout(TEST_TIMEOUT);
// simulate that the peer is not connected
getConnectionsStub.returns([]);
// simulate that the peer has a PEER_EXCHANGE tag
getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]);
// emit multiple peer:discovery events
const totalPxPeers = 5;
for (let i = 0; i < totalPxPeers; i++) {
waku.libp2p.dispatchEvent(
new CustomEvent("peer:discovery", {
detail: await createSecp256k1PeerId(),
})
);
await delay(500);
}
// check that dialPeer was called for each peer with PEER_EXCHANGE tags
expect(dialPeerStub.callCount).to.equal(totalPxPeers);
});
}); });
}); });
}); });