diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 2c21656fff..20ac373ac8 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -24,27 +24,32 @@ export class StreamManager { } public async getStream(peerId: PeerId): Promise { - const peerIdStr = peerId.toString(); - const scheduledStream = this.streamPool.get(peerIdStr); + try { + const peerIdStr = peerId.toString(); + const scheduledStream = this.streamPool.get(peerIdStr); - if (scheduledStream) { - this.streamPool.delete(peerIdStr); - await scheduledStream; - } + if (scheduledStream) { + this.streamPool.delete(peerIdStr); + await scheduledStream; + } - const stream = - this.getOpenStreamForCodec(peerId) || (await this.createStream(peerId)); + const stream = + this.getOpenStreamForCodec(peerId) || (await this.createStream(peerId)); - if (!stream) { + if (!stream) { + return; + } + + this.log.info( + `Using stream for peerId=${peerIdStr} multicodec=${this.multicodec}` + ); + + this.lockStream(peerIdStr, stream); + return stream; + } catch (error) { + this.log.error(`Failed to getStream:`, error); return; } - - this.log.info( - `Using stream for peerId=${peerIdStr} multicodec=${this.multicodec}` - ); - - this.lockStream(peerIdStr, stream); - return stream; } private async createStream( diff --git a/packages/discovery/src/dns/dns.spec.ts b/packages/discovery/src/dns/dns.spec.ts index 2925948f63..c2c76f2564 100644 --- a/packages/discovery/src/dns/dns.spec.ts +++ b/packages/discovery/src/dns/dns.spec.ts @@ -53,7 +53,6 @@ class MockDNS implements DnsClient { public resolveTXT(fqdn: string): Promise { if (this.fqdnThrows.includes(fqdn)) { this.hasThrown = true; - console.log("throwing"); throw "Mock DNS throws."; } diff --git a/packages/discovery/src/index.ts b/packages/discovery/src/index.ts index eb9ba8ea2c..ab53dc80be 100644 --- a/packages/discovery/src/index.ts +++ b/packages/discovery/src/index.ts @@ -1,20 +1,13 @@ -// DNS Discovery export { PeerDiscoveryDns, wakuDnsDiscovery } from "./dns/dns_discovery.js"; export { enrTree } from "./dns/constants.js"; export { DnsNodeDiscovery } from "./dns/dns.js"; -// Peer Exchange Discovery -export { - wakuPeerExchange, - PeerExchangeCodec, - WakuPeerExchange -} from "./peer-exchange/waku_peer_exchange.js"; export { wakuPeerExchangeDiscovery, - PeerExchangeDiscovery -} from "./peer-exchange/waku_peer_exchange_discovery.js"; + PeerExchangeDiscovery, + PeerExchangeCodec +} from "./peer-exchange/index.js"; -// Local Peer Cache Discovery export { LocalPeerCacheDiscovery, wakuLocalPeerCacheDiscovery diff --git a/packages/discovery/src/peer-exchange/constants.ts b/packages/discovery/src/peer-exchange/constants.ts new file mode 100644 index 0000000000..520cc1b758 --- /dev/null +++ b/packages/discovery/src/peer-exchange/constants.ts @@ -0,0 +1,10 @@ +import { Tags } from "@waku/interfaces"; + +// amount of peers available per specification +export const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 60; + +export const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE; +export const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50; +export const DEFAULT_PEER_EXCHANGE_TAG_TTL = 30_000; + +export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1"; diff --git a/packages/discovery/src/peer-exchange/index.ts b/packages/discovery/src/peer-exchange/index.ts index c89b609a2c..2ec7948bc9 100644 --- a/packages/discovery/src/peer-exchange/index.ts +++ b/packages/discovery/src/peer-exchange/index.ts @@ -1,10 +1,5 @@ -export { - wakuPeerExchange, - PeerExchangeCodec, - WakuPeerExchange -} from "./waku_peer_exchange.js"; export { wakuPeerExchangeDiscovery, - PeerExchangeDiscovery, - Options -} from "./waku_peer_exchange_discovery.js"; + PeerExchangeDiscovery +} from "./peer_exchange_discovery.js"; +export { PeerExchangeCodec } from "./constants.js"; diff --git a/packages/discovery/src/peer-exchange/peer_exchange.spec.ts b/packages/discovery/src/peer-exchange/peer_exchange.spec.ts new file mode 100644 index 0000000000..722d6287eb --- /dev/null +++ b/packages/discovery/src/peer-exchange/peer_exchange.spec.ts @@ -0,0 +1,321 @@ +import { EnrDecoder } from "@waku/enr"; +import { ProtocolError } from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerExchange } from "./peer_exchange.js"; +import { PeerExchangeRPC } from "./rpc.js"; + +describe("PeerExchange", () => { + let peerExchange: PeerExchange; + let mockComponents: any; + let mockStreamManager: any; + let mockPeerStore: any; + let mockStream: any; + let mockPeerId: any; + + beforeEach(() => { + mockPeerId = { + toString: () => "test-peer-id", + equals: (other: any) => other && other.toString() === "test-peer-id" + }; + + mockStream = { + sink: sinon.stub(), + source: (async function* () { + const data = new Uint8Array([0, 0, 0, 4, 1, 2, 3, 4]); + yield data; + })() + }; + + mockStreamManager = { + getStream: sinon.stub().resolves(mockStream) + }; + + mockPeerStore = { + has: sinon.stub().resolves(true) + }; + + mockComponents = { + peerStore: mockPeerStore, + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + } + }; + + peerExchange = new PeerExchange(mockComponents as any); + + (peerExchange as any).streamManager = mockStreamManager; + }); + + afterEach(() => { + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with libp2p components", () => { + const components = { + peerStore: {}, + events: { + addEventListener: sinon.stub(), + removeEventListener: sinon.stub() + } + } as any; + const instance = new PeerExchange(components); + expect(instance).to.be.instanceOf(PeerExchange); + }); + }); + + describe("query", () => { + let queryParams: any; + + beforeEach(() => { + queryParams = { + numPeers: 5, + peerId: mockPeerId + }; + }); + + it("should successfully query peers and return peer infos", async () => { + const mockResponse = { + peerInfos: [ + { enr: new Uint8Array([1, 2, 3]) }, + { enr: new Uint8Array([4, 5, 6]) } + ] + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const mockEnr = { toString: () => "mock-enr" }; + sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.be.null; + expect(result.peerInfos).to.have.length(2); + expect(result.peerInfos![0]).to.have.property("ENR"); + expect(result.peerInfos![1]).to.have.property("ENR"); + }); + + it("should handle empty peer infos gracefully", async () => { + const mockResponse = { + peerInfos: [] + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.be.null; + expect(result.peerInfos).to.have.length(0); + }); + + it("should filter out undefined ENRs", async () => { + const mockResponse = { + peerInfos: [ + { enr: new Uint8Array([1, 2, 3]) }, + { enr: undefined }, + { enr: new Uint8Array([4, 5, 6]) } + ] + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const mockEnr = { toString: () => "mock-enr" }; + sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.be.null; + expect(result.peerInfos).to.have.length(2); + }); + + it("should return NO_PEER_AVAILABLE when peer is not in peer store", async () => { + mockPeerStore.has.resolves(false); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.NO_PEER_AVAILABLE); + expect(result.peerInfos).to.be.null; + }); + + it("should return NO_STREAM_AVAILABLE when stream creation fails", async () => { + mockStreamManager.getStream.returns(undefined); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.NO_STREAM_AVAILABLE); + expect(result.peerInfos).to.be.null; + }); + + it("should return EMPTY_PAYLOAD when response field is missing", async () => { + const mockRpcResponse = { + response: undefined + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.EMPTY_PAYLOAD); + expect(result.peerInfos).to.be.null; + }); + + it("should return DECODE_FAILED when RPC decode fails", async () => { + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").throws(new Error("Decode failed")); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.DECODE_FAILED); + expect(result.peerInfos).to.be.null; + }); + + it("should return DECODE_FAILED when ENR decoding fails", async () => { + const mockResponse = { + peerInfos: [{ enr: new Uint8Array([1, 2, 3]) }] + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + sinon.stub(EnrDecoder, "fromRLP").rejects(new Error("ENR decode failed")); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.DECODE_FAILED); + expect(result.peerInfos).to.be.null; + }); + + it("should handle malformed response data", async () => { + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + + sinon.stub(PeerExchangeRPC, "decode").throws(new Error("Malformed data")); + + const result = await peerExchange.query(queryParams); + + expect(result.error).to.equal(ProtocolError.DECODE_FAILED); + expect(result.peerInfos).to.be.null; + }); + + it("should handle large number of peers request", async () => { + const largeQueryParams = { + numPeers: 1000, + peerId: mockPeerId + }; + + const mockResponse = { + peerInfos: Array(1000).fill({ enr: new Uint8Array([1, 2, 3]) }) + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const mockEnr = { toString: () => "mock-enr" }; + sinon.stub(EnrDecoder, "fromRLP").resolves(mockEnr as any); + + const result = await peerExchange.query(largeQueryParams); + + expect(result.error).to.be.null; + expect(result.peerInfos).to.have.length(1000); + }); + + it("should handle zero peers request", async () => { + const zeroQueryParams = { + numPeers: 0, + peerId: mockPeerId + }; + + const mockResponse = { + peerInfos: [] + }; + + const mockRpcResponse = { + response: mockResponse + }; + + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + sinon.stub(PeerExchangeRPC, "createRequest").returns(mockRpcQuery as any); + sinon.stub(PeerExchangeRPC, "decode").returns(mockRpcResponse as any); + + const result = await peerExchange.query(zeroQueryParams); + + expect(result.error).to.be.null; + expect(result.peerInfos).to.have.length(0); + }); + + it("should create RPC request with correct parameters", async () => { + const mockRpcQuery = { + encode: sinon.stub().returns(new Uint8Array([1, 2, 3])) + }; + const createRequestStub = sinon + .stub(PeerExchangeRPC, "createRequest") + .returns(mockRpcQuery as any); + sinon + .stub(PeerExchangeRPC, "decode") + .returns({ response: { peerInfos: [] } } as any); + + await peerExchange.query(queryParams); + + expect(createRequestStub.calledOnce).to.be.true; + expect(createRequestStub.firstCall.args[0]).to.deep.equal({ + numPeers: BigInt(queryParams.numPeers) + }); + }); + + it("should create PeerExchange instance with components", () => { + const instance = new PeerExchange(mockComponents as any); + expect(instance).to.be.instanceOf(PeerExchange); + }); + }); +}); diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts b/packages/discovery/src/peer-exchange/peer_exchange.ts similarity index 82% rename from packages/discovery/src/peer-exchange/waku_peer_exchange.ts rename to packages/discovery/src/peer-exchange/peer_exchange.ts index 290f64722e..0e0e90cc2a 100644 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange.ts +++ b/packages/discovery/src/peer-exchange/peer_exchange.ts @@ -14,16 +14,15 @@ import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { PeerExchangeCodec } from "./constants.js"; import { PeerExchangeRPC } from "./rpc.js"; -export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1"; - const log = new Logger("peer-exchange"); /** * Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/) */ -export class WakuPeerExchange implements IPeerExchange { +export class PeerExchange implements IPeerExchange { private readonly streamManager: StreamManager; /** @@ -45,8 +44,8 @@ export class WakuPeerExchange implements IPeerExchange { numPeers: BigInt(numPeers) }); - const peer = await this.components.peerStore.get(peerId); - if (!peer) { + const hasPeer = await this.components.peerStore.has(peerId); + if (!hasPeer) { return { peerInfos: null, error: ProtocolError.NO_PEER_AVAILABLE @@ -56,7 +55,9 @@ export class WakuPeerExchange implements IPeerExchange { const stream = await this.streamManager.getStream(peerId); if (!stream) { - log.error(`Failed to get a stream for remote peer:${peerId.toString()}`); + log.error( + `Failed to get a stream for remote peer:${peerId?.toString?.()}` + ); return { peerInfos: null, error: ProtocolError.NO_STREAM_AVAILABLE @@ -110,13 +111,3 @@ export class WakuPeerExchange implements IPeerExchange { } } } - -/** - * - * @returns A function that creates a new peer exchange protocol - */ -export function wakuPeerExchange(): ( - components: Libp2pComponents -) => WakuPeerExchange { - return (components: Libp2pComponents) => new WakuPeerExchange(components); -} diff --git a/packages/discovery/src/peer-exchange/peer_exchange_discovery.spec.ts b/packages/discovery/src/peer-exchange/peer_exchange_discovery.spec.ts new file mode 100644 index 0000000000..cc881d561a --- /dev/null +++ b/packages/discovery/src/peer-exchange/peer_exchange_discovery.spec.ts @@ -0,0 +1,386 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { peerDiscoverySymbol as symbol } from "@libp2p/interface"; +import type { + IdentifyResult, + PeerDiscoveryEvents, + PeerId +} from "@libp2p/interface"; +import { + type IPeerExchange, + type Libp2pComponents, + ProtocolError +} from "@waku/interfaces"; +import { expect } from "chai"; +import sinon from "sinon"; + +import { PeerExchangeCodec } from "./constants.js"; +import { + PeerExchangeDiscovery, + wakuPeerExchangeDiscovery +} from "./peer_exchange_discovery.js"; + +describe("PeerExchangeDiscovery", () => { + let peerExchangeDiscovery: PeerExchangeDiscovery; + let mockComponents: Libp2pComponents; + let mockEvents: TypedEventEmitter; + let mockConnectionManager: any; + let mockPeerStore: any; + let mockPeerId: PeerId; + + let clock: sinon.SinonFakeTimers; + + beforeEach(() => { + clock = sinon.useFakeTimers(); + + mockPeerId = { + toString: sinon.stub().returns("peer-id-1"), + toBytes: sinon.stub().returns(new Uint8Array([1, 2, 3])) + } as unknown as PeerId; + + mockEvents = new TypedEventEmitter(); + mockConnectionManager = { + getConnections: sinon.stub().returns([{ remotePeer: mockPeerId }]) + }; + mockPeerStore = { + get: sinon.stub().resolves({ + id: mockPeerId, + protocols: [PeerExchangeCodec] + }), + merge: sinon.stub().resolves(undefined), + has: sinon.stub().resolves(true) + }; + + mockComponents = { + events: mockEvents, + connectionManager: mockConnectionManager, + peerStore: mockPeerStore + } as unknown as Libp2pComponents; + + peerExchangeDiscovery = new PeerExchangeDiscovery(mockComponents, {}); + }); + + afterEach(() => { + clock.restore(); + sinon.restore(); + }); + + describe("constructor", () => { + it("should initialize with default options", () => { + const discovery = new PeerExchangeDiscovery(mockComponents); + expect(discovery).to.be.instanceOf(PeerExchangeDiscovery); + expect(discovery[symbol]).to.be.true; + expect(discovery[Symbol.toStringTag]).to.equal("@waku/peer-exchange"); + }); + + it("should initialize with custom TTL", () => { + const customTTL = 60000; + const discovery = new PeerExchangeDiscovery(mockComponents, { + TTL: customTTL + }); + expect(discovery).to.be.instanceOf(PeerExchangeDiscovery); + }); + }); + + describe("start", () => { + it("should start peer exchange discovery", () => { + const addEventListenerSpy = sinon.spy(mockEvents, "addEventListener"); + + peerExchangeDiscovery.start(); + + expect(addEventListenerSpy.called).to.be.true; + }); + + it("should not start if already started", () => { + const addEventListenerSpy = sinon.spy(mockEvents, "addEventListener"); + + peerExchangeDiscovery.start(); + peerExchangeDiscovery.start(); + + expect(addEventListenerSpy.calledOnce).to.be.true; + }); + }); + + describe("stop", () => { + it("should stop peer exchange discovery", () => { + const removeEventListenerSpy = sinon.spy( + mockEvents, + "removeEventListener" + ); + + peerExchangeDiscovery.start(); + peerExchangeDiscovery.stop(); + + expect(removeEventListenerSpy.called).to.be.true; + }); + + it("should not stop if not started", () => { + const removeEventListenerSpy = sinon.spy( + mockEvents, + "removeEventListener" + ); + + peerExchangeDiscovery.stop(); + + expect(removeEventListenerSpy.called).to.be.false; + }); + }); + + describe("handleDiscoveredPeer", () => { + beforeEach(() => { + peerExchangeDiscovery.start(); + }); + + it("should handle peer identify event", async () => { + const runQuerySpy = sinon.spy(peerExchangeDiscovery as any, "runQuery"); + const mockIdentifyResult: IdentifyResult = { + peerId: mockPeerId, + protocols: [PeerExchangeCodec], + listenAddrs: [], + connection: {} as any + }; + + const event = new CustomEvent("peer:identify", { + detail: mockIdentifyResult + }); + + await peerExchangeDiscovery["handleDiscoveredPeer"](event); + + expect(runQuerySpy.called).to.be.true; + }); + + it("should skip peers without peer exchange protocol", async () => { + const mockIdentifyResult: IdentifyResult = { + peerId: mockPeerId, + protocols: ["other-protocol"], + listenAddrs: [], + connection: {} as any + }; + + const event = new CustomEvent("peer:identify", { + detail: mockIdentifyResult + }); + + await peerExchangeDiscovery["handleDiscoveredPeer"](event); + + expect(mockPeerStore.get.called).to.be.false; + }); + }); + + describe("handlePeriodicDiscovery", () => { + beforeEach(() => { + peerExchangeDiscovery.start(); + }); + + it("should query peers that support peer exchange", async () => { + await peerExchangeDiscovery["handlePeriodicDiscovery"](); + + expect(mockConnectionManager.getConnections.called).to.be.true; + expect(mockPeerStore.get.called).to.be.true; + }); + + it("should skip peers that don't support peer exchange", async () => { + mockPeerStore.get.resolves({ + id: mockPeerId, + protocols: ["other-protocol"] + }); + + await peerExchangeDiscovery["handlePeriodicDiscovery"](); + + expect(mockConnectionManager.getConnections.called).to.be.true; + expect(mockPeerStore.get.called).to.be.true; + }); + + it("should handle peer store errors gracefully", async () => { + mockPeerStore.get.rejects(new Error("Peer store error")); + + await peerExchangeDiscovery["handlePeriodicDiscovery"](); + + expect(mockConnectionManager.getConnections.called).to.be.true; + }); + + it("should skip peers that were recently queried", async () => { + const peerIdStr = mockPeerId.toString(); + peerExchangeDiscovery["peerExpirationRecords"].set( + peerIdStr, + Date.now() + 10000 + ); + + await peerExchangeDiscovery["handlePeriodicDiscovery"](); + + expect(mockPeerStore.get.called).to.be.false; + }); + }); + + describe("runQuery", () => { + beforeEach(() => { + peerExchangeDiscovery.start(); + }); + + it("should query peer with peer exchange protocol", async () => { + const querySpy = sinon.spy(peerExchangeDiscovery as any, "query"); + await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]); + + expect(querySpy.called).to.be.true; + }); + + it("should skip peers without peer exchange protocol", async () => { + const querySpy = sinon.spy(peerExchangeDiscovery as any, "query"); + await peerExchangeDiscovery["runQuery"](mockPeerId, ["other-protocol"]); + + expect(querySpy.called).to.be.false; + }); + + it("should skip already querying peers", async () => { + peerExchangeDiscovery["queryingPeers"].add(mockPeerId.toString()); + const querySpy = sinon.spy(peerExchangeDiscovery as any, "query"); + + await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]); + + expect(querySpy.called).to.be.false; + }); + + it("should handle query errors gracefully", async () => { + const queryStub = sinon + .stub(peerExchangeDiscovery as any, "query") + .rejects(new Error("Query failed")); + + await peerExchangeDiscovery["runQuery"](mockPeerId, [PeerExchangeCodec]); + + expect(queryStub.called).to.be.true; + expect(peerExchangeDiscovery["queryingPeers"].has(mockPeerId.toString())) + .to.be.false; + }); + }); + + describe("query", () => { + beforeEach(() => { + peerExchangeDiscovery.start(); + }); + + it("should process successful peer exchange query", async () => { + const mockENR = { + peerInfo: { + id: mockPeerId, + multiaddrs: [] + }, + shardInfo: { clusterId: 1, shards: [1] } + }; + + const internalPeerExchange = (peerExchangeDiscovery as any)[ + "peerExchange" + ] as IPeerExchange; + sinon.stub(internalPeerExchange, "query").resolves({ + peerInfos: [{ ENR: mockENR as any }], + error: null + }); + + const dispatchEventSpy = sinon.spy( + peerExchangeDiscovery, + "dispatchEvent" + ); + + await peerExchangeDiscovery["query"](mockPeerId); + + expect(mockPeerStore.merge.called).to.be.true; + expect(dispatchEventSpy.called).to.be.true; + }); + + it("should handle query errors", async () => { + const internalPeerExchange = (peerExchangeDiscovery as any)[ + "peerExchange" + ] as IPeerExchange; + sinon.stub(internalPeerExchange, "query").resolves({ + peerInfos: null, + error: ProtocolError.NO_PEER_AVAILABLE + }); + + await peerExchangeDiscovery["query"](mockPeerId); + + expect(mockPeerStore.merge.called).to.be.false; + }); + + it("should skip peers without ENR", async () => { + const internalPeerExchange = (peerExchangeDiscovery as any)[ + "peerExchange" + ] as IPeerExchange; + sinon.stub(internalPeerExchange, "query").resolves({ + peerInfos: [{ ENR: undefined }], + error: null + }); + + await peerExchangeDiscovery["query"](mockPeerId); + + expect(mockPeerStore.merge.called).to.be.false; + }); + + it("should skip peers without peerInfo in ENR", async () => { + const internalPeerExchange = (peerExchangeDiscovery as any)[ + "peerExchange" + ] as IPeerExchange; + sinon.stub(internalPeerExchange, "query").resolves({ + peerInfos: [{ ENR: { peerInfo: undefined } as any }], + error: null + }); + + await peerExchangeDiscovery["query"](mockPeerId); + + expect(mockPeerStore.merge.called).to.be.false; + }); + + it("should handle ENR without shardInfo", async () => { + const mockENRWithoutShard = { + peerInfo: { + id: mockPeerId, + multiaddrs: [] + } + }; + + const internalPeerExchange = (peerExchangeDiscovery as any)[ + "peerExchange" + ] as IPeerExchange; + sinon.stub(internalPeerExchange, "query").resolves({ + peerInfos: [{ ENR: mockENRWithoutShard as any }], + error: null + }); + + await peerExchangeDiscovery["query"](mockPeerId); + + expect(mockPeerStore.merge.called).to.be.true; + }); + }); + + describe("continuous discovery interval", () => { + it("should start periodic discovery on start", () => { + const setIntervalSpy = sinon.spy(global, "setInterval"); + + peerExchangeDiscovery.start(); + + expect(setIntervalSpy.called).to.be.true; + }); + + it("should clear interval on stop", () => { + const clearIntervalSpy = sinon.spy(global, "clearInterval"); + + peerExchangeDiscovery.start(); + peerExchangeDiscovery.stop(); + + expect(clearIntervalSpy.called).to.be.true; + }); + }); + + describe("wakuPeerExchangeDiscovery factory", () => { + it("should create PeerExchangeDiscovery instance", () => { + const factory = wakuPeerExchangeDiscovery({ TTL: 60000 }); + const discovery = factory(mockComponents); + + expect(discovery).to.be.instanceOf(PeerExchangeDiscovery); + }); + + it("should create PeerExchangeDiscovery with default options", () => { + const factory = wakuPeerExchangeDiscovery(); + const discovery = factory(mockComponents); + + expect(discovery).to.be.instanceOf(PeerExchangeDiscovery); + }); + }); +}); diff --git a/packages/discovery/src/peer-exchange/peer_exchange_discovery.ts b/packages/discovery/src/peer-exchange/peer_exchange_discovery.ts new file mode 100644 index 0000000000..b276d998d6 --- /dev/null +++ b/packages/discovery/src/peer-exchange/peer_exchange_discovery.ts @@ -0,0 +1,246 @@ +import { TypedEventEmitter } from "@libp2p/interface"; +import { peerDiscoverySymbol as symbol } from "@libp2p/interface"; +import type { + IdentifyResult, + PeerDiscovery, + PeerDiscoveryEvents, + PeerId, + PeerInfo +} from "@libp2p/interface"; +import { + type IPeerExchange, + type Libp2pComponents, + type Libp2pEventHandler +} from "@waku/interfaces"; +import { encodeRelayShard, Logger } from "@waku/utils"; + +import { + DEFAULT_PEER_EXCHANGE_REQUEST_NODES, + DEFAULT_PEER_EXCHANGE_TAG_NAME, + DEFAULT_PEER_EXCHANGE_TAG_TTL, + DEFAULT_PEER_EXCHANGE_TAG_VALUE, + PeerExchangeCodec +} from "./constants.js"; +import { PeerExchange } from "./peer_exchange.js"; + +const log = new Logger("peer-exchange-discovery"); + +interface PeerExchangeDiscoveryOptions { + /** + * Peer TTL in milliseconds. + * This is the time after which a peer will be considered stale and will be re-queried via peer exchange. + * + * @default 30_000 + */ + TTL: number; +} + +export class PeerExchangeDiscovery + extends TypedEventEmitter + implements PeerDiscovery +{ + private readonly components: Libp2pComponents; + private readonly peerExchange: IPeerExchange; + private readonly options: PeerExchangeDiscoveryOptions; + + private isStarted: boolean = false; + private queryingPeers: Set = new Set(); + + private peerExpirationRecords: Map = new Map(); + private continuousDiscoveryInterval: NodeJS.Timeout | null = null; + + public constructor( + components: Libp2pComponents, + options: Partial = {} + ) { + super(); + + this.components = components; + this.peerExchange = new PeerExchange(components); + this.options = { + ...options, + TTL: options.TTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL + }; + + this.handleDiscoveredPeer = this.handleDiscoveredPeer.bind(this); + } + + /** + * Start Peer Exchange. + * Subscribe to "peer:identify" events and handle them. + */ + public start(): void { + if (this.isStarted) { + return; + } + + log.info("Starting peer exchange node discovery, discovering peers"); + this.isStarted = true; + + this.components.events.addEventListener( + "peer:identify", + this.handleDiscoveredPeer as Libp2pEventHandler + ); + + this.continuousDiscoveryInterval = setInterval(() => { + void this.handlePeriodicDiscovery(); + }, this.options.TTL); + } + + /** + * Stop Peer Exchange. + * Unsubscribe from "peer:identify" events. + */ + public stop(): void { + if (!this.isStarted) { + return; + } + + log.info("Stopping peer exchange node discovery"); + + this.isStarted = false; + this.queryingPeers.clear(); + this.peerExpirationRecords.clear(); + + if (this.continuousDiscoveryInterval) { + clearInterval(this.continuousDiscoveryInterval); + } + + this.components.events.removeEventListener( + "peer:identify", + this.handleDiscoveredPeer as Libp2pEventHandler + ); + } + + public get [symbol](): true { + return true; + } + + public get [Symbol.toStringTag](): string { + return "@waku/peer-exchange"; + } + + private async handleDiscoveredPeer( + event: CustomEvent + ): Promise { + void this.runQuery(event.detail.peerId, event.detail.protocols); + } + + private async handlePeriodicDiscovery(): Promise { + const connections = this.components.connectionManager.getConnections(); + + await Promise.all( + connections.map(async (connection) => { + try { + const peerIdStr = connection.remotePeer.toString(); + const shouldQuery = this.peerExpirationRecords.has(peerIdStr) + ? this.peerExpirationRecords.get(peerIdStr)! <= Date.now() + : true; + + if (!shouldQuery) { + return null; + } + + const peer = await this.components.peerStore.get( + connection.remotePeer + ); + + return this.runQuery(connection.remotePeer, peer.protocols); + } catch (error) { + log.warn("Error getting peer info", error); + return null; + } + }) + ); + } + + private async runQuery(peerId: PeerId, protocols: string[]): Promise { + if ( + !protocols.includes(PeerExchangeCodec) || + this.queryingPeers.has(peerId.toString()) + ) { + log.info( + `Skipping peer ${peerId} as it is already querying or does not support peer exchange` + ); + return; + } + + try { + this.queryingPeers.add(peerId.toString()); + await this.query(peerId); + } catch (error) { + log.error("Error querying peer", error); + } + + this.peerExpirationRecords.set( + peerId.toString(), + Date.now() + this.options.TTL + ); + + this.queryingPeers.delete(peerId.toString()); + } + + private async query(peerId: PeerId): Promise { + const peerIdStr = peerId.toString(); + log.info(`Querying peer exchange for ${peerIdStr}`); + + const { error, peerInfos } = await this.peerExchange.query({ + numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, + peerId + }); + + if (error) { + log.error(`Peer exchange query to ${peerIdStr} failed`, error); + return; + } + + for (const { ENR } of peerInfos) { + if (!ENR) { + log.warn(`No ENR in peerInfo object from ${peerIdStr}, skipping`); + continue; + } + + const { peerInfo, shardInfo } = ENR; + + if (!peerInfo) { + log.warn(`No peerInfo in ENR from ${peerIdStr}, skipping`); + continue; + } + + // merge is smart enough to overwrite only changed parts + await this.components.peerStore.merge(peerInfo.id, { + tags: { + [DEFAULT_PEER_EXCHANGE_TAG_NAME]: { + value: DEFAULT_PEER_EXCHANGE_TAG_VALUE + } + }, + ...(shardInfo && { + metadata: { + shardInfo: encodeRelayShard(shardInfo) + } + }), + ...(peerInfo.multiaddrs && { + multiaddrs: peerInfo.multiaddrs + }) + }); + + log.info(`Discovered peer: ${peerInfo.id.toString()}`); + + this.dispatchEvent( + new CustomEvent("peer", { + detail: { + id: peerInfo.id, + multiaddrs: peerInfo.multiaddrs + } + }) + ); + } + } +} + +export function wakuPeerExchangeDiscovery( + options: Partial = {} +): (components: Libp2pComponents) => PeerExchangeDiscovery { + return (components: Libp2pComponents) => + new PeerExchangeDiscovery(components, options); +} diff --git a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts b/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts deleted file mode 100644 index 9087f12c15..0000000000 --- a/packages/discovery/src/peer-exchange/waku_peer_exchange_discovery.ts +++ /dev/null @@ -1,317 +0,0 @@ -import { TypedEventEmitter } from "@libp2p/interface"; -import { peerDiscoverySymbol as symbol } from "@libp2p/interface"; -import type { - IdentifyResult, - PeerDiscovery, - PeerDiscoveryEvents, - PeerId, - PeerInfo -} from "@libp2p/interface"; -import { - type Libp2pComponents, - type PeerExchangeQueryResult, - ShardInfo, - Tags -} from "@waku/interfaces"; -import { decodeRelayShard, encodeRelayShard, Logger } from "@waku/utils"; - -import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; - -const log = new Logger("peer-exchange-discovery"); - -const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10; -const DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS = 10 * 1000; -const DEFAULT_MAX_RETRIES = 3; - -export interface Options { - /** - * Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap') - */ - tagName?: string; - - /** - * The bootstrap peer tag will have this value (default: 50) - */ - tagValue?: number; - - /** - * Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes) - */ - tagTTL?: number; - /** - * The interval between queries to a peer (default: 10 seconds) - * The interval will increase by a factor of an incrementing number (starting at 1) - * until it reaches the maximum attempts before backoff - */ - queryInterval?: number; - /** - * The number of attempts before the queries to a peer are aborted (default: 3) - */ - maxRetries?: number; -} - -interface CustomDiscoveryEvent extends PeerDiscoveryEvents { - "waku:peer-exchange:started": CustomEvent; -} - -const DEFAULT_PEER_EXCHANGE_TAG_NAME = Tags.PEER_EXCHANGE; -const DEFAULT_PEER_EXCHANGE_TAG_VALUE = 50; -const DEFAULT_PEER_EXCHANGE_TAG_TTL = 100_000_000; - -export class PeerExchangeDiscovery - extends TypedEventEmitter - implements PeerDiscovery -{ - private readonly components: Libp2pComponents; - private readonly peerExchange: WakuPeerExchange; - private readonly options: Options; - private isStarted: boolean; - private queryingPeers: Set = new Set(); - private queryAttempts: Map = new Map(); - - private readonly handleDiscoveredPeer = ( - event: CustomEvent - ): void => { - const { protocols, peerId } = event.detail; - - if ( - !protocols.includes(PeerExchangeCodec) || - this.queryingPeers.has(peerId.toString()) - ) - return; - - this.queryingPeers.add(peerId.toString()); - this.startRecurringQueries(peerId).catch((error) => - log.error(`Error querying peer ${error}`) - ); - }; - - public constructor(components: Libp2pComponents, options: Options = {}) { - super(); - this.components = components; - this.peerExchange = new WakuPeerExchange(components); - this.options = options; - this.isStarted = false; - } - - /** - * Start emitting events - */ - public start(): void { - if (this.isStarted) { - return; - } - - this.dispatchEvent( - new CustomEvent("waku:peer-exchange:started", { detail: true }) - ); - - log.info("Starting peer exchange node discovery, discovering peers"); - - // might be better to use "peer:identify" or "peer:update" - this.components.events.addEventListener( - "peer:identify", - this.handleDiscoveredPeer - ); - } - - /** - * Remove event listener - */ - public stop(): void { - if (!this.isStarted) return; - log.info("Stopping peer exchange node discovery"); - this.isStarted = false; - this.queryingPeers.clear(); - this.components.events.removeEventListener( - "peer:identify", - this.handleDiscoveredPeer - ); - } - - public get [symbol](): true { - return true; - } - - public get [Symbol.toStringTag](): string { - return "@waku/peer-exchange"; - } - - private readonly startRecurringQueries = async ( - peerId: PeerId - ): Promise => { - const peerIdStr = peerId.toString(); - const { - queryInterval = DEFAULT_PEER_EXCHANGE_QUERY_INTERVAL_MS, - maxRetries = DEFAULT_MAX_RETRIES - } = this.options; - - log.info( - `Querying peer: ${peerIdStr} (attempt ${ - this.queryAttempts.get(peerIdStr) ?? 1 - })` - ); - - await this.query(peerId); - - const currentAttempt = this.queryAttempts.get(peerIdStr) ?? 1; - - if (currentAttempt > maxRetries) { - this.abortQueriesForPeer(peerIdStr); - return; - } - - setTimeout(() => { - this.queryAttempts.set(peerIdStr, currentAttempt + 1); - this.startRecurringQueries(peerId).catch((error) => { - log.error(`Error in startRecurringQueries: ${error}`); - }); - }, queryInterval * currentAttempt); - }; - - private async query(peerId: PeerId): Promise { - const { error, peerInfos } = await this.peerExchange.query({ - numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, - peerId - }); - - if (error) { - log.error("Peer exchange query failed", error); - return { error, peerInfos: null }; - } - - for (const _peerInfo of peerInfos) { - const { ENR } = _peerInfo; - if (!ENR) { - log.warn("No ENR in peerInfo object, skipping"); - continue; - } - - const { peerId, peerInfo, shardInfo } = ENR; - if (!peerId || !peerInfo) { - continue; - } - - const hasPeer = await this.components.peerStore.has(peerId); - if (hasPeer) { - const { hasMultiaddrDiff, hasShardDiff } = await this.checkPeerInfoDiff( - peerInfo, - shardInfo - ); - - if (hasMultiaddrDiff || hasShardDiff) { - log.info( - `Peer ${peerId.toString()} has updated multiaddrs or shardInfo, updating` - ); - - if (hasMultiaddrDiff) { - log.info( - `Peer ${peerId.toString()} has updated multiaddrs, updating` - ); - - await this.components.peerStore.patch(peerId, { - multiaddrs: peerInfo.multiaddrs - }); - } - - if (hasShardDiff && shardInfo) { - log.info( - `Peer ${peerId.toString()} has updated shardInfo, updating` - ); - await this.components.peerStore.merge(peerId, { - metadata: { - shardInfo: encodeRelayShard(shardInfo) - } - }); - - this.dispatchEvent( - new CustomEvent("peer", { - detail: { - id: peerId, - multiaddrs: peerInfo.multiaddrs - } - }) - ); - } - - continue; - } - } - - // update the tags for the peer - await this.components.peerStore.save(peerId, { - tags: { - [DEFAULT_PEER_EXCHANGE_TAG_NAME]: { - value: this.options.tagValue ?? DEFAULT_PEER_EXCHANGE_TAG_VALUE, - ttl: this.options.tagTTL ?? DEFAULT_PEER_EXCHANGE_TAG_TTL - } - }, - ...(shardInfo && { - metadata: { - shardInfo: encodeRelayShard(shardInfo) - } - }), - ...(peerInfo.multiaddrs && { - multiaddrs: peerInfo.multiaddrs - }) - }); - - log.info(`Discovered peer: ${peerId.toString()}`); - - this.dispatchEvent( - new CustomEvent("peer", { - detail: { - id: peerId, - multiaddrs: peerInfo.multiaddrs - } - }) - ); - } - - return { error: null, peerInfos }; - } - - private abortQueriesForPeer(peerIdStr: string): void { - log.info(`Aborting queries for peer: ${peerIdStr}`); - this.queryingPeers.delete(peerIdStr); - this.queryAttempts.delete(peerIdStr); - } - - private async checkPeerInfoDiff( - peerInfo: PeerInfo, - shardInfo?: ShardInfo - ): Promise<{ hasMultiaddrDiff: boolean; hasShardDiff: boolean }> { - const { id: peerId } = peerInfo; - const peer = await this.components.peerStore.get(peerId); - - const existingMultiaddrs = peer.addresses.map((a) => - a.multiaddr.toString() - ); - const newMultiaddrs = peerInfo.multiaddrs.map((ma) => ma.toString()); - const hasMultiaddrDiff = existingMultiaddrs.some( - (ma) => !newMultiaddrs.includes(ma) - ); - - let hasShardDiff: boolean = false; - const existingShardInfoBytes = peer.metadata.get("shardInfo"); - if (existingShardInfoBytes) { - const existingShardInfo = decodeRelayShard(existingShardInfoBytes); - if (existingShardInfo || shardInfo) { - hasShardDiff = - existingShardInfo.clusterId !== shardInfo?.clusterId || - existingShardInfo.shards.some( - (shard) => !shardInfo?.shards.includes(shard) - ); - } - } - - return { hasMultiaddrDiff, hasShardDiff }; - } -} - -export function wakuPeerExchangeDiscovery(): ( - components: Libp2pComponents -) => PeerExchangeDiscovery { - return (components: Libp2pComponents) => - new PeerExchangeDiscovery(components); -} diff --git a/packages/tests/tests/connection-mananger/discovery_dialer.spec.ts b/packages/tests/tests/connection-mananger/discovery_dialer.spec.ts index 1105b16768..5d9fc80bc8 100644 --- a/packages/tests/tests/connection-mananger/discovery_dialer.spec.ts +++ b/packages/tests/tests/connection-mananger/discovery_dialer.spec.ts @@ -94,7 +94,6 @@ describe.skip("DiscoveryDialer", function () { }); }); - // TODO(weboko): investigate why peer-exchange discovery is not working https://github.com/waku-org/js-waku/issues/2446 await waku.libp2p.peerStore.save(secondPeerId, { multiaddrs: [maddrs[1]] }); diff --git a/packages/tests/tests/dns-peer-discovery.spec.ts b/packages/tests/tests/dns-discovery/dns-peer-discovery.spec.ts similarity index 98% rename from packages/tests/tests/dns-peer-discovery.spec.ts rename to packages/tests/tests/dns-discovery/dns-peer-discovery.spec.ts index 3c6d88839e..a9c9a067ad 100644 --- a/packages/tests/tests/dns-peer-discovery.spec.ts +++ b/packages/tests/tests/dns-discovery/dns-peer-discovery.spec.ts @@ -15,7 +15,7 @@ import { createLightNode } from "@waku/sdk"; import { expect } from "chai"; import { MemoryDatastore } from "datastore-core/memory"; -import { delay } from "../src/index.js"; +import { delay } from "../../src/index.js"; const maxQuantity = 3; diff --git a/packages/tests/tests/dns-discovery/dns_discovery.optional.spec.ts b/packages/tests/tests/dns-discovery/dns_discovery.optional.spec.ts new file mode 100644 index 0000000000..7611c5a240 --- /dev/null +++ b/packages/tests/tests/dns-discovery/dns_discovery.optional.spec.ts @@ -0,0 +1,58 @@ +import { bootstrap } from "@libp2p/bootstrap"; +import { + DnsNodeDiscovery, + enrTree, + wakuPeerExchangeDiscovery +} from "@waku/discovery"; +import type { LightNode } from "@waku/interfaces"; +import { createLightNode } from "@waku/sdk"; +import { expect } from "chai"; + +import { afterEachCustom, tearDownNodes } from "../../src/index.js"; + +describe("DNS Discovery", function () { + let waku: LightNode; + const predefinedNodes: string[] = []; + + afterEachCustom(this, async () => { + await tearDownNodes([], waku); + }); + + it(`should discover peers other than used for bootstrapping`, async function () { + this.timeout(50_000); + + const dns = await DnsNodeDiscovery.dnsOverHttp(); + const dnsEnrs = []; + for await (const node of dns.getNextPeer([enrTree["SANDBOX"]])) { + dnsEnrs.push(node); + } + const dnsPeerMultiaddrs = dnsEnrs + .flatMap( + (enr) => enr.peerInfo?.multiaddrs.map((ma) => ma.toString()) ?? [] + ) + .filter((ma) => ma.includes("wss")); + + const networkConfig = { clusterId: 2, numShardsInCluster: 0 }; + waku = await createLightNode({ + libp2p: { + peerDiscovery: [ + bootstrap({ list: dnsPeerMultiaddrs }), + wakuPeerExchangeDiscovery() + ] + }, + networkConfig + }); + + const foundPxPeer = await new Promise((resolve) => { + waku.libp2p.addEventListener("peer:discovery", (evt) => { + const peerId = evt.detail.id.toString(); + const isBootstrapNode = predefinedNodes.find((n) => n.includes(peerId)); + if (!isBootstrapNode) { + resolve(true); + } + }); + }); + + expect(foundPxPeer).to.be.true; + }); +}); diff --git a/packages/tests/tests/peer-exchange/compliance.spec.ts b/packages/tests/tests/peer-exchange/compliance.spec.ts index 48a081c318..6027d3ac92 100644 --- a/packages/tests/tests/peer-exchange/compliance.spec.ts +++ b/packages/tests/tests/peer-exchange/compliance.spec.ts @@ -36,6 +36,11 @@ describe("Peer Exchange", function () { peerExchange: true, discv5BootstrapNode: enr }); + + const waitForNodesToMountPeerExchange = new Promise((resolve) => + setTimeout(resolve, 10_000) + ); + await waitForNodesToMountPeerExchange; }); tests({ @@ -43,22 +48,16 @@ describe("Peer Exchange", function () { waku = await createLightNode({ networkConfig: DefaultTestNetworkConfig }); - await waku.start(); - - const nwaku2Ma = await nwaku2.getMultiaddrWithId(); + const nwaku1Ma = await nwaku1.getMultiaddrWithId(); const peerExchange = new PeerExchangeDiscovery(waku.libp2p.components); - peerExchange.addEventListener("waku:peer-exchange:started", (event) => { - if (event.detail === true) { - void waku.libp2p.dialProtocol(nwaku2Ma, PeerExchangeCodec); - } - }); + await waku.libp2p.dialProtocol(nwaku1Ma, PeerExchangeCodec); return peerExchange; }, teardown: async () => { - this.timeout(15000); + this.timeout(15_000); await tearDownNodes([nwaku1, nwaku2], waku); } }); diff --git a/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts b/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts deleted file mode 100644 index 9034ea9e4c..0000000000 --- a/packages/tests/tests/peer-exchange/continuous_discovery.spec.ts +++ /dev/null @@ -1,127 +0,0 @@ -import { generateKeyPair } from "@libp2p/crypto/keys"; -import { type PeerId } from "@libp2p/interface"; -import { peerIdFromPrivateKey } from "@libp2p/peer-id"; -import { multiaddr } from "@multiformats/multiaddr"; -import { PeerExchangeDiscovery } from "@waku/discovery"; -import { IEnr, LightNode } from "@waku/interfaces"; -import { createLightNode, ShardInfo } from "@waku/sdk"; -import { decodeRelayShard } from "@waku/utils"; -import { expect } from "chai"; -import Sinon from "sinon"; - -describe("Peer Exchange Continuous Discovery", () => { - let peerExchangeDiscovery: PeerExchangeDiscovery; - let queryStub: Sinon.SinonStub; - let peerId: PeerId; - let randomPeerId: PeerId; - let waku: LightNode; - const shardInfo: ShardInfo = { - clusterId: 2, - shards: [1, 2] - }; - const multiaddrs = [multiaddr("/ip4/127.0.0.1/udp/1234")]; - - beforeEach(async () => { - waku = await createLightNode(); - - peerExchangeDiscovery = new PeerExchangeDiscovery(waku.libp2p.components); - queryStub = Sinon.stub( - (peerExchangeDiscovery as any).peerExchange, - "query" as any - ); - - await discoverPeerOnce(); - }); - - it("Should update multiaddrs", async () => { - const newMultiaddrs = [multiaddr("/ip4/192.168.1.1/udp/1234")]; - const newPeerInfo = { - ENR: { - peerId, - shardInfo, - peerInfo: { - multiaddrs: newMultiaddrs, - id: peerId - } - } as IEnr - }; - queryStub.resolves({ error: null, peerInfos: [newPeerInfo] }); - - const newResult = await (peerExchangeDiscovery as any).query(randomPeerId); - expect(newResult.error).to.be.null; - const newPeers = await waku.libp2p.peerStore.all(); - expect(newPeers.length).to.equal(1); - const newPeer = newPeers[0]; - expect(newPeer.addresses.length).to.equal(1); - expect(newPeer.addresses[0].multiaddr.toString()).to.equal( - newMultiaddrs[0].toString() - ); - }); - - it("Should update shard info", async () => { - const newShardInfo: ShardInfo = { - clusterId: 2, - shards: [1, 2, 3] - }; - const newPeerInfo = { - ENR: { - peerId, - shardInfo: newShardInfo, - peerInfo: { - multiaddrs: multiaddrs, - id: peerId - } - } as IEnr - }; - queryStub.resolves({ error: null, peerInfos: [newPeerInfo] }); - - const newResult = await (peerExchangeDiscovery as any).query(randomPeerId); - expect(newResult.error).to.be.null; - const newPeers = await waku.libp2p.peerStore.all(); - expect(newPeers.length).to.equal(1); - const newPeer = newPeers[0]; - expect(newPeer.addresses.length).to.equal(1); - expect(newPeer.addresses[0].multiaddr.toString()).to.equal( - multiaddrs[0].toString() - ); - - const _shardInfo = decodeRelayShard(newPeer.metadata.get("shardInfo")!); - expect(_shardInfo).to.deep.equal(newShardInfo); - }); - - async function discoverPeerOnce(): Promise { - const privateKey = await generateKeyPair("secp256k1"); - peerId = peerIdFromPrivateKey(privateKey); - - const enr: IEnr = { - peerId, - shardInfo, - peerInfo: { - multiaddrs: multiaddrs, - id: peerId - } - } as IEnr; - - const peerInfo = { - ENR: enr - }; - - queryStub.resolves({ error: null, peerInfos: [peerInfo] }); - - const privateKeyRandom = await generateKeyPair("secp256k1"); - randomPeerId = peerIdFromPrivateKey(privateKeyRandom); - - const result = await (peerExchangeDiscovery as any).query(randomPeerId); - expect(result.error).to.be.null; - - const peers = await waku.libp2p.peerStore.all(); - expect(peers.length).to.equal(1); - const peer = peers[0]; - expect(peer.addresses.length).to.equal(1); - expect(peer.addresses[0].multiaddr.toString()).to.equal( - multiaddrs[0].toString() - ); - const _shardInfo = decodeRelayShard(peer.metadata.get("shardInfo")!); - expect(_shardInfo).to.deep.equal(shardInfo); - } -}); diff --git a/packages/tests/tests/peer-exchange/pe.optional.spec.ts b/packages/tests/tests/peer-exchange/pe.optional.spec.ts deleted file mode 100644 index a9b509bbe5..0000000000 --- a/packages/tests/tests/peer-exchange/pe.optional.spec.ts +++ /dev/null @@ -1,64 +0,0 @@ -import { bootstrap } from "@libp2p/bootstrap"; -import { - DnsNodeDiscovery, - enrTree, - wakuPeerExchangeDiscovery -} from "@waku/discovery"; -import type { LightNode } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; -import { expect } from "chai"; - -import { afterEachCustom, tearDownNodes } from "../../src/index.js"; - -describe("Peer Exchange", () => { - describe("Auto Discovery", function () { - let waku: LightNode; - const predefinedNodes: string[] = []; - - afterEachCustom(this, async () => { - await tearDownNodes([], waku); - }); - - it(`should discover peers other than used for bootstrapping`, async function () { - this.timeout(50_000); - - const dns = await DnsNodeDiscovery.dnsOverHttp(); - const dnsEnrs = []; - for await (const node of dns.getNextPeer([enrTree["SANDBOX"]])) { - dnsEnrs.push(node); - } - const dnsPeerMultiaddrs = dnsEnrs - .flatMap( - (enr) => enr.peerInfo?.multiaddrs.map((ma) => ma.toString()) ?? [] - ) - .filter((ma) => ma.includes("wss")); - - const networkConfig = { clusterId: 2, numShardsInCluster: 0 }; - waku = await createLightNode({ - libp2p: { - peerDiscovery: [ - bootstrap({ list: dnsPeerMultiaddrs }), - wakuPeerExchangeDiscovery() - ] - }, - networkConfig - }); - - await waku.start(); - - const foundPxPeer = await new Promise((resolve) => { - waku.libp2p.addEventListener("peer:discovery", (evt) => { - const peerId = evt.detail.id.toString(); - const isBootstrapNode = predefinedNodes.find((n) => - n.includes(peerId) - ); - if (!isBootstrapNode) { - resolve(true); - } - }); - }); - - expect(foundPxPeer).to.be.true; - }); - }); -}); diff --git a/packages/tests/tests/peer-exchange/index.spec.ts b/packages/tests/tests/peer-exchange/peer_exchange.spec.ts similarity index 60% rename from packages/tests/tests/peer-exchange/index.spec.ts rename to packages/tests/tests/peer-exchange/peer_exchange.spec.ts index e34b257895..d7b6fb9d11 100644 --- a/packages/tests/tests/peer-exchange/index.spec.ts +++ b/packages/tests/tests/peer-exchange/peer_exchange.spec.ts @@ -13,6 +13,7 @@ import { DefaultTestClusterId, DefaultTestNetworkConfig, DefaultTestShardInfo, + delay, makeLogFileName, ServiceNode, tearDownNodes @@ -22,6 +23,7 @@ export const log = new Logger("test:pe"); describe("Peer Exchange", function () { this.timeout(150_000); + let ctx: Mocha.Context; let waku: LightNode; let nwaku1: ServiceNode; let nwaku2: ServiceNode; @@ -29,8 +31,9 @@ describe("Peer Exchange", function () { let dialPeerSpy: SinonSpy; beforeEachCustom(this, async () => { - nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); + ctx = this.ctx; + nwaku1 = new ServiceNode(makeLogFileName(ctx) + "1"); + nwaku2 = new ServiceNode(makeLogFileName(ctx) + "2"); await nwaku1.start({ clusterId: DefaultTestClusterId, shard: DefaultTestShardInfo.shards, @@ -46,6 +49,8 @@ describe("Peer Exchange", function () { discv5BootstrapNode: (await nwaku1.info()).enrUri, relay: true }); + + await delay(10_000); // wait for peer exchange to finish, nwaku takes ~10s }); afterEachCustom(this, async () => { @@ -62,7 +67,6 @@ describe("Peer Exchange", function () { ] } }); - await waku.start(); dialPeerSpy = Sinon.spy((waku as any).libp2p, "dial"); const pxPeersDiscovered = new Set(); @@ -87,104 +91,49 @@ describe("Peer Exchange", function () { expect(pxPeersDiscovered.size).to.equal(1); }); - // will be skipped until https://github.com/waku-org/js-waku/issues/1860 is fixed - it.skip("new peer added after a peer was already found", async function () { + it("new peer added after a peer was already found", async function () { waku = await createLightNode({ + networkConfig: DefaultTestNetworkConfig, libp2p: { - peerDiscovery: [ - bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), - wakuPeerExchangeDiscovery() - ] + peerDiscovery: [wakuPeerExchangeDiscovery({ TTL: 1_000 })] } }); - await waku.start(); + await waku.dial(await nwaku2.getMultiaddrWithId()); - dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); - const pxPeersDiscovered = new Set(); - await new Promise((resolve) => { - waku.libp2p.addEventListener("peer:discovery", (evt) => { - return void (async () => { - const peerId = evt.detail.id; - const peer = await waku.libp2p.peerStore.get(peerId); - const tags = Array.from(peer.tags.keys()); - if (tags.includes(Tags.PEER_EXCHANGE)) { - pxPeersDiscovered.add(peerId); - if (pxPeersDiscovered.size === 1) { - resolve(); - } - } - })(); - }); - }); - - nwaku3 = new ServiceNode(makeLogFileName(this) + "3"); + nwaku3 = new ServiceNode(makeLogFileName(ctx) + "3"); await nwaku3.start({ clusterId: DefaultTestClusterId, shard: DefaultTestShardInfo.shards, discv5Discovery: true, peerExchange: true, discv5BootstrapNode: (await nwaku1.info()).enrUri, - relay: true, - lightpush: true, - filter: true + relay: true }); - await new Promise((resolve) => { + const nwaku3PeerId = (await nwaku3.getPeerId()).toString(); + const pxPeersDiscoveredPromise = new Promise((resolve) => { waku.libp2p.addEventListener("peer:discovery", (evt) => { return void (async () => { + setTimeout(() => { + resolve(false); + }, 10_000); + const peerId = evt.detail.id; const peer = await waku.libp2p.peerStore.get(peerId); const tags = Array.from(peer.tags.keys()); - if (tags.includes(Tags.PEER_EXCHANGE)) { - pxPeersDiscovered.add(peerId); - if (pxPeersDiscovered.size === 2) { - resolve(); - } - } - })(); - }); - }); - }); - // will be skipped until https://github.com/waku-org/js-waku/issues/1858 is fixed - it.skip("wrong wakuPeerExchangeDiscovery pubsub topic", async function () { - waku = await createLightNode({ - libp2p: { - peerDiscovery: [ - bootstrap({ list: [(await nwaku2.getMultiaddrWithId()).toString()] }), - wakuPeerExchangeDiscovery() - ] - } - }); - await waku.start(); - dialPeerSpy = Sinon.spy((waku as any).connectionManager, "dialPeer"); - - const pxPeersDiscovered = new Set(); - await new Promise((resolve) => { - const timeoutId = setTimeout(() => { - resolve(); - }, 40000); - - waku.libp2p.addEventListener("peer:discovery", (evt) => { - return void (async () => { - const peerId = evt.detail.id; - const peer = await waku.libp2p.peerStore.get(peerId); - const tags = Array.from(peer.tags.keys()); - if (tags.includes(Tags.PEER_EXCHANGE)) { - pxPeersDiscovered.add(peerId); - if (pxPeersDiscovered.size === 1) { - clearTimeout(timeoutId); - resolve(); - } + if ( + tags.includes(Tags.PEER_EXCHANGE) && + peerId.toString() === nwaku3PeerId + ) { + resolve(true); } })(); }); }); - expect( - pxPeersDiscovered.size, - "No peer should have been discovered" - ).to.equal(0); + const pxPeersDiscovered = await pxPeersDiscoveredPromise; + expect(pxPeersDiscovered, "Peer was found").to.be.true; }); it("peerDiscovery without wakuPeerExchangeDiscovery", async function () { diff --git a/packages/tests/tests/peer-exchange/query.spec.ts b/packages/tests/tests/peer-exchange/query.spec.ts deleted file mode 100644 index 87296e60a7..0000000000 --- a/packages/tests/tests/peer-exchange/query.spec.ts +++ /dev/null @@ -1,200 +0,0 @@ -import { bootstrap } from "@libp2p/bootstrap"; -import type { PeerId } from "@libp2p/interface"; -import { multiaddr } from "@multiformats/multiaddr"; -import type { Multiaddr } from "@multiformats/multiaddr"; -import { - PeerExchangeCodec, - WakuPeerExchange, - wakuPeerExchangeDiscovery -} from "@waku/discovery"; -import type { LightNode, PeerExchangeQueryResult } from "@waku/interfaces"; -import { createLightNode, Libp2pComponents, ProtocolError } from "@waku/sdk"; -import { Logger } from "@waku/utils"; -import { expect } from "chai"; - -import { - afterEachCustom, - beforeEachCustom, - delay, - makeLogFileName, - ServiceNode, - tearDownNodes, - waitForRemotePeerWithCodec -} from "../../src/index.js"; - -export const log = new Logger("test:pe"); - -const ShardInfo = { clusterId: 0, shards: [2] }; - -describe("Peer Exchange Query", function () { - this.timeout(30_000); - let waku: LightNode; - let nwaku1: ServiceNode; - let nwaku2: ServiceNode; - let nwaku3: ServiceNode; - let nwaku1PeerId: PeerId; - let nwaku3MA: Multiaddr; - let nwaku3PeerId: PeerId; - let components: Libp2pComponents; - let peerExchange: WakuPeerExchange; - let numPeersToRequest: number; - let queryResult: PeerExchangeQueryResult; - - beforeEachCustom( - this, - async () => { - nwaku1 = new ServiceNode(makeLogFileName(this.ctx) + "1"); - nwaku2 = new ServiceNode(makeLogFileName(this.ctx) + "2"); - nwaku3 = new ServiceNode(makeLogFileName(this.ctx) + "3"); - await nwaku1.start({ - shard: ShardInfo.shards, - clusterId: ShardInfo.clusterId, - discv5Discovery: true, - peerExchange: true, - relay: true - }); - nwaku1PeerId = await nwaku1.getPeerId(); - await nwaku2.start({ - shard: ShardInfo.shards, - clusterId: ShardInfo.clusterId, - discv5Discovery: true, - peerExchange: true, - discv5BootstrapNode: (await nwaku1.info()).enrUri, - relay: true - }); - await nwaku3.start({ - shard: ShardInfo.shards, - clusterId: ShardInfo.clusterId, - discv5Discovery: true, - peerExchange: true, - discv5BootstrapNode: (await nwaku2.info()).enrUri, - relay: true - }); - nwaku3MA = await nwaku3.getMultiaddrWithId(); - nwaku3PeerId = await nwaku3.getPeerId(); - waku = await createLightNode({ - libp2p: { - peerDiscovery: [ - bootstrap({ list: [nwaku3MA.toString()] }), - wakuPeerExchangeDiscovery() - ] - } - }); - await waku.start(); - await waku.libp2p.dialProtocol(nwaku3MA, PeerExchangeCodec); - await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, nwaku3PeerId); - - components = waku.libp2p.components as unknown as Libp2pComponents; - peerExchange = new WakuPeerExchange(components); - numPeersToRequest = 2; - - const startTime = Date.now(); - - while (true) { - if (Date.now() - startTime > 100_000) { - log.error("Timeout reached, exiting the loop."); - break; - } - await delay(2000); - try { - queryResult = await Promise.race([ - peerExchange.query({ - peerId: nwaku3PeerId, - numPeers: numPeersToRequest - }), - new Promise((resolve) => - setTimeout( - () => - resolve({ - peerInfos: null, - error: ProtocolError.GENERIC_FAIL - }), - 5000 - ) - ) - ]); - const hasErrors = queryResult?.error !== null; - const hasPeerInfos = - queryResult?.peerInfos && - queryResult.peerInfos.length === numPeersToRequest; - if (hasErrors) { - log.error("Error encountered, retrying...", queryResult.error); - continue; - } - if (!hasPeerInfos) { - log.warn( - "Peer info not available or does not match the requested number of peers, retrying..." - ); - continue; - } - break; - } catch (error) { - log.warn("Error encountered, retrying...", error); - } - } - }, - 120_000 - ); - - afterEachCustom(this, async () => { - await tearDownNodes([nwaku1, nwaku2, nwaku3], waku); - }); - - // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 - it.skip("connected peers and dial", async function () { - expect(queryResult.error).to.be.null; - - expect(queryResult.peerInfos?.[0].ENR).to.not.be.null; - expect(queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs).to.not.be.null; - - const peerWsMA = queryResult.peerInfos?.[0].ENR?.peerInfo?.multiaddrs[2]; - const localPeerWsMAAsString = peerWsMA - ?.toString() - .replace(/\/ip4\/[\d.]+\//, "/ip4/127.0.0.1/"); - const localPeerWsMA = multiaddr(localPeerWsMAAsString); - - let foundNodePeerId: PeerId | undefined = undefined; - const doesPeerIdExistInResponse = queryResult.peerInfos?.some(({ ENR }) => { - foundNodePeerId = ENR?.peerInfo?.id; - return ENR?.peerInfo?.id.toString() === nwaku1PeerId.toString(); - }); - if (!foundNodePeerId) { - throw new Error("Peer1 ID not found"); - } - expect(doesPeerIdExistInResponse, "peer not found").to.be.equal(true); - - await waku.libp2p.dialProtocol(localPeerWsMA, PeerExchangeCodec); - await waitForRemotePeerWithCodec(waku, PeerExchangeCodec, foundNodePeerId); - }); - - // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 - it.skip("more peers than existing", async function () { - const result = await peerExchange.query({ - peerId: nwaku3PeerId, - numPeers: 5 - }); - expect(result.error).to.be.null; - expect(result.peerInfos?.length).to.be.eq(numPeersToRequest); - }); - - // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 - it.skip("less peers than existing", async function () { - const result = await peerExchange.query({ - peerId: nwaku3PeerId, - numPeers: 1 - }); - expect(result.error).to.be.null; - expect(result.peerInfos?.length).to.be.eq(1); - }); - - // slow and flaky in CI: https://github.com/waku-org/js-waku/issues/1911 - it.skip("non connected peers", async function () { - // querying the non connected peer - const result = await peerExchange.query({ - peerId: nwaku1PeerId, - numPeers: numPeersToRequest - }); - expect(result.error).to.be.eq(ProtocolError.NO_PEER_AVAILABLE); - expect(result.peerInfos).to.be.null; - }); -}); diff --git a/packages/tests/tests/sharding/peer_management.spec.ts b/packages/tests/tests/sharding/peer_management.spec.ts index a02c973e3c..9dd07d1754 100644 --- a/packages/tests/tests/sharding/peer_management.spec.ts +++ b/packages/tests/tests/sharding/peer_management.spec.ts @@ -164,18 +164,15 @@ describe("Static Sharding: Peer Management", function () { libp2p: { peerDiscovery: [ bootstrap({ list: [nwaku3Ma.toString()] }), - wakuPeerExchangeDiscovery() + wakuPeerExchangeDiscovery({ TTL: 1000 }) ] } }); dialPeerSpy = Sinon.spy((waku as any).libp2p, "dial"); - await waku.start(); - const pxPeersDiscovered = new Set(); - - await new Promise((resolve) => { + const pxPeersDiscoveredPromise = new Promise((resolve) => { waku.libp2p.addEventListener("peer:discovery", (evt) => { return void (async () => { const peerId = evt.detail.id; @@ -191,7 +188,8 @@ describe("Static Sharding: Peer Management", function () { }); }); - await delay(1000); + await delay(10_000); + await pxPeersDiscoveredPromise; expect(dialPeerSpy.callCount).to.equal(3); }); });