diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index e01f97f9a6..9bf978ab7b 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -1,6 +1,14 @@ import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerInfo } from "@libp2p/interface-peer-info"; -import type { ConnectionManagerOptions, IRelay } from "@waku/interfaces"; +import type { Peer } from "@libp2p/interface-peer-store"; +import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; +import { + ConnectionManagerOptions, + EPeersByDiscoveryEvents, + IPeersByDiscoveryEvents, + IRelay, + PeersByDiscoveryResult, +} from "@waku/interfaces"; import { Libp2p, Tags } from "@waku/interfaces"; import debug from "debug"; @@ -12,7 +20,7 @@ export const DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED = 1; export const DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER = 3; export const DEFAULT_MAX_PARALLEL_DIALS = 3; -export class ConnectionManager { +export class ConnectionManager extends EventEmitter { private static instances = new Map(); private keepAliveManager: KeepAliveManager; private options: ConnectionManagerOptions; @@ -44,12 +52,57 @@ export class ConnectionManager { return instance; } + public async getPeersByDiscovery(): Promise { + const peersDiscovered = await this.libp2p.peerStore.all(); + const peersConnected = this.libp2p + .getConnections() + .map((conn) => conn.remotePeer); + + const peersDiscoveredByBootstrap: Peer[] = []; + const peersDiscoveredByPeerExchange: Peer[] = []; + const peersConnectedByBootstrap: Peer[] = []; + const peersConnectedByPeerExchange: Peer[] = []; + + for (const peer of peersDiscovered) { + const tags = await this.getTagNamesForPeer(peer.id); + + if (tags.includes(Tags.BOOTSTRAP)) { + peersDiscoveredByBootstrap.push(peer); + } else if (tags.includes(Tags.PEER_EXCHANGE)) { + peersDiscoveredByPeerExchange.push(peer); + } + } + + for (const peerId of peersConnected) { + const peer = await this.libp2p.peerStore.get(peerId); + const tags = await this.getTagNamesForPeer(peerId); + + if (tags.includes(Tags.BOOTSTRAP)) { + peersConnectedByBootstrap.push(peer); + } else if (tags.includes(Tags.PEER_EXCHANGE)) { + peersConnectedByPeerExchange.push(peer); + } + } + + return { + DISCOVERED: { + [Tags.BOOTSTRAP]: peersDiscoveredByBootstrap, + [Tags.PEER_EXCHANGE]: peersDiscoveredByPeerExchange, + }, + CONNECTED: { + [Tags.BOOTSTRAP]: peersConnectedByBootstrap, + [Tags.PEER_EXCHANGE]: peersConnectedByPeerExchange, + }, + }; + } + private constructor( libp2p: Libp2p, keepAliveOptions: KeepAliveOptions, relay?: IRelay, options?: Partial ) { + super(); this.libp2p = libp2p; this.options = { maxDialAttemptsForPeer: DEFAULT_MAX_DIAL_ATTEMPTS_FOR_PEER, @@ -240,6 +293,30 @@ export class ConnectionManager { void (async () => { const { id: peerId } = evt.detail; + const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( + Tags.BOOTSTRAP + ); + + if (isBootstrap) { + this.dispatchEvent( + new CustomEvent( + EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP, + { + detail: peerId, + } + ) + ); + } else { + this.dispatchEvent( + new CustomEvent( + EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, + { + detail: peerId, + } + ) + ); + } + try { await this.attemptDial(peerId); } catch (error) { @@ -267,7 +344,25 @@ export class ConnectionManager { bootstrapConnections.length > this.options.maxBootstrapPeersAllowed ) { await this.dropConnection(peerId); + } else { + this.dispatchEvent( + new CustomEvent( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + { + detail: peerId, + } + ) + ); } + } else { + this.dispatchEvent( + new CustomEvent( + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, + { + detail: peerId, + } + ) + ); } })(); }, diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index 7424621cd4..8814aefba6 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,3 +1,6 @@ +import type { PeerId } from "@libp2p/interface-peer-id"; +import type { Peer } from "@libp2p/interface-peer-store"; + export enum Tags { BOOTSTRAP = "bootstrap", PEER_EXCHANGE = "peer-exchange", @@ -19,3 +22,28 @@ export interface ConnectionManagerOptions { */ maxParallelDials: number; } + +export enum EPeersByDiscoveryEvents { + PEER_DISCOVERY_BOOTSTRAP = "peer:discovery:bootstrap", + PEER_DISCOVERY_PEER_EXCHANGE = "peer:discovery:peer-exchange", + PEER_CONNECT_BOOTSTRAP = "peer:connected:bootstrap", + PEER_CONNECT_PEER_EXCHANGE = "peer:connected:peer-exchange", +} + +export interface IPeersByDiscoveryEvents { + [EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP]: CustomEvent; + [EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE]: CustomEvent; + [EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP]: CustomEvent; + [EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE]: CustomEvent; +} + +export interface PeersByDiscoveryResult { + DISCOVERED: { + [Tags.BOOTSTRAP]: Peer[]; + [Tags.PEER_EXCHANGE]: Peer[]; + }; + CONNECTED: { + [Tags.BOOTSTRAP]: Peer[]; + [Tags.PEER_EXCHANGE]: Peer[]; + }; +} diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index 9dafba5ae7..1c14f6360f 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -7,7 +7,7 @@ import { peerDiscovery as symbol } from "@libp2p/interface-peer-discovery"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerInfo } from "@libp2p/interface-peer-info"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; -import type { Libp2pComponents } from "@waku/interfaces"; +import { Libp2pComponents, Tags } from "@waku/interfaces"; import debug from "debug"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; @@ -45,7 +45,7 @@ export interface Options { maxRetries?: number; } -export const DEFAULT_PEER_EXCHANGE_TAG_NAME = "peer-exchange"; +export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE; const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50; const DEFAULT_PEER_EXCHANGE_TAG_TTL = 120000; @@ -134,6 +134,12 @@ export class PeerExchangeDiscovery maxRetries = DEFAULT_MAX_RETRIES, } = this.options; + log( + `Querying peer: ${peerIdStr} (attempt ${ + this.queryAttempts.get(peerIdStr) ?? 1 + })` + ); + await this.query(peerId); const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1; @@ -189,6 +195,8 @@ export class PeerExchangeDiscovery }, }); + log(`Discovered peer: ${peerId.toString()}`); + this.dispatchEvent( new CustomEvent("peer", { detail: { diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts index b29c22efbb..d00a61aa60 100644 --- a/packages/tests/tests/connection_manager.spec.ts +++ b/packages/tests/tests/connection_manager.spec.ts @@ -1,6 +1,7 @@ import { CustomEvent } from "@libp2p/interfaces/events"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { ConnectionManager, KeepAliveOptions } from "@waku/core"; -import { LightNode, Tags } from "@waku/interfaces"; +import { EPeersByDiscoveryEvents, LightNode, Tags } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; import sinon, { SinonSpy, SinonStub } from "sinon"; @@ -18,9 +19,6 @@ describe("ConnectionManager", function () { let connectionManager: ConnectionManager | undefined; let waku: LightNode; let peerId: string; - let getConnectionsStub: SinonStub; - let getTagNamesForPeerStub: SinonStub; - let dialPeerStub: SinonStub; beforeEach(async function () { waku = await createLightNode(); @@ -34,162 +32,300 @@ describe("ConnectionManager", function () { afterEach(async () => { await waku.stop(); - sinon.restore(); }); - describe("attemptDial method", function () { - let attemptDialSpy: SinonSpy; + describe("Events", () => { + describe("peer:discovery", () => { + it("should emit `peer:discovery:bootstrap` event when a peer is discovered", async function () { + this.timeout(TEST_TIMEOUT); - beforeEach(function () { - attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial"); - }); + const peerIdBootstrap = await createSecp256k1PeerId(); - afterEach(function () { - attemptDialSpy.restore(); - }); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000, + }, + }, + }); - it("should be called on all `peer:discovery` events", async function () { - this.timeout(TEST_TIMEOUT); + const peerDiscoveryBootstrap = new Promise((resolve) => { + connectionManager!.addEventListener( + EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent(new CustomEvent("peer", { detail: peerId })); + + expect(await peerDiscoveryBootstrap).to.eq(true); + }); + + it("should emit `peer:discovery:peer-exchange` event when a peer is discovered", async function () { + const peerIdPx = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000, + }, + }, + }); + + const peerDiscoveryPeerExchange = new Promise((resolve) => { + connectionManager!.addEventListener( + EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdPx.toString()); + } + ); + }); - const totalPeerIds = 1; - for (let i = 1; i <= totalPeerIds; i++) { waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: `peer-id-${i}` }) + new CustomEvent("peer", { detail: peerIdPx }) ); - } - expect(attemptDialSpy.callCount).to.equal( - totalPeerIds, - "attemptDial should be called once for each peer:discovery event" - ); + expect(await peerDiscoveryPeerExchange).to.eq(true); + }); + }); + + describe("peer:connect", () => { + it("should emit `peer:connected:bootstrap` event when a peer is connected", async function () { + this.timeout(TEST_TIMEOUT); + + const peerIdBootstrap = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000, + }, + }, + }); + + const peerConnectedBootstrap = new Promise((resolve) => { + connectionManager!.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + + expect(await peerConnectedBootstrap).to.eq(true); + }); + it("should emit `peer:connected:peer-exchange` event when a peer is connected", async function () { + const peerIdPx = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000, + }, + }, + }); + + const peerConnectedPeerExchange = new Promise((resolve) => { + connectionManager!.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdPx.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + + expect(await peerConnectedPeerExchange).to.eq(true); + }); }); }); - describe("dialPeer method", function () { - beforeEach(function () { - getConnectionsStub = sinon.stub( - (connectionManager as any).libp2p, - "getConnections" - ); - getTagNamesForPeerStub = sinon.stub( - connectionManager as any, - "getTagNamesForPeer" - ); - dialPeerStub = sinon.stub(connectionManager as any, "dialPeer"); + describe("Dials", () => { + let dialPeerStub: SinonStub; + let getConnectionsStub: SinonStub; + let getTagNamesForPeerStub: SinonStub; + + afterEach(() => { + sinon.restore(); }); - afterEach(function () { - dialPeerStub.restore(); - getTagNamesForPeerStub.restore(); - getConnectionsStub.restore(); - }); + describe("attemptDial method", function () { + let attemptDialSpy: SinonSpy; - describe("For bootstrap peers", function () { - it("should be called for bootstrap peers", async function () { - this.timeout(TEST_TIMEOUT); - - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer is a bootstrap peer - getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); - - // emit a peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: "bootstrap-peer" }) - ); - - // wait for the async function calls within attemptDial to finish - await delay(DELAY_MS); - - // check that dialPeer was called once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should be called for bootstrap peers" - ); + beforeEach(function () { + attemptDialSpy = sinon.spy(connectionManager as any, "attemptDial"); }); - it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { + afterEach(function () { + attemptDialSpy.restore(); + }); + + it("should be called on all `peer:discovery` events", async function () { this.timeout(TEST_TIMEOUT); - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer is a bootstrap peer - getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); - - // emit first peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: "bootstrap-peer" }) - ); - - // simulate that the peer is connected - getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]); - - // emit multiple peer:discovery events - const totalBootstrapPeers = 5; - for (let i = 1; i <= totalBootstrapPeers; i++) { - await delay(500); + const totalPeerIds = 5; + for (let i = 1; i <= totalPeerIds; i++) { waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: `bootstrap-peer-id-${i}`, - }) + new CustomEvent("peer:discovery", { detail: `peer-id-${i}` }) ); } - // check that dialPeer was called only once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should not be called more than once for bootstrap peers" + // add delay to allow async function calls within attemptDial to finish + await delay(100); + + expect(attemptDialSpy.callCount).to.equal( + totalPeerIds, + "attemptDial should be called once for each peer:discovery event" ); }); }); - describe("For peer-exchange peers", function () { - it("should be called for peers with PEER_EXCHANGE tags", async function () { - this.timeout(TEST_TIMEOUT); - - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer has a PEER_EXCHANGE tag - getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); - - // emit a peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: "px-peer" }) + describe("dialPeer method", function () { + beforeEach(function () { + getConnectionsStub = sinon.stub( + (connectionManager as any).libp2p, + "getConnections" ); - - // wait for the async function calls within attemptDial to finish - await delay(DELAY_MS); - - // check that dialPeer was called once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should be called for peers with PEER_EXCHANGE tags" + getTagNamesForPeerStub = sinon.stub( + connectionManager as any, + "getTagNamesForPeer" ); + dialPeerStub = sinon.stub(connectionManager as any, "dialPeer"); }); - it("should be called for every peer with PEER_EXCHANGE tags", async function () { - this.timeout(TEST_TIMEOUT); + afterEach(function () { + dialPeerStub.restore(); + getTagNamesForPeerStub.restore(); + getConnectionsStub.restore(); + }); - // simulate that the peer is not connected - getConnectionsStub.returns([]); + describe("For bootstrap peers", function () { + it("should be called for bootstrap peers", async function () { + this.timeout(TEST_TIMEOUT); - // simulate that the peer has a PEER_EXCHANGE tag - getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); + // simulate that the peer is not connected + getConnectionsStub.returns([]); - // emit multiple peer:discovery events - const totalPxPeers = 5; - for (let i = 0; i < totalPxPeers; i++) { + // simulate that the peer is a bootstrap peer + getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); + + const bootstrapPeer = await createSecp256k1PeerId(); + + // emit a peer:discovery event waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { detail: `px-peer-id-${i}` }) + new CustomEvent("peer:discovery", { detail: bootstrapPeer }) + ); + + // wait for the async function calls within attemptDial to finish + await delay(DELAY_MS); + + // check that dialPeer was called once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should be called for bootstrap peers" + ); + }); + + it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { + this.timeout(TEST_TIMEOUT); + + // simulate that the peer is not connected + getConnectionsStub.returns([]); + + // simulate that the peer is a bootstrap peer + getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); + + // emit first peer:discovery event + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { detail: "bootstrap-peer" }) ); await delay(500); - } - // check that dialPeer was called for each peer with PEER_EXCHANGE tags - expect(dialPeerStub.callCount).to.equal(totalPxPeers); + // simulate that the peer is connected + getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]); + + // emit multiple peer:discovery events + const totalBootstrapPeers = 5; + for (let i = 1; i <= totalBootstrapPeers; i++) { + await delay(500); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: await createSecp256k1PeerId(), + }) + ); + } + + // check that dialPeer was called only once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should not be called more than once for bootstrap peers" + ); + }); + }); + + describe("For peer-exchange peers", function () { + it("should be called for peers with PEER_EXCHANGE tags", async function () { + this.timeout(TEST_TIMEOUT); + + // simulate that the peer is not connected + getConnectionsStub.returns([]); + + // simulate that the peer has a PEER_EXCHANGE tag + getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); + + const pxPeer = await createSecp256k1PeerId(); + + // emit a peer:discovery event + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { detail: pxPeer }) + ); + + // wait for the async function calls within attemptDial to finish + await delay(DELAY_MS); + + // check that dialPeer was called once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should be called for peers with PEER_EXCHANGE tags" + ); + }); + + it("should be called for every peer with PEER_EXCHANGE tags", async function () { + this.timeout(TEST_TIMEOUT); + + // simulate that the peer is not connected + getConnectionsStub.returns([]); + + // simulate that the peer has a PEER_EXCHANGE tag + getTagNamesForPeerStub.resolves([Tags.PEER_EXCHANGE]); + + // emit multiple peer:discovery events + const totalPxPeers = 5; + for (let i = 0; i < totalPxPeers; i++) { + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: await createSecp256k1PeerId(), + }) + ); + await delay(500); + } + + // check that dialPeer was called for each peer with PEER_EXCHANGE tags + expect(dialPeerStub.callCount).to.equal(totalPxPeers); + }); }); }); });