From 928e634677b9aa2722f16d14762c12d0627fcf0d Mon Sep 17 00:00:00 2001 From: Florin Barbu Date: Sat, 10 Feb 2024 11:57:01 +0200 Subject: [PATCH] test: connection manager (#1836) * connection management tests * remove stric checks in test dev tsconfig * fix after ci run * small fix * fix conflict * fixes after ci run * debug ci failure * revert debug test --- .../connection_state.spec.ts | 174 ++++++ .../tests/connection-mananger/dials.spec.ts | 215 +++++++ .../tests/connection-mananger/events.spec.ts | 254 ++++++++ .../tests/connection-mananger/methods.spec.ts | 324 ++++++++++ .../tests/tests/connection_manager.spec.ts | 587 ------------------ 5 files changed, 967 insertions(+), 587 deletions(-) create mode 100644 packages/tests/tests/connection-mananger/connection_state.spec.ts create mode 100644 packages/tests/tests/connection-mananger/dials.spec.ts create mode 100644 packages/tests/tests/connection-mananger/events.spec.ts create mode 100644 packages/tests/tests/connection-mananger/methods.spec.ts delete mode 100644 packages/tests/tests/connection_manager.spec.ts diff --git a/packages/tests/tests/connection-mananger/connection_state.spec.ts b/packages/tests/tests/connection-mananger/connection_state.spec.ts new file mode 100644 index 0000000000..aa0eb55d93 --- /dev/null +++ b/packages/tests/tests/connection-mananger/connection_state.spec.ts @@ -0,0 +1,174 @@ +import { Multiaddr } from "@multiformats/multiaddr"; +import { EConnectionStateEvents, LightNode, Protocols } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { createRelayNode } from "@waku/sdk/relay"; +import { expect } from "chai"; + +import { delay, NOISE_KEY_1 } from "../../src/index.js"; +import { + makeLogFileName, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const TEST_TIMEOUT = 30_000; + +describe("Connection state", function () { + this.timeout(TEST_TIMEOUT); + let waku: LightNode; + + let nwaku1: ServiceNode; + let nwaku2: ServiceNode; + let nwaku1PeerId: Multiaddr; + let nwaku2PeerId: Multiaddr; + + beforeEach(async () => { + this.timeout(TEST_TIMEOUT); + waku = await createLightNode({ shardInfo: { shards: [0] } }); + nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); + nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); + await nwaku1.start({ filter: true }); + await nwaku2.start({ filter: true }); + nwaku1PeerId = await nwaku1.getMultiaddrWithId(); + nwaku2PeerId = await nwaku2.getMultiaddrWithId(); + }); + + afterEach(async () => { + this.timeout(TEST_TIMEOUT); + await tearDownNodes([nwaku1, nwaku2], waku); + }); + + it("should emit `waku:online` event only when first peer is connected", async function () { + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await delay(400); + expect(await connectionStatus).to.eq(true); + expect(eventCount).to.be.eq(1); + + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + await delay(400); + expect(eventCount).to.be.eq(1); + }); + + it("should emit `waku:offline` event only when all peers disconnect", async function () { + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + await nwaku1.stop(); + await delay(400); + expect(eventCount).to.be.eq(0); + + await nwaku2.stop(); + expect(await connectionStatus).to.eq(false); + expect(eventCount).to.be.eq(1); + }); + + it("`waku:online` bwtween 2 js-waku relay nodes", async function () { + const waku1 = await createRelayNode({ + staticNoiseKey: NOISE_KEY_1 + }); + const waku2 = await createRelayNode({ + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }); + + let eventCount1 = 0; + const connectionStatus1 = new Promise((resolve) => { + waku1.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount1++; + resolve(status); + } + ); + }); + + let eventCount2 = 0; + const connectionStatus2 = new Promise((resolve) => { + waku2.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount2++; + resolve(status); + } + ); + }); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([waku1.dial(waku2.libp2p.peerId)]); + await delay(400); + + expect(await connectionStatus1).to.eq(true); + expect(await connectionStatus2).to.eq(true); + expect(eventCount1).to.be.eq(1); + expect(eventCount2).to.be.eq(1); + }); + + it("isConnected should return true after first peer connects", async function () { + expect(waku.isConnected()).to.be.false; + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await delay(400); + expect(waku.isConnected()).to.be.true; + }); + + it("isConnected should return false after all peers disconnect", async function () { + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await waku.dial(nwaku2PeerId, [Protocols.Filter]); + await delay(250); + expect(waku.isConnected()).to.be.true; + + await waku.libp2p.hangUp(nwaku1PeerId); + expect(waku.isConnected()).to.be.true; + await waku.libp2p.hangUp(nwaku2PeerId); + expect(waku.isConnected()).to.be.false; + }); + + it("isConnected return false after peer stops", async function () { + expect(waku.isConnected()).to.be.false; + await waku.dial(nwaku1PeerId, [Protocols.Filter]); + await delay(400); + expect(waku.isConnected()).to.be.true; + + await nwaku1.stop(); + await delay(400); + expect(waku.isConnected()).to.be.false; + }); + + it("isConnected bwtween 2 js-waku relay nodes", async function () { + const waku1 = await createRelayNode({ + staticNoiseKey: NOISE_KEY_1 + }); + const waku2 = await createRelayNode({ + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }); + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([waku1.dial(waku2.libp2p.peerId)]); + await delay(400); + expect(waku1.isConnected()).to.be.true; + expect(waku2.isConnected()).to.be.true; + }); +}); diff --git a/packages/tests/tests/connection-mananger/dials.spec.ts b/packages/tests/tests/connection-mananger/dials.spec.ts new file mode 100644 index 0000000000..4d0b8f8f17 --- /dev/null +++ b/packages/tests/tests/connection-mananger/dials.spec.ts @@ -0,0 +1,215 @@ +import type { PeerInfo } from "@libp2p/interface"; +import { CustomEvent } from "@libp2p/interface"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { LightNode, Tags } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; +import sinon, { SinonSpy, SinonStub } from "sinon"; + +import { delay } from "../../src/index.js"; +import { tearDownNodes } from "../../src/index.js"; + +const DELAY_MS = 1_000; +const TEST_TIMEOUT = 20_000; + +describe("Dials", function () { + this.timeout(TEST_TIMEOUT); + let dialPeerStub: SinonStub; + let getConnectionsStub: SinonStub; + let getTagNamesForPeerStub: SinonStub; + let isPeerTopicConfigured: SinonStub; + let waku: LightNode; + + this.beforeEach(async function () { + this.timeout(TEST_TIMEOUT); + waku = await createLightNode({ shardInfo: { shards: [0] } }); + isPeerTopicConfigured = sinon.stub( + waku.connectionManager as any, + "isPeerTopicConfigured" + ); + isPeerTopicConfigured.resolves(true); + }); + + afterEach(async () => { + this.timeout(TEST_TIMEOUT); + await tearDownNodes([], waku); + isPeerTopicConfigured.restore(); + sinon.restore(); + }); + + describe("attemptDial method", function () { + let attemptDialSpy: SinonSpy; + + beforeEach(function () { + attemptDialSpy = sinon.spy(waku.connectionManager as any, "attemptDial"); + }); + + afterEach(function () { + attemptDialSpy.restore(); + }); + + it("should be called at least once on all `peer:discovery` events", async function () { + const totalPeerIds = 5; + for (let i = 1; i <= totalPeerIds; i++) { + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [] + } + }) + ); + } + + await delay(100); + + expect(attemptDialSpy.callCount).to.be.greaterThanOrEqual( + totalPeerIds, + "attemptDial should be called at least once for each peer:discovery event" + ); + }); + }); + + describe("dialPeer method", function () { + let peerStoreHasStub: SinonStub; + let dialAttemptsForPeerHasStub: SinonStub; + beforeEach(function () { + getConnectionsStub = sinon.stub( + (waku.connectionManager as any).libp2p, + "getConnections" + ); + getTagNamesForPeerStub = sinon.stub( + waku.connectionManager as any, + "getTagNamesForPeer" + ); + dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer"); + peerStoreHasStub = sinon.stub(waku.libp2p.peerStore, "has"); + dialAttemptsForPeerHasStub = sinon.stub( + (waku.connectionManager as any).dialAttemptsForPeer, + "has" + ); + + // simulate that the peer is not connected + getConnectionsStub.returns([]); + + // simulate that the peer is a bootstrap peer + getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); + + // simulate that the peer is not in the peerStore + peerStoreHasStub.returns(false); + + // simulate that the peer has not been dialed before + dialAttemptsForPeerHasStub.returns(false); + }); + + afterEach(function () { + dialPeerStub.restore(); + getTagNamesForPeerStub.restore(); + getConnectionsStub.restore(); + peerStoreHasStub.restore(); + dialAttemptsForPeerHasStub.restore(); + }); + + describe("For bootstrap peers", function () { + it("should be called for bootstrap peers", async function () { + const bootstrapPeer = await createSecp256k1PeerId(); + + // emit a peer:discovery event + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { id: bootstrapPeer, multiaddrs: [] } + }) + ); + + // wait for the async function calls within attemptDial to finish + await delay(DELAY_MS); + + // check that dialPeer was called once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should be called for bootstrap peers" + ); + }); + + it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { + // emit first peer:discovery event + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [] + } + }) + ); + await delay(500); + + // simulate that the peer is connected + getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]); + + // emit multiple peer:discovery events + const totalBootstrapPeers = 5; + for (let i = 1; i <= totalBootstrapPeers; i++) { + await delay(500); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [] + } + }) + ); + } + + // check that dialPeer was called only once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should not be called more than once for bootstrap peers" + ); + }); + }); + + describe("For peer-exchange peers", function () { + it("should be called for peers with PEER_EXCHANGE tags", async function () { + const pxPeer = await createSecp256k1PeerId(); + + // emit a peer:discovery event + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: pxPeer, + multiaddrs: [] + } + }) + ); + + // wait for the async function calls within attemptDial to finish + await delay(DELAY_MS); + + // check that dialPeer was called once + expect(dialPeerStub.callCount).to.equal( + 1, + "dialPeer should be called for peers with PEER_EXCHANGE tags" + ); + }); + + it("should be called for every peer with PEER_EXCHANGE tags", async function () { + // emit multiple peer:discovery events + const totalPxPeers = 5; + for (let i = 0; i < totalPxPeers; i++) { + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: await createSecp256k1PeerId(), + multiaddrs: [] + } + }) + ); + await delay(500); + } + + // check that dialPeer was called for each peer with PEER_EXCHANGE tags + expect(dialPeerStub.callCount).to.equal(totalPxPeers); + }); + }); + }); +}); diff --git a/packages/tests/tests/connection-mananger/events.spec.ts b/packages/tests/tests/connection-mananger/events.spec.ts new file mode 100644 index 0000000000..dec186cadb --- /dev/null +++ b/packages/tests/tests/connection-mananger/events.spec.ts @@ -0,0 +1,254 @@ +import type { PeerId, PeerInfo } from "@libp2p/interface"; +import { CustomEvent } from "@libp2p/interface"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { + EConnectionStateEvents, + EPeersByDiscoveryEvents, + LightNode, + Tags +} from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { delay } from "../../src/index.js"; +import { tearDownNodes } from "../../src/index.js"; + +const TEST_TIMEOUT = 20_000; + +describe("Events", function () { + let waku: LightNode; + this.timeout(TEST_TIMEOUT); + beforeEach(async function () { + this.timeout(TEST_TIMEOUT); + waku = await createLightNode({ shardInfo: { shards: [0] } }); + }); + + afterEach(async () => { + this.timeout(TEST_TIMEOUT); + await tearDownNodes([], waku); + }); + + describe("peer:discovery", () => { + it("should emit `peer:discovery:bootstrap` event when a peer is discovered", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + + const peerDiscoveryBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: peerIdBootstrap, + multiaddrs: [] + } + }) + ); + + expect(await peerDiscoveryBootstrap).to.eq(true); + }); + + it("should emit `peer:discovery:peer-exchange` event when a peer is discovered", async function () { + const peerIdPx = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + const peerDiscoveryPeerExchange = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdPx.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:discovery", { + detail: { + id: peerIdPx, + multiaddrs: [] + } + }) + ); + + expect(await peerDiscoveryPeerExchange).to.eq(true); + }); + }); + + describe("peer:connect", () => { + it("should emit `peer:connected:bootstrap` event when a peer is connected", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + + expect(await peerConnectedBootstrap).to.eq(true); + }); + it("should emit `peer:connected:peer-exchange` event when a peer is connected", async function () { + const peerIdPx = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + const peerConnectedPeerExchange = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdPx.toString()); + } + ); + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + + expect(await peerConnectedPeerExchange).to.eq(true); + }); + }); + + describe("peer:disconnect", () => { + it("should emit `waku:offline` event when all peers disconnect", async function () { + const peerIdPx = await createSecp256k1PeerId(); + const peerIdPx2 = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + await waku.libp2p.peerStore.save(peerIdPx2, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx2 }) + ); + + await delay(100); + + let eventCount = 0; + const connectionStatus = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EConnectionStateEvents.CONNECTION_STATUS, + ({ detail: status }) => { + eventCount++; + resolve(status); + } + ); + }); + + expect(waku.isConnected()).to.be.true; + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) + ); + + expect(await connectionStatus).to.eq(false); + expect(eventCount).to.be.eq(1); + }); + it("isConnected should return false after all peers disconnect", async function () { + const peerIdPx = await createSecp256k1PeerId(); + const peerIdPx2 = await createSecp256k1PeerId(); + + await waku.libp2p.peerStore.save(peerIdPx, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + await waku.libp2p.peerStore.save(peerIdPx2, { + tags: { + [Tags.PEER_EXCHANGE]: { + value: 50, + ttl: 1200000 + } + } + }); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdPx2 }) + ); + + await delay(100); + + expect(waku.isConnected()).to.be.true; + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx }) + ); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) + ); + + expect(waku.isConnected()).to.be.false; + }); + }); +}); diff --git a/packages/tests/tests/connection-mananger/methods.spec.ts b/packages/tests/tests/connection-mananger/methods.spec.ts new file mode 100644 index 0000000000..a3532d9f1f --- /dev/null +++ b/packages/tests/tests/connection-mananger/methods.spec.ts @@ -0,0 +1,324 @@ +import type { PeerId } from "@libp2p/interface"; +import { CustomEvent } from "@libp2p/interface"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { + EPeersByDiscoveryEvents, + LightNode, + PeersByDiscoveryResult, + Tags +} from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { delay } from "../../src/index.js"; +import { tearDownNodes } from "../../src/index.js"; + +const TEST_TIMEOUT = 20_000; + +describe("Public methods", function () { + let waku: LightNode; + this.timeout(TEST_TIMEOUT); + beforeEach(async function () { + this.timeout(TEST_TIMEOUT); + waku = await createLightNode({ shardInfo: { shards: [0] } }); + }); + + afterEach(async () => { + this.timeout(TEST_TIMEOUT); + await tearDownNodes([], waku); + }); + it("addEventListener with correct event", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + expect(await peerConnectedBootstrap).to.eq(true); + }); + + it("addEventListener with wrong event", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + // setting PEER_CONNECT_PEER_EXCHANGE while the tag is BOOTSTRAP + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + const timeoutPromise = new Promise((resolve) => + setTimeout(() => resolve(false), TEST_TIMEOUT - 100) + ); + + const result = await Promise.race([peerConnectedBootstrap, timeoutPromise]); + + // If the timeout promise resolves first, the result will be false, and we expect it to be false (test passes) + // If the peerConnectedBootstrap resolves first, we expect its result to be true (which will now make the test fail if it's not true) + expect(result).to.eq(false); + }); + + it("removeEventListener with correct event", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + + let wasCalled = false; + + const eventListener = (event: CustomEvent): void => { + if (event.detail.toString() === peerIdBootstrap.toString()) { + wasCalled = true; + } + }; + + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + eventListener + ); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + await delay(200); + expect(wasCalled).to.eq(true); + + wasCalled = false; // resetting flag back to false and remove the listener + waku.connectionManager.removeEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + eventListener + ); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + await delay(200); + expect(wasCalled).to.eq(false); + }); + + it("removeEventListener with wrong event", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + + let wasCalled = false; + + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + if (receivedPeerId.toString() === peerIdBootstrap.toString()) { + wasCalled = true; + } + } + ); + + waku.connectionManager.removeEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, + ({ detail: receivedPeerId }) => { + if (receivedPeerId.toString() === peerIdBootstrap.toString()) { + wasCalled = true; + } + } + ); + + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + await delay(200); + expect(wasCalled).to.eq(true); + }); + + it("getPeersByDiscovery", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + const peers_before = await waku.connectionManager.getPeersByDiscovery(); + expect(peers_before.DISCOVERED[Tags.BOOTSTRAP]).to.deep.eq([]); + + const ttl = 1200000; + const tag_value = 50; + + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: tag_value, + ttl: ttl + } + } + }); + + const currentTime = Date.now(); // Get the current time at the point peer connect + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + + const peers_after = ( + await waku.connectionManager.getPeersByDiscovery() + ); + const bootstrap_peer = peers_after.DISCOVERED[Tags.BOOTSTRAP]; + + expect(bootstrap_peer).to.not.deep.eq([]); + expect(bootstrap_peer[0].id.toString()).to.eq(peerIdBootstrap.toString()); + expect(bootstrap_peer[0].tags.has("bootstrap")).to.be.true; + expect(bootstrap_peer[0].tags.get("bootstrap")!.value).to.equal(tag_value); + // Assert that the expiry is within the expected range, considering TTL + // Note: We allow a small margin for the execution time of the code + const marginOfError = 1000; // 1 second in milliseconds, adjust as needed + const expiry = (bootstrap_peer[0].tags.get("bootstrap") as any).expiry; + expect(Number(expiry)).to.be.closeTo(currentTime + ttl, marginOfError); + }); + + it("listenerCount", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + receivedPeerId.toString() === peerIdBootstrap.toString(); + } + ); + + expect( + waku.connectionManager.listenerCount( + EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP + ) + ).to.eq(0); + expect( + waku.connectionManager.listenerCount( + EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE + ) + ).to.eq(0); + expect( + waku.connectionManager.listenerCount( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP + ) + ).to.eq(1); + expect( + waku.connectionManager.listenerCount( + EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE + ) + ).to.eq(0); + }); + + // Will be skipped until https://github.com/waku-org/js-waku/issues/1835 is fixed + it.skip("dispatchEvent via connectionManager", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + waku.connectionManager.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + expect(await peerConnectedBootstrap).to.eq(true); + }); + + it("safeDispatchEvent", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.connectionManager.safeDispatchEvent( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + { detail: peerIdBootstrap } + ); + + expect(await peerConnectedBootstrap).to.eq(true); + }); + + it("stop", async function () { + const peerIdBootstrap = await createSecp256k1PeerId(); + await waku.libp2p.peerStore.save(peerIdBootstrap, { + tags: { + [Tags.BOOTSTRAP]: { + value: 50, + ttl: 1200000 + } + } + }); + + const peerConnectedBootstrap = new Promise((resolve) => { + waku.connectionManager.addEventListener( + EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, + ({ detail: receivedPeerId }) => { + resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); + } + ); + }); + + waku.connectionManager.stop(); + waku.libp2p.dispatchEvent( + new CustomEvent("peer:connect", { detail: peerIdBootstrap }) + ); + + const timeoutPromise = new Promise((resolve) => + setTimeout(() => resolve(false), TEST_TIMEOUT - 100) + ); + + const result = await Promise.race([peerConnectedBootstrap, timeoutPromise]); + + // If the timeout promise resolves first, the result will be false, and we expect it to be false (test passes) + // If the peerConnectedBootstrap resolves first, we expect its result to be true (which will now make the test fail if it's not true) + expect(result).to.eq(false); + }); +}); diff --git a/packages/tests/tests/connection_manager.spec.ts b/packages/tests/tests/connection_manager.spec.ts deleted file mode 100644 index 170c72da3f..0000000000 --- a/packages/tests/tests/connection_manager.spec.ts +++ /dev/null @@ -1,587 +0,0 @@ -import type { PeerId, PeerInfo } from "@libp2p/interface"; -import { CustomEvent } from "@libp2p/interface"; -import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { Multiaddr } from "@multiformats/multiaddr"; -import { - EConnectionStateEvents, - EPeersByDiscoveryEvents, - LightNode, - Protocols, - Tags -} from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { expect } from "chai"; -import sinon, { SinonSpy, SinonStub } from "sinon"; - -import { delay } from "../src/index.js"; -import { makeLogFileName, ServiceNode, tearDownNodes } from "../src/index.js"; - -const TEST_TIMEOUT = 10_000; -const DELAY_MS = 1_000; - -describe("ConnectionManager", function () { - this.timeout(20_000); - let waku: LightNode; - - beforeEach(async function () { - waku = await createLightNode({ - shardInfo: { shards: [0] } - }); - }); - - afterEach(async () => { - this.timeout(15000); - await tearDownNodes([], waku); - }); - - describe("Events", () => { - describe("peer:discovery", () => { - it("should emit `peer:discovery:bootstrap` event when a peer is discovered", async function () { - this.timeout(TEST_TIMEOUT); - - const peerIdBootstrap = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdBootstrap, { - tags: { - [Tags.BOOTSTRAP]: { - value: 50, - ttl: 1200000 - } - } - }); - - const peerDiscoveryBootstrap = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EPeersByDiscoveryEvents.PEER_DISCOVERY_BOOTSTRAP, - ({ detail: receivedPeerId }) => { - resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); - } - ); - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: peerIdBootstrap, - multiaddrs: [] - } - }) - ); - - expect(await peerDiscoveryBootstrap).to.eq(true); - }); - - it("should emit `peer:discovery:peer-exchange` event when a peer is discovered", async function () { - const peerIdPx = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdPx, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - const peerDiscoveryPeerExchange = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EPeersByDiscoveryEvents.PEER_DISCOVERY_PEER_EXCHANGE, - ({ detail: receivedPeerId }) => { - resolve(receivedPeerId.toString() === peerIdPx.toString()); - } - ); - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: peerIdPx, - multiaddrs: [] - } - }) - ); - - expect(await peerDiscoveryPeerExchange).to.eq(true); - }); - }); - - describe("peer:connect", () => { - it("should emit `peer:connected:bootstrap` event when a peer is connected", async function () { - this.timeout(TEST_TIMEOUT); - - const peerIdBootstrap = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdBootstrap, { - tags: { - [Tags.BOOTSTRAP]: { - value: 50, - ttl: 1200000 - } - } - }); - - const peerConnectedBootstrap = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EPeersByDiscoveryEvents.PEER_CONNECT_BOOTSTRAP, - ({ detail: receivedPeerId }) => { - resolve(receivedPeerId.toString() === peerIdBootstrap.toString()); - } - ); - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdBootstrap }) - ); - - expect(await peerConnectedBootstrap).to.eq(true); - }); - it("should emit `peer:connected:peer-exchange` event when a peer is connected", async function () { - const peerIdPx = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdPx, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - const peerConnectedPeerExchange = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EPeersByDiscoveryEvents.PEER_CONNECT_PEER_EXCHANGE, - ({ detail: receivedPeerId }) => { - resolve(receivedPeerId.toString() === peerIdPx.toString()); - } - ); - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx }) - ); - - expect(await peerConnectedPeerExchange).to.eq(true); - }); - }); - - describe("peer:disconnect", () => { - it("should emit `waku:offline` event when all peers disconnect", async function () { - const peerIdPx = await createSecp256k1PeerId(); - const peerIdPx2 = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdPx, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - await waku.libp2p.peerStore.save(peerIdPx2, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx2 }) - ); - - await delay(100); - - let eventCount = 0; - const connectionStatus = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - ({ detail: status }) => { - eventCount++; - resolve(status); - } - ); - }); - - expect(waku.isConnected()).to.be.true; - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) - ); - - expect(await connectionStatus).to.eq(false); - expect(eventCount).to.be.eq(1); - }); - it("isConnected should return false after all peers disconnect", async function () { - const peerIdPx = await createSecp256k1PeerId(); - const peerIdPx2 = await createSecp256k1PeerId(); - - await waku.libp2p.peerStore.save(peerIdPx, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - await waku.libp2p.peerStore.save(peerIdPx2, { - tags: { - [Tags.PEER_EXCHANGE]: { - value: 50, - ttl: 1200000 - } - } - }); - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:connect", { detail: peerIdPx2 }) - ); - - await delay(100); - - expect(waku.isConnected()).to.be.true; - - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx }) - ); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:disconnect", { detail: peerIdPx2 }) - ); - - expect(waku.isConnected()).to.be.false; - }); - }); - }); - - describe("Dials", () => { - let dialPeerStub: SinonStub; - let getConnectionsStub: SinonStub; - let getTagNamesForPeerStub: SinonStub; - let isPeerTopicConfigured: SinonStub; - let waku: LightNode; - - this.beforeEach(async function () { - this.timeout(15000); - waku = await createLightNode({ shardInfo: { shards: [0] } }); - isPeerTopicConfigured = sinon.stub( - waku.connectionManager as any, - "isPeerTopicConfigured" - ); - isPeerTopicConfigured.resolves(true); - }); - - afterEach(async () => { - this.timeout(15000); - await tearDownNodes([], waku); - isPeerTopicConfigured.restore(); - sinon.restore(); - }); - - describe("attemptDial method", function () { - let attemptDialSpy: SinonSpy; - - beforeEach(function () { - attemptDialSpy = sinon.spy( - waku.connectionManager as any, - "attemptDial" - ); - }); - - afterEach(function () { - attemptDialSpy.restore(); - }); - - it("should be called at least once on all `peer:discovery` events", async function () { - this.timeout(TEST_TIMEOUT); - - const totalPeerIds = 5; - for (let i = 1; i <= totalPeerIds; i++) { - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: await createSecp256k1PeerId(), - multiaddrs: [] - } - }) - ); - } - - await delay(100); - - expect(attemptDialSpy.callCount).to.be.greaterThanOrEqual( - totalPeerIds, - "attemptDial should be called at least once for each peer:discovery event" - ); - }); - }); - - describe("dialPeer method", function () { - let peerStoreHasStub: SinonStub; - let dialAttemptsForPeerHasStub: SinonStub; - beforeEach(function () { - getConnectionsStub = sinon.stub( - (waku.connectionManager as any).libp2p, - "getConnections" - ); - getTagNamesForPeerStub = sinon.stub( - waku.connectionManager as any, - "getTagNamesForPeer" - ); - dialPeerStub = sinon.stub(waku.connectionManager as any, "dialPeer"); - peerStoreHasStub = sinon.stub(waku.libp2p.peerStore, "has"); - dialAttemptsForPeerHasStub = sinon.stub( - (waku.connectionManager as any).dialAttemptsForPeer, - "has" - ); - - // simulate that the peer is not connected - getConnectionsStub.returns([]); - - // simulate that the peer is a bootstrap peer - getTagNamesForPeerStub.resolves([Tags.BOOTSTRAP]); - - // simulate that the peer is not in the peerStore - peerStoreHasStub.returns(false); - - // simulate that the peer has not been dialed before - dialAttemptsForPeerHasStub.returns(false); - }); - - afterEach(function () { - dialPeerStub.restore(); - getTagNamesForPeerStub.restore(); - getConnectionsStub.restore(); - peerStoreHasStub.restore(); - dialAttemptsForPeerHasStub.restore(); - }); - - describe("For bootstrap peers", function () { - it("should be called for bootstrap peers", async function () { - this.timeout(TEST_TIMEOUT); - - const bootstrapPeer = await createSecp256k1PeerId(); - - // emit a peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { id: bootstrapPeer, multiaddrs: [] } - }) - ); - - // wait for the async function calls within attemptDial to finish - await delay(DELAY_MS); - - // check that dialPeer was called once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should be called for bootstrap peers" - ); - }); - - it("should not be called more than DEFAULT_MAX_BOOTSTRAP_PEERS_ALLOWED times for bootstrap peers", async function () { - this.timeout(TEST_TIMEOUT); - - // emit first peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: await createSecp256k1PeerId(), - multiaddrs: [] - } - }) - ); - await delay(500); - - // simulate that the peer is connected - getConnectionsStub.returns([{ tags: [{ name: Tags.BOOTSTRAP }] }]); - - // emit multiple peer:discovery events - const totalBootstrapPeers = 5; - for (let i = 1; i <= totalBootstrapPeers; i++) { - await delay(500); - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: await createSecp256k1PeerId(), - multiaddrs: [] - } - }) - ); - } - - // check that dialPeer was called only once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should not be called more than once for bootstrap peers" - ); - }); - }); - - describe("For peer-exchange peers", function () { - it("should be called for peers with PEER_EXCHANGE tags", async function () { - this.timeout(TEST_TIMEOUT); - - const pxPeer = await createSecp256k1PeerId(); - - // emit a peer:discovery event - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: pxPeer, - multiaddrs: [] - } - }) - ); - - // wait for the async function calls within attemptDial to finish - await delay(DELAY_MS); - - // check that dialPeer was called once - expect(dialPeerStub.callCount).to.equal( - 1, - "dialPeer should be called for peers with PEER_EXCHANGE tags" - ); - }); - - it("should be called for every peer with PEER_EXCHANGE tags", async function () { - this.timeout(TEST_TIMEOUT); - - // emit multiple peer:discovery events - const totalPxPeers = 5; - for (let i = 0; i < totalPxPeers; i++) { - waku.libp2p.dispatchEvent( - new CustomEvent("peer:discovery", { - detail: { - id: await createSecp256k1PeerId(), - multiaddrs: [] - } - }) - ); - await delay(500); - } - - // check that dialPeer was called for each peer with PEER_EXCHANGE tags - expect(dialPeerStub.callCount).to.equal(totalPxPeers); - }); - }); - }); - }); - - describe("Connection state", () => { - this.timeout(20_000); - let nwaku1: ServiceNode; - let nwaku2: ServiceNode; - let nwaku1PeerId: Multiaddr; - let nwaku2PeerId: Multiaddr; - - beforeEach(async () => { - this.timeout(20_000); - nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - await nwaku1.start({ - filter: true - }); - - await nwaku2.start({ - filter: true - }); - - nwaku1PeerId = await nwaku1.getMultiaddrWithId(); - nwaku2PeerId = await nwaku2.getMultiaddrWithId(); - }); - - afterEach(async () => { - this.timeout(15000); - await tearDownNodes([nwaku1, nwaku2], []); - }); - - it("should emit `waku:online` event only when first peer is connected", async function () { - this.timeout(20_000); - - let eventCount = 0; - const connectionStatus = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - ({ detail: status }) => { - eventCount++; - resolve(status); - } - ); - }); - - // await waku.start(); - await waku.dial(nwaku1PeerId, [Protocols.Filter]); - await waku.dial(nwaku2PeerId, [Protocols.Filter]); - - await delay(250); - - expect(await connectionStatus).to.eq(true); - expect(eventCount).to.be.eq(1); - }); - - it("isConnected should return true after first peer connects", async function () { - this.timeout(20_000); - expect(waku.isConnected()).to.be.false; - - // await waku.start(); - await waku.dial(nwaku1PeerId, [Protocols.Filter]); - await waku.dial(nwaku2PeerId, [Protocols.Filter]); - - await delay(250); - - expect(waku.isConnected()).to.be.true; - }); - - it("should emit `waku:offline` event only when all peers disconnect", async function () { - this.timeout(20_000); - expect(waku.isConnected()).to.be.false; - - await waku.dial(nwaku1PeerId, [Protocols.Filter]); - await waku.dial(nwaku2PeerId, [Protocols.Filter]); - - await delay(250); - - let eventCount = 0; - const connectionStatus = new Promise((resolve) => { - waku.connectionManager.addEventListener( - EConnectionStateEvents.CONNECTION_STATUS, - ({ detail: status }) => { - eventCount++; - resolve(status); - } - ); - }); - - await waku.libp2p.hangUp(nwaku1PeerId); - await waku.libp2p.hangUp(nwaku2PeerId); - expect(await connectionStatus).to.eq(false); - expect(eventCount).to.be.eq(1); - }); - - it("isConnected should return false after all peers disconnect", async function () { - this.timeout(20_000); - expect(waku.isConnected()).to.be.false; - - await waku.dial(nwaku1PeerId, [Protocols.Filter]); - await waku.dial(nwaku2PeerId, [Protocols.Filter]); - - await delay(250); - expect(waku.isConnected()).to.be.true; - - await waku.libp2p.hangUp(nwaku1PeerId); - await waku.libp2p.hangUp(nwaku2PeerId); - expect(waku.isConnected()).to.be.false; - }); - }); -});