diff --git a/packages/tests/src/utils/custom_mocha_hooks.ts b/packages/tests/src/utils/custom_mocha_hooks.ts index b2c4a43722..1f55855a23 100644 --- a/packages/tests/src/utils/custom_mocha_hooks.ts +++ b/packages/tests/src/utils/custom_mocha_hooks.ts @@ -7,7 +7,7 @@ const log = new Logger("test:mocha-hook"); function withGracefulTimeout( asyncOperation: () => Promise, doneCallback: (error?: unknown) => void, - timeoutDuration: number = MOCHA_HOOK_MAX_TIMEOUT + timeoutDuration = MOCHA_HOOK_MAX_TIMEOUT ): void { let operationCompleted = false; @@ -45,24 +45,26 @@ function withGracefulTimeout( export const beforeEachCustom = function ( suite: Suite, - cb: () => Promise + cb: () => Promise, + timeout = MOCHA_HOOK_MAX_TIMEOUT ): void { const timeoutBefore = suite.timeout(); - suite.timeout(MOCHA_HOOK_MAX_TIMEOUT); + suite.timeout(timeout); suite.beforeEach((done) => { - withGracefulTimeout(cb, done); + withGracefulTimeout(cb, done, timeout); }); suite.timeout(timeoutBefore); // restore timeout to the original value }; export const afterEachCustom = function ( suite: Suite, - cb: () => Promise + cb: () => Promise, + timeout = MOCHA_HOOK_MAX_TIMEOUT ): void { const timeoutBefore = suite.timeout(); - suite.timeout(MOCHA_HOOK_MAX_TIMEOUT); + suite.timeout(timeout); suite.afterEach((done) => { - withGracefulTimeout(cb, done); + withGracefulTimeout(cb, done, timeout); }); suite.timeout(timeoutBefore); // restore timeout to the original value }; diff --git a/packages/tests/tests/peer-exchange/compliance.spec.ts b/packages/tests/tests/peer-exchange/compliance.spec.ts new file mode 100644 index 0000000000..6fed552a2d --- /dev/null +++ b/packages/tests/tests/peer-exchange/compliance.spec.ts @@ -0,0 +1,64 @@ +import tests from "@libp2p/interface-compliance-tests/peer-discovery"; +import type { LightNode } from "@waku/interfaces"; +import { PeerExchangeCodec, PeerExchangeDiscovery } from "@waku/peer-exchange"; +import { createLightNode } from "@waku/sdk"; +import { singleShardInfoToPubsubTopic } from "@waku/utils"; + +import { + beforeEachCustom, + makeLogFileName, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })]; + +describe("Peer Exchange", function () { + describe("Compliance Test", function () { + this.timeout(100_000); + + let waku: LightNode; + let nwaku1: ServiceNode; + let nwaku2: ServiceNode; + + beforeEachCustom(this, async () => { + nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); + nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); + }); + + tests({ + async setup() { + await nwaku1.start({ + relay: true, + discv5Discovery: true, + peerExchange: true + }); + + const enr = (await nwaku1.info()).enrUri; + + await nwaku2.start({ + relay: true, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: enr + }); + + waku = await createLightNode(); + await waku.start(); + + const nwaku2Ma = await nwaku2.getMultiaddrWithId(); + + // we do this because we want peer-exchange discovery to get initialised before we dial the peer which contains info about the other peer + setTimeout(() => { + void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); + }, 1000); + + return new PeerExchangeDiscovery(waku.libp2p.components, pubsubTopic); + }, + teardown: async () => { + this.timeout(15000); + await tearDownNodes([nwaku1, nwaku2], waku); + } + }); + }); +}); diff --git a/packages/tests/tests/peer-exchange/index.spec.ts b/packages/tests/tests/peer-exchange/index.spec.ts new file mode 100644 index 0000000000..84aea0fb86 --- /dev/null +++ b/packages/tests/tests/peer-exchange/index.spec.ts @@ -0,0 +1,236 @@ +import { bootstrap } from "@libp2p/bootstrap"; +import type { PeerId } from "@libp2p/interface"; +import type { LightNode, PeersByDiscoveryResult } from "@waku/interfaces"; +import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; +import { createLightNode, Tags } from "@waku/sdk"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { expect } from "chai"; +import Sinon, { SinonSpy } from "sinon"; + +import { + afterEachCustom, + beforeEachCustom, + makeLogFileName, + ServiceNode, + tearDownNodes +} from "../../src/index.js"; + +export const log = new Logger("test:pe"); +const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })]; + +describe("Peer Exchange", function () { + this.timeout(150_000); + let waku: LightNode; + let nwaku1: ServiceNode; + let nwaku2: ServiceNode; + let nwaku3: ServiceNode; + let dialPeerSpy: SinonSpy; + let nwaku1PeerId: PeerId; + + beforeEachCustom(this, async () => { + nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); + nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); + await nwaku1.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + relay: true + }); + await nwaku2.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: (await nwaku1.info()).enrUri, + relay: true + }); + nwaku1PeerId = await nwaku1.getPeerId(); + }); + + afterEachCustom(this, async () => { + await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); + }); + + it("getPeersByDiscovery", async function () { + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), + wakuPeerExchangeDiscovery(pubsubTopic) + ] + } + }); + await waku.start(); + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + const pxPeersDiscovered = new Set(); + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + resolve(); + } + } + })(); + }); + }); + expect(dialPeerSpy.callCount).to.equal(1); + + const peers_after = ( + await waku.connectionManager.getPeersByDiscovery() + ); + const discovered_peer_exchange = peers_after.DISCOVERED[Tags.PEER_EXCHANGE]; + const discovered_bootstram = peers_after.DISCOVERED[Tags.BOOTSTRAP]; + const connected_peer_exchange = peers_after.CONNECTED[Tags.PEER_EXCHANGE]; + const connected_bootstram = peers_after.CONNECTED[Tags.BOOTSTRAP]; + expect(discovered_peer_exchange.length).to.eq(1); + expect(discovered_peer_exchange[0].id.toString()).to.eq( + nwaku1PeerId.toString() + ); + expect(discovered_peer_exchange[0].tags.has("peer-exchange")).to.be.true; + expect(discovered_bootstram.length).to.eq(1); + expect(connected_peer_exchange.length).to.eq(0); + expect(connected_bootstram.length).to.eq(1); + }); + + // will be skipped until https://github.com/waku-org/js-waku/issues/1860 is fixed + it.skip("new peer added after a peer was already found", async function () { + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), + wakuPeerExchangeDiscovery(pubsubTopic) + ] + } + }); + await waku.start(); + + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + const pxPeersDiscovered = new Set(); + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + resolve(); + } + } + })(); + }); + }); + + nwaku3 = new ServiceNode(makeLogFileName(this) + "3"); + await nwaku3.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: (await nwaku1.info()).enrUri, + relay: true, + lightpush: true, + filter: true + }); + + await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 2) { + resolve(); + } + } + })(); + }); + }); + }); + + // will be skipped until https://github.com/waku-org/js-waku/issues/1858 is fixed + it.skip("wrong wakuPeerExchangeDiscovery pubsub topic", async function () { + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), + wakuPeerExchangeDiscovery(["wrong"]) + ] + } + }); + await waku.start(); + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + const pxPeersDiscovered = new Set(); + await new Promise((resolve) => { + const timeoutId = setTimeout(() => { + resolve(); + }, 40000); + + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + clearTimeout(timeoutId); + resolve(); + } + } + })(); + }); + }); + + expect( + pxPeersDiscovered.size, + "No peer should have been discovered" + ).to.equal(0); + }); + + it("peerDiscovery without wakuPeerExchangeDiscovery", async function () { + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }) + ] + } + }); + await waku.start(); + dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); + + const pxPeersDiscovered = new Set(); + await new Promise((resolve) => { + const timeoutId = setTimeout(() => { + resolve(); + }, 40000); + + waku.libp2p.addEventListener("peer:discovery", (evt) => { + return void (async () => { + const peerId = evt.detail.id; + const peer = await waku.libp2p.peerStore.get(peerId); + const tags = Array.from(peer.tags.keys()); + if (tags.includes(Tags.PEER_EXCHANGE)) { + pxPeersDiscovered.add(peerId); + if (pxPeersDiscovered.size === 1) { + clearTimeout(timeoutId); + resolve(); + } + } + })(); + }); + }); + + expect( + pxPeersDiscovered.size, + "No peer should have been discovered" + ).to.equal(0); + }); +}); diff --git a/packages/tests/tests/peer_exchange.optional.spec.ts b/packages/tests/tests/peer-exchange/pe.optional.spec.ts similarity index 96% rename from packages/tests/tests/peer_exchange.optional.spec.ts rename to packages/tests/tests/peer-exchange/pe.optional.spec.ts index 15282f6518..2a1824d74f 100644 --- a/packages/tests/tests/peer_exchange.optional.spec.ts +++ b/packages/tests/tests/peer-exchange/pe.optional.spec.ts @@ -8,7 +8,7 @@ import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange"; import { createLightNode, DefaultPubsubTopic } from "@waku/sdk"; import { expect } from "chai"; -import { afterEachCustom, tearDownNodes } from "../src"; +import { afterEachCustom, tearDownNodes } from "../../src"; describe("Peer Exchange", () => { describe("Auto Discovery", function () { diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts new file mode 100644 index 0000000000..2ed1e782ce --- /dev/null +++ b/packages/tests/tests/peer-exchange/query.spec.ts @@ -0,0 +1,163 @@ +import { bootstrap } from "@libp2p/bootstrap"; +import type { PeerId } from "@libp2p/interface"; +import { multiaddr } from "@multiformats/multiaddr"; +import type { Multiaddr } from "@multiformats/multiaddr"; +import type { LightNode, PeerInfo } from "@waku/interfaces"; +import { + PeerExchangeCodec, + WakuPeerExchange, + wakuPeerExchangeDiscovery +} from "@waku/peer-exchange"; +import { createLightNode, Libp2pComponents } from "@waku/sdk"; +import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; +import { expect } from "chai"; + +import { + afterEachCustom, + beforeEachCustom, + delay, + makeLogFileName, + ServiceNode, + tearDownNodes, + waitForRemotePeerWithCodec +} from "../../src/index.js"; + +export const log = new Logger("test:pe"); + +const pubsubTopic = [singleShardInfoToPubsubTopic({ clusterId: 0, shard: 2 })]; + +describe("Peer Exchange Query", function () { + this.timeout(30_000); + let waku: LightNode; + let nwaku1: ServiceNode; + let nwaku2: ServiceNode; + let nwaku3: ServiceNode; + let nwaku1PeerId: PeerId; + let nwaku3MA: Multiaddr; + let nwaku3PeerId: PeerId; + let components: Libp2pComponents; + let peerExchange: WakuPeerExchange; + let numPeersToRequest: number; + let peerInfos: PeerInfo[]; + + beforeEachCustom( + this, + async () => { + nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); + nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); + nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3"); + await nwaku1.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + relay: true + }); + nwaku1PeerId = await nwaku1.getPeerId(); + await nwaku2.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: (await nwaku1.info()).enrUri, + relay: true + }); + await nwaku3.start({ + pubsubTopic: pubsubTopic, + discv5Discovery: true, + peerExchange: true, + discv5BootstrapNode: (await nwaku2.info()).enrUri, + relay: true + }); + nwaku3MA = await nwaku3.getMultiaddrWithId(); + nwaku3PeerId = await nwaku3.getPeerId(); + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: [nwaku3MA.toString()] }), + wakuPeerExchangeDiscovery(pubsubTopic) + ] + } + }); + await waku.start(); + await waku.libp2p.dialProtocol(nwaku3MA, PeerExchangeCodec); + await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku3PeerId); + + components = waku.libp2p.components as unknown as Libp2pComponents; + peerExchange = new WakuPeerExchange(components, pubsubTopic); + numPeersToRequest = 2; + + // querying the connected peer + peerInfos = []; + while (peerInfos.length != numPeersToRequest) { + try { + peerInfos = (await peerExchange.query({ + peerId: nwaku3PeerId, + numPeers: numPeersToRequest + })) as PeerInfo[]; + } catch (error) { + log.error("Error encountered, retrying..."); + } + await delay(2000); + } + }, + 100000 + ); + + afterEachCustom(this, async () => { + await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); + }); + + it("connected peers and dial", async function () { + expect(peerInfos[0].ENR).to.not.be.null; + expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null; + + const peerWsMA = peerInfos[0].ENR?.peerInfo?.multiaddrs[2]; + const localPeerWsMAAsString = peerWsMA + ?.toString() + .replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/"); + const localPeerWsMA = multiaddr(localPeerWsMAAsString); + + let foundNodePeerId: PeerId | undefined = undefined; + const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => { + foundNodePeerId = ENR?.peerInfo?.id; + return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString(); + }); + if (!foundNodePeerId) { + throw new Error("Peer1 ID not found"); + } + expect(doesPeerIdExistInResponse, "peer not found").to.be.equal(true); + + await waku.libp2p.dialProtocol(localPeerWsMA, PeerExchangeCodec); + await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId); + }); + + it("more peers than existing", async function () { + const peerInfo = await peerExchange.query({ + peerId: nwaku3PeerId, + numPeers: 5 + }); + expect(peerInfo?.length).to.be.eq(numPeersToRequest); + }); + + it("less peers than existing", async function () { + const peerInfo = await peerExchange.query({ + peerId: nwaku3PeerId, + numPeers: 1 + }); + expect(peerInfo?.length).to.be.eq(1); + }); + + it("non connected peers", async function () { + // querying the non connected peer + try { + await peerExchange.query({ + peerId: nwaku1PeerId, + numPeers: numPeersToRequest + }); + throw new Error("Query on not connected peer succeeded unexpectedly."); + } catch (error) { + if (!(error instanceof Error && error.message === "Not Found")) { + throw error; + } + } + }); +}); diff --git a/packages/tests/tests/peer_exchange.node.spec.ts b/packages/tests/tests/peer_exchange.node.spec.ts deleted file mode 100644 index 07a417dcd1..0000000000 --- a/packages/tests/tests/peer_exchange.node.spec.ts +++ /dev/null @@ -1,172 +0,0 @@ -import type { PeerId } from "@libp2p/interface"; -import tests from "@libp2p/interface-compliance-tests/peer-discovery"; -import type { Multiaddr } from "@multiformats/multiaddr"; -import type { LightNode, PeerInfo } from "@waku/interfaces"; -import { - PeerExchangeCodec, - PeerExchangeDiscovery, - WakuPeerExchange -} from "@waku/peer-exchange"; -import { - createLightNode, - DEFAULT_CLUSTER_ID, - DefaultPubsubTopic, - Libp2pComponents -} from "@waku/sdk"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - delay, - makeLogFileName, - ServiceNode, - tearDownNodes, - waitForRemotePeerWithCodec -} from "../src/index.js"; - -describe("Peer Exchange", function () { - describe("Locally Run Nodes", () => { - let waku: LightNode; - let nwaku1: ServiceNode; - let nwaku2: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - }); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku1, nwaku2], waku); - }); - - it.skip("nwaku interop", async function () { - this.timeout(100_000); - - await nwaku1.start({ - relay: true, - discv5Discovery: true, - peerExchange: true, - clusterId: DEFAULT_CLUSTER_ID - }); - - const enr = (await nwaku1.info()).enrUri; - - await nwaku2.start({ - relay: true, - discv5Discovery: true, - peerExchange: true, - discv5BootstrapNode: enr, - clusterId: DEFAULT_CLUSTER_ID - }); - - const nwaku1PeerId = await nwaku1.getPeerId(); - const nwaku2PeerId = await nwaku2.getPeerId(); - const nwaku2Ma = await nwaku2.getMultiaddrWithId(); - - waku = await createLightNode({ shardInfo: { shards: [0] } }); - await waku.start(); - await waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); - await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku2PeerId); - - const components = waku.libp2p.components as unknown as Libp2pComponents; - const peerExchange = new WakuPeerExchange(components, [ - DefaultPubsubTopic - ]); - - const numPeersToRequest = 1; - - let peerInfos: PeerInfo[] = []; - while (peerInfos.length <= 0) { - peerInfos = (await peerExchange.query({ - peerId: nwaku2PeerId, - numPeers: numPeersToRequest - })) as PeerInfo[]; - await delay(3000); - } - - expect(peerInfos.length).to.be.greaterThan(0); - expect(peerInfos.length).to.be.lessThanOrEqual(numPeersToRequest); - expect(peerInfos[0].ENR).to.not.be.null; - expect(peerInfos[0].ENR?.peerInfo?.multiaddrs).to.not.be.null; - - let foundNodeMultiaddrs: Multiaddr[] = []; - let foundNodePeerId: PeerId | undefined = undefined; - const doesPeerIdExistInResponse = peerInfos.some(({ ENR }) => { - foundNodeMultiaddrs = ENR?.peerInfo?.multiaddrs ?? []; - foundNodePeerId = ENR?.peerInfo?.id; - return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString(); - }); - - if (!foundNodePeerId) { - throw new Error("Peer ID not found"); - } - - if (!foundNodePeerId) { - throw new Error("Peer ID not found"); - } - - expect(doesPeerIdExistInResponse).to.be.equal(true); - - await waku.libp2p.dialProtocol(foundNodeMultiaddrs, PeerExchangeCodec); - await waitForRemotePeerWithCodec( - waku, - PeerExchangeCodec, - foundNodePeerId - ); - - expect(await waku.libp2p.peerStore.has(nwaku1PeerId)).to.eq(true); - expect(waku.libp2p.getConnections()).has.length(2); - }); - }); - - describe("Compliance Test", function () { - this.timeout(100_000); - - let waku: LightNode; - let nwaku1: ServiceNode; - let nwaku2: ServiceNode; - - beforeEachCustom(this, async () => { - nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - }); - - tests({ - async setup() { - await nwaku1.start({ - relay: true, - discv5Discovery: true, - peerExchange: true - }); - - const enr = (await nwaku1.info()).enrUri; - - await nwaku2.start({ - relay: true, - discv5Discovery: true, - peerExchange: true, - discv5BootstrapNode: enr - }); - - waku = await createLightNode(); - await waku.start(); - - const nwaku2Ma = await nwaku2.getMultiaddrWithId(); - - // we do this because we want peer-exchange discovery to get initialised before we dial the peer which contains info about the other peer - setTimeout(() => { - void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); - }, 1000); - - return new PeerExchangeDiscovery(waku.libp2p.components, [ - DefaultPubsubTopic - ]); - }, - teardown: async () => { - this.timeout(15000); - await tearDownNodes([nwaku1, nwaku2], waku); - } - }); - }); -});