diff --git a/package-lock.json b/package-lock.json index 3f4bb5b45d..681770ea4b 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10424,9 +10424,9 @@ } }, "node_modules/fast-check": { - "version": "3.14.0", - "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-3.14.0.tgz", - "integrity": "sha512-9Z0zqASzDNjXBox/ileV/fd+4P+V/f3o4shM6QawvcdLFh8yjPG4h5BrHUZ8yzY6amKGDTAmRMyb/JZqe+dCgw==", + "version": "3.15.0", + "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-3.15.0.tgz", + "integrity": "sha512-iBz6c+EXL6+nI931x/sbZs1JYTZtLG6Cko0ouS8LRTikhDR7+wZk4TYzdRavlnByBs2G6+nuuJ7NYL9QplNt8Q==", "funding": [ { "type": "individual", @@ -26602,6 +26602,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "fast-check": "^3.15.0", "p-retry": "^6.1.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", @@ -30071,6 +30072,7 @@ "datastore-core": "^9.2.6", "debug": "^4.3.4", "dockerode": "^3.3.5", + "fast-check": "^3.15.0", "interface-datastore": "^8.2.5", "libp2p": "^0.46.14", "mocha": "^10.2.0", @@ -33452,9 +33454,9 @@ "version": "0.1.8" }, "fast-check": { - "version": "3.14.0", - "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-3.14.0.tgz", - "integrity": "sha512-9Z0zqASzDNjXBox/ileV/fd+4P+V/f3o4shM6QawvcdLFh8yjPG4h5BrHUZ8yzY6amKGDTAmRMyb/JZqe+dCgw==", + "version": "3.15.0", + "resolved": "https://registry.npmjs.org/fast-check/-/fast-check-3.15.0.tgz", + "integrity": "sha512-iBz6c+EXL6+nI931x/sbZs1JYTZtLG6Cko0ouS8LRTikhDR7+wZk4TYzdRavlnByBs2G6+nuuJ7NYL9QplNt8Q==", "requires": { "pure-rand": "^6.0.0" } diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index bed4d8fc89..41f19f42bd 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,6 +1,5 @@ import type { Libp2p } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { IBaseProtocol, @@ -9,14 +8,14 @@ import type { PubsubTopic } from "@waku/interfaces"; import { DefaultPubsubTopic } from "@waku/interfaces"; -import { shardInfoToPubsubTopics } from "@waku/utils"; +import { Logger, shardInfoToPubsubTopics } from "@waku/utils"; import { getConnectedPeersForProtocol, getPeersForProtocol, - selectPeerForProtocol + sortPeersByLatency } from "@waku/utils/libp2p"; -import { filterPeers } from "./filterPeers.js"; +import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; /** @@ -30,7 +29,8 @@ export class BaseProtocol implements IBaseProtocol { constructor( public multicodec: string, - private components: Libp2pComponents + private components: Libp2pComponents, + private log: Logger ) { this.addLibp2pEventListener = components.events.addEventListener.bind( components.events @@ -64,22 +64,14 @@ export class BaseProtocol implements IBaseProtocol { return getPeersForProtocol(this.peerStore, [this.multicodec]); } - protected async getPeer(peerId?: PeerId): Promise { - const { peer } = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - peerId - ); - return peer; - } - /** - * Retrieves a list of connected peers based on the specified criteria. + * Retrieves a list of connected peers that support the protocol. The list is sorted by latency. * * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. - * @returns A Promise that resolves to an array of peers based on the specified criteria. - */ + + * @returns A list of peers that support the protocol sorted by latency. + */ protected async getPeers( { numPeers, @@ -99,8 +91,26 @@ export class BaseProtocol implements IBaseProtocol { [this.multicodec] ); - // Filter the peers based on the specified criteria - return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); + // Filter the peers based on discovery & number of peers requested + const filteredPeers = await filterPeersByDiscovery( + allPeersForProtocol, + numPeers, + maxBootstrapPeers + ); + + // Sort the peers by latency + const sortedFilteredPeers = await sortPeersByLatency( + this.peerStore, + filteredPeers + ); + + if (sortedFilteredPeers.length === 0) { + this.log.warn( + "No peers found. Ensure you have a connection to the network." + ); + } + + return sortedFilteredPeers; } initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] { diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 06caa77b51..5428d84bdf 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -279,7 +279,7 @@ class Filter extends BaseProtocol implements IReceiver { } constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(FilterCodecs.SUBSCRIBE, libp2p.components); + super(FilterCodecs.SUBSCRIBE, libp2p.components, log); this.pubsubTopics = this.initializePubsubTopic(options); diff --git a/packages/core/src/lib/filterPeers.spec.ts b/packages/core/src/lib/filterPeers.spec.ts index de51da5593..0ded7a6d76 100644 --- a/packages/core/src/lib/filterPeers.spec.ts +++ b/packages/core/src/lib/filterPeers.spec.ts @@ -4,9 +4,9 @@ import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { Tags } from "@waku/interfaces"; import { expect } from "chai"; -import { filterPeers } from "./filterPeers.js"; +import { filterPeersByDiscovery } from "./filterPeers.js"; -describe("filterPeers function", function () { +describe("filterPeersByDiscovery function", function () { it("should return all peers when numPeers is 0", async function () { const peer1 = await createSecp256k1PeerId(); const peer2 = await createSecp256k1PeerId(); @@ -27,7 +27,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 10); + const result = await filterPeersByDiscovery(mockPeers, 0, 10); expect(result.length).to.deep.equal(mockPeers.length); }); @@ -56,7 +56,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 0); + const result = await filterPeersByDiscovery(mockPeers, 0, 0); // result should have no bootstrap peers, and a total of 2 peers expect(result.length).to.equal(2); @@ -95,7 +95,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 1); + const result = await filterPeersByDiscovery(mockPeers, 0, 1); // result should have 1 bootstrap peers, and a total of 4 peers expect(result.length).to.equal(4); @@ -134,7 +134,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 5, 2); + const result = await filterPeersByDiscovery(mockPeers, 5, 2); // check that result has at least 2 bootstrap peers and no more than 5 peers expect(result.length).to.be.at.least(2); diff --git a/packages/core/src/lib/filterPeers.ts b/packages/core/src/lib/filterPeers.ts index 298c9f15bc..f611109c77 100644 --- a/packages/core/src/lib/filterPeers.ts +++ b/packages/core/src/lib/filterPeers.ts @@ -2,23 +2,31 @@ import { Peer } from "@libp2p/interface/peer-store"; import { Tags } from "@waku/interfaces"; /** - * Retrieves a list of peers based on the specified criteria. + * Retrieves a list of peers based on the specified criteria: + * 1. If numPeers is 0, return all peers + * 2. Bootstrap peers are prioritized + * 3. Non-bootstrap peers are randomly selected to fill up to numPeers * * @param peers - The list of peers to filter from. - * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. + * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned, irrespective of `maxBootstrapPeers`. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. * @returns A Promise that resolves to an array of peers based on the specified criteria. */ -export async function filterPeers( +export async function filterPeersByDiscovery( peers: Peer[], numPeers: number, maxBootstrapPeers: number ): Promise { // Collect the bootstrap peers up to the specified maximum - const bootstrapPeers = peers + let bootstrapPeers = peers .filter((peer) => peer.tags.has(Tags.BOOTSTRAP)) .slice(0, maxBootstrapPeers); + // If numPeers is less than the number of bootstrap peers, adjust the bootstrapPeers array + if (numPeers > 0 && numPeers < bootstrapPeers.length) { + bootstrapPeers = bootstrapPeers.slice(0, numPeers); + } + // Collect non-bootstrap peers const nonBootstrapPeers = peers.filter( (peer) => !peer.tags.has(Tags.BOOTSTRAP) diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 773e033565..ea8b308f7d 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -48,7 +48,7 @@ class LightPush extends BaseProtocol implements ILightPush { private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(LightPushCodec, libp2p.components); + super(LightPushCodec, libp2p.components, log); this.pubsubTopics = this.initializePubsubTopic(options); } diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 9f95b5ba24..77cbb6af9e 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -24,7 +24,7 @@ class Metadata extends BaseProtocol { private readonly shardInfo: ShardingParams; private libp2pComponents: Libp2pComponents; constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) { - super(MetadataCodec, libp2p.components); + super(MetadataCodec, libp2p.components, log); this.libp2pComponents = libp2p; this.shardInfo = shardInfo; void libp2p.registrar.handle(MetadataCodec, (streamData) => { @@ -70,7 +70,10 @@ class Metadata extends BaseProtocol { async query(peerId: PeerId): Promise { const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); - const peer = await this.getPeer(peerId); + const peer = await this.peerStore.get(peerId); + if (!peer) { + throw new Error(`Peer ${peerId.toString()} not found`); + } const stream = await this.getStream(peer); diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 1e06533311..a78c2e6f70 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -78,7 +78,7 @@ class Store extends BaseProtocol implements IStore { private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(StoreCodec, libp2p.components); + super(StoreCodec, libp2p.components, log); this.pubsubTopics = this.initializePubsubTopic(options); } diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index 8183d6fe13..b194c9c6a9 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol { export interface PeerExchangeQueryParams { numPeers: number; - peerId?: PeerId; + peerId: PeerId; } export interface PeerExchangeResponse { diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 21ac4aab4d..473864474a 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -27,7 +27,7 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { * @param components - libp2p components */ constructor(components: Libp2pComponents) { - super(PeerExchangeCodec, components); + super(PeerExchangeCodec, components, log); } /** @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { numPeers: BigInt(numPeers) }); - const peer = await this.getPeer(params.peerId); + const peer = await this.peerStore.get(params.peerId); + if (!peer) { + throw new Error(`Peer ${params.peerId.toString()} not found`); + } const stream = await this.getStream(peer); diff --git a/packages/tests/package.json b/packages/tests/package.json index bd802264ca..a1e6e15c55 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -60,6 +60,7 @@ "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", + "fast-check": "^3.15.0", "p-retry": "^6.1.0", "p-timeout": "^6.1.0", "portfinder": "^1.0.32", diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts new file mode 100644 index 0000000000..f3f2d4db33 --- /dev/null +++ b/packages/tests/tests/getPeers.spec.ts @@ -0,0 +1,329 @@ +import type { Connection } from "@libp2p/interface/connection"; +import type { PeerStore } from "@libp2p/interface/peer-store"; +import type { Peer } from "@libp2p/interface/peer-store"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; +import { + createLightNode, + Libp2pComponents, + type LightNode, + Tags, + utf8ToBytes +} from "@waku/sdk"; +import { expect } from "chai"; +import fc from "fast-check"; +import Sinon from "sinon"; + +describe("getPeers", function () { + let peerStore: PeerStore; + let connectionManager: Libp2pComponents["connectionManager"]; + let waku: LightNode; + const lowPingBytes = utf8ToBytes("50"); + const midPingBytes = utf8ToBytes("100"); + const highPingBytes = utf8ToBytes("200"); + + let lowPingBootstrapPeer: Peer, + lowPingNonBootstrapPeer: Peer, + midPingBootstrapPeer: Peer, + midPingNonBootstrapPeer: Peer, + highPingBootstrapPeer: Peer, + highPingNonBootstrapPeer: Peer, + differentCodecPeer: Peer, + anotherDifferentCodecPeer: Peer; + + let bootstrapPeers: Peer[]; + let nonBootstrapPeers: Peer[]; + let allPeers: Peer[]; + + beforeEach(async function () { + this.timeout(10_000); + waku = await createLightNode(); + peerStore = waku.libp2p.peerStore; + connectionManager = waku.libp2p.components.connectionManager; + + const [ + lowPingBootstrapPeerId, + lowPingNonBootstrapPeerId, + midPingBootstrapPeerId, + midPingNonBootstrapPeerId, + highPingBootstrapPeerId, + highPingNonBootstrapPeerId, + differentCodecPeerId, + anotherDifferentCodecPeerId + ] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId() + ]); + + lowPingBootstrapPeer = { + id: lowPingBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", lowPingBytes), + tags: new Map().set(Tags.BOOTSTRAP, {}) + } as Peer; + lowPingNonBootstrapPeer = { + id: lowPingNonBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", lowPingBytes), + tags: new Map().set(Tags.PEER_EXCHANGE, {}) + } as Peer; + midPingBootstrapPeer = { + id: midPingBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", midPingBytes), + tags: new Map().set(Tags.BOOTSTRAP, {}) + } as Peer; + midPingNonBootstrapPeer = { + id: midPingNonBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", midPingBytes), + tags: new Map().set(Tags.PEER_EXCHANGE, {}) + } as Peer; + highPingBootstrapPeer = { + id: highPingBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", highPingBytes), + tags: new Map().set(Tags.BOOTSTRAP, {}) + } as Peer; + highPingNonBootstrapPeer = { + id: highPingNonBootstrapPeerId, + protocols: [waku.lightPush.multicodec], + metadata: new Map().set("ping", highPingBytes), + tags: new Map().set(Tags.PEER_EXCHANGE, {}) + } as Peer; + differentCodecPeer = { + id: differentCodecPeerId, + protocols: ["different/1"], + metadata: new Map().set("ping", lowPingBytes), + tags: new Map().set(Tags.BOOTSTRAP, {}) + } as Peer; + anotherDifferentCodecPeer = { + id: anotherDifferentCodecPeerId, + protocols: ["different/2"], + metadata: new Map().set("ping", lowPingBytes), + tags: new Map().set(Tags.BOOTSTRAP, {}) + } as Peer; + + bootstrapPeers = [ + lowPingBootstrapPeer, + midPingBootstrapPeer, + highPingBootstrapPeer + ]; + + nonBootstrapPeers = [ + lowPingNonBootstrapPeer, + midPingNonBootstrapPeer, + highPingNonBootstrapPeer + ]; + + allPeers = [ + ...bootstrapPeers, + ...nonBootstrapPeers, + differentCodecPeer, + anotherDifferentCodecPeer + ]; + + Sinon.stub(peerStore, "get").callsFake(async (peerId) => { + return allPeers.find((peer) => peer.id.equals(peerId))!; + }); + + Sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of allPeers) { + callback(peer); + } + }); + + // assume all peers have an opened connection + Sinon.stub(connectionManager, "getConnections").callsFake(() => { + const connections: Connection[] = []; + for (const peer of allPeers) { + connections.push({ + status: "open", + remotePeer: peer.id + } as unknown as Connection); + } + return connections; + }); + }); + + this.afterEach(function () { + Sinon.restore(); + }); + + describe("getPeers with varying maxBootstrapPeers", function () { + const maxBootstrapPeersValues = [1, 2, 3, 4, 5, 6, 7]; + + maxBootstrapPeersValues.forEach((maxBootstrapPeers) => { + describe(`maxBootstrapPeers=${maxBootstrapPeers}`, function () { + it(`numPeers=1 -- returns one bootstrap peer `, async function () { + const result = (await (waku.lightPush as any).getPeers({ + numPeers: 1, + maxBootstrapPeers + })) as Peer[]; + + // Should only have 1 peer + expect(result).to.have.lengthOf(1); + + // The peer should be a bootstrap peer + expect(result[0].tags.has(Tags.BOOTSTRAP)).to.be.true; + + // Peer should be of the same protocol + expect(result[0].protocols.includes(waku.lightPush.multicodec)).to.be + .true; + + // Peer should have the lowest ping + expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); + }); + + it(`numPeers=2 -- returns total 2 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { + const result = (await (waku.lightPush as any).getPeers({ + numPeers: 2, + maxBootstrapPeers + })) as Peer[]; + + // Should only have 2 peers + expect(result).to.have.lengthOf(2); + + // Should only have ${maxBootstrapPeers} bootstrap peers + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.be.lessThanOrEqual(maxBootstrapPeers); + + // Should return peers with the same protocol + expect( + result.every((peer: Peer) => + peer.protocols.includes(waku.lightPush.multicodec) + ) + ).to.be.true; + + // All peers should be sorted by latency + // 0th index should be the lowest ping of all peers returned + expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); + }); + + it(`numPeers=3 -- returns total 3 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { + const result = (await (waku.lightPush as any).getPeers({ + numPeers: 3, + maxBootstrapPeers + })) as Peer[]; + + // Should only have 3 peers + expect(result).to.have.lengthOf(3); + + // Should only have ${maxBootstrapPeers} bootstrap peers + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.be.lessThanOrEqual(maxBootstrapPeers); + + // Should return peers with the same protocol + expect( + result.every((peer: Peer) => + peer.protocols.includes(waku.lightPush.multicodec) + ) + ).to.be.true; + + // All peers should be sorted by latency + // 0th index should be the lowest ping of all peers returned + expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); + }); + + it(`numPeers=4 -- returns total 4 peers, with max ${maxBootstrapPeers} bootstrap peers`, async function () { + const result = (await (waku.lightPush as any).getPeers({ + numPeers: 4, + maxBootstrapPeers + })) as Peer[]; + + // Should only have 4 peers + expect(result).to.have.lengthOf(4); + + // Should only have ${maxBootstrapPeers} bootstrap peers + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.be.lessThanOrEqual(maxBootstrapPeers); + + // Should return peers with the same protocol + expect( + result.every((peer: Peer) => + peer.protocols.includes(waku.lightPush.multicodec) + ) + ).to.be.true; + + // All peers should be sorted by latency + // 0th index should be the lowest ping of all peers returned + expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); + }); + + it(`numPeers=0 -- returns all peers including all non-bootstrap with maxBootstrapPeers: ${maxBootstrapPeers}`, async function () { + const result = (await (waku.lightPush as any).getPeers({ + numPeers: 0, + maxBootstrapPeers + })) as Peer[]; + + // Should have all non-bootstrap peers + ${maxBootstrapPeers} bootstrap peers + // Unless bootstrapPeers.length < maxBootstrapPeers + // Then it should be all non-bootstrap peers + bootstrapPeers.length + if (maxBootstrapPeers > bootstrapPeers.length) { + expect(result).to.have.lengthOf( + nonBootstrapPeers.length + bootstrapPeers.length + ); + } else { + expect(result).to.have.lengthOf( + nonBootstrapPeers.length + maxBootstrapPeers + ); + } + + // All peers should be bootstrap peers + expect( + result.filter((peer: Peer) => peer.tags.has(Tags.BOOTSTRAP)).length + ).to.be.lessThanOrEqual(maxBootstrapPeers); + + // Peers should be of the same protocol + expect( + result.every((peer: Peer) => + peer.protocols.includes(waku.lightPush.multicodec) + ) + ).to.be.true; + + // All peers returned should be sorted by latency + // 0th index should be the lowest ping of all peers returned + expect(result[0].metadata.get("ping")).to.equal(lowPingBytes); + }); + }); + }); + }); + + describe("getPeers property-based tests", function () { + it("should return the correct number of peers based on numPeers and maxBootstrapPeers", async function () { + await fc.assert( + fc.asyncProperty( + //max bootstrap peers + fc.integer({ min: 1, max: 100 }), + //numPeers + fc.integer({ min: 0, max: 100 }), + async (maxBootstrapPeers, numPeers) => { + const result = (await (waku.lightPush as any).getPeers({ + numPeers, + maxBootstrapPeers + })) as Peer[]; + + if (numPeers === 0) { + // Expect all peers when numPeers is 0 + expect(result.length).to.be.greaterThanOrEqual(1); + } else { + // Expect up to numPeers peers + expect(result.length).to.be.lessThanOrEqual(numPeers); + } + } + ), + { + verbose: true + } + ); + }); + }); +}); diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 5f82879f01..9d2b0e9f93 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -1,20 +1,14 @@ -import type { PeerStore } from "@libp2p/interface/peer-store"; -import type { Peer } from "@libp2p/interface/peer-store"; -import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { LightPushCodec } from "@waku/core"; -import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; -import { Protocols } from "@waku/interfaces"; +import { + DefaultPubsubTopic, + type LightNode, + Protocols +} from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { - getConnectedPeersForProtocol, - selectPeerForProtocol -} from "@waku/utils/libp2p"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; -import sinon from "sinon"; import { delay, @@ -120,182 +114,3 @@ describe("Util: toAsyncIterator: Filter", () => { expect(result.done).to.eq(true); }); }); - -const TestCodec = "test/1"; - -describe("selectPeerForProtocol", () => { - let peerStore: PeerStore; - const protocols = [TestCodec]; - - let lowPingPeer: Peer, - midPingPeer: Peer, - highPingPeer: Peer, - differentCodecPeer: Peer, - anotherDifferentCodecPeer: Peer; - - beforeEach(async function () { - this.timeout(10000); - const waku = await createLightNode(); - await waku.start(); - await delay(3000); - peerStore = waku.libp2p.peerStore; - - const [ - lowPingPeerId, - midPingPeerId, - highPingPeerId, - differentCodecPeerId, - anotherDifferentCodecPeerId - ] = await Promise.all([ - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId(), - createSecp256k1PeerId() - ]); - - lowPingPeer = { - id: lowPingPeerId, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("50")) - } as Peer; - - midPingPeer = { - id: midPingPeerId, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("100")) - } as Peer; - - highPingPeer = { - id: highPingPeerId, - protocols: [TestCodec], - metadata: new Map().set("ping", utf8ToBytes("500")) - } as Peer; - - differentCodecPeer = { - id: differentCodecPeerId, - protocols: ["DifferentCodec"] - } as Peer; - - anotherDifferentCodecPeer = { - id: anotherDifferentCodecPeerId, - protocols: ["AnotherDifferentCodec"] - } as Peer; - }); - - afterEach(() => { - sinon.restore(); - }); - - it("should return the peer with the lowest ping", async function () { - const mockPeers = [highPingPeer, lowPingPeer, midPingPeer]; - - sinon.stub(peerStore, "get").callsFake(async (peerId) => { - return mockPeers.find((peer) => peer.id.equals(peerId))!; - }); - - sinon.stub(peerStore, "forEach").callsFake(async (callback) => { - for (const peer of mockPeers) { - callback(peer); - } - }); - - const result = await selectPeerForProtocol(peerStore, protocols); - - expect(result.peer).to.deep.equal(lowPingPeer); - expect(result.protocol).to.equal(TestCodec); - }); - - it("should return the peer with the provided peerId", async function () { - const targetPeer = await createSecp256k1PeerId(); - const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer; - sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); - - const result = await selectPeerForProtocol( - peerStore, - protocols, - targetPeer - ); - expect(result.peer).to.deep.equal(mockPeer); - }); - - it("should return a random peer when all peers have the same latency", async function () { - const mockPeers = [highPingPeer, highPingPeer, highPingPeer]; - - sinon.stub(peerStore, "get").callsFake(async (peerId) => { - return mockPeers.find((peer) => peer.id.equals(peerId))!; - }); - - sinon.stub(peerStore, "forEach").callsFake(async (callback) => { - for (const peer of mockPeers) { - callback(peer); - } - }); - - const result = await selectPeerForProtocol(peerStore, protocols); - - expect(mockPeers).to.deep.include(result.peer); - }); - - it("should throw an error when no peer matches the given protocols", async function () { - const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer]; - - sinon.stub(peerStore, "forEach").callsFake(async (callback) => { - for (const peer of mockPeers) { - callback(peer); - } - }); - - await expect( - selectPeerForProtocol(peerStore, protocols) - ).to.be.rejectedWith( - `Failed to find known peer that registers protocols: ${protocols}` - ); - }); - - it("should throw an error when the selected peer does not register the required protocols", async function () { - const targetPeer = await createSecp256k1PeerId(); - const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer; - sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); - - await expect( - selectPeerForProtocol(peerStore, protocols, targetPeer) - ).to.be.rejectedWith( - `Peer does not register required protocols (${targetPeer.toString()}): ${protocols}` - ); - }); -}); - -describe("getConnectedPeersForProtocol", function () { - let waku: LightNode; - let nwaku: NimGoNode; - - beforeEach(async function () { - this.timeout(15000); - nwaku = new NimGoNode(makeLogFileName(this)); - await nwaku.start({ - filter: true, - lightpush: true, - relay: true - }); - waku = await createLightNode(); - await waku.start(); - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter]); - }); - - afterEach(async function () { - this.timeout(10000); - await tearDownNodes(nwaku, waku); - }); - - it("returns all connected peers that support the protocol", async function () { - const peers = await getConnectedPeersForProtocol( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - [LightPushCodec] - ); - - expect(peers.length).to.eq(1); - }); -}); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 883160cc89..25376356a6 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,5 +1,4 @@ import type { Connection } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import { bytesToUtf8 } from "../bytes/index.js"; @@ -16,35 +15,39 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { } /** - * Returns the peer with the lowest latency. + * Function to sort peers by latency from lowest to highest * @param peerStore - The Libp2p PeerStore * @param peers - The list of peers to choose from - * @returns The peer with the lowest latency, or undefined if no peer could be reached + * @returns Sorted array of peers by latency */ -export async function selectLowestLatencyPeer( +export async function sortPeersByLatency( peerStore: PeerStore, peers: Peer[] -): Promise { - if (peers.length === 0) return; +): Promise { + if (peers.length === 0) return []; const results = await Promise.all( peers.map(async (peer) => { - const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); - if (!pingBytes) return { peer, ping: Infinity }; + try { + const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); + if (!pingBytes) return { peer, ping: Infinity }; - const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity; - return { peer, ping }; + const ping = Number(bytesToUtf8(pingBytes)); + return { peer, ping }; + } catch (error) { + return { peer, ping: Infinity }; + } }) ); - const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0]; - if (!lowestLatencyResult) { - return undefined; - } + // filter out null values + const validResults = results.filter( + (result): result is { peer: Peer; ping: number } => result !== null + ); - return lowestLatencyResult.ping !== Infinity - ? lowestLatencyResult.peer - : undefined; + return validResults + .sort((a, b) => a.ping - b.ping) + .map((result) => result.peer); } /** @@ -87,53 +90,6 @@ export async function getConnectedPeersForProtocol( return peersWithNulls.filter((peer): peer is Peer => peer !== null); } -/** - * Returns a peer that supports the given protocol. - * If peerId is provided, the peer with that id is returned. - * Otherwise, the peer with the lowest latency is returned. - * If no peer is found from the above criteria, a random peer is returned. - */ -export async function selectPeerForProtocol( - peerStore: PeerStore, - protocols: string[], - peerId?: PeerId -): Promise<{ peer: Peer; protocol: string }> { - let peer: Peer | undefined; - if (peerId) { - peer = await peerStore.get(peerId); - if (!peer) { - throw new Error( - `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` - ); - } - } else { - const peers = await getPeersForProtocol(peerStore, protocols); - peer = await selectLowestLatencyPeer(peerStore, peers); - if (!peer) { - peer = selectRandomPeer(peers); - if (!peer) - throw new Error( - `Failed to find known peer that registers protocols: ${protocols}` - ); - } - } - - let protocol; - for (const codec of protocols) { - if (peer.protocols.includes(codec)) { - protocol = codec; - // Do not break as we want to keep the last value - } - } - if (!protocol) { - throw new Error( - `Peer does not register required protocols (${peer.id.toString()}): ${protocols}` - ); - } - - return { peer, protocol }; -} - export function selectConnection( connections: Connection[] ): Connection | undefined {