From 6f09fbf4ed181cb2fe5a15643cf2bebdc889ec64 Mon Sep 17 00:00:00 2001 From: Danish Arora <35004822+danisharora099@users.noreply.github.com> Date: Fri, 8 Sep 2023 21:36:55 +0530 Subject: [PATCH] feat: use the lowest latency peer for protocols (#1540) * maintain pings in a hashmap * convert `KeepAliveManager` into a singleton * chore: fix an unrelated cyclic dependency error * update `selectPeerForProtocol` to return peer with the lowest latency * use the new KeepAliveManager API * use the new API for `selectPeerForProtocol` * add tests * use PeerData to hold the ping instead of a new variable * improve tests for readability * move back KeepAliveManager from singleton * reenable all tests * minor improvements * improve error handling * convert .then() syntax to async/await --- package-lock.json | 7 +- packages/core/src/lib/connection_manager.ts | 6 +- packages/core/src/lib/keep_alive_manager.ts | 36 ++++- packages/tests/package.json | 7 +- packages/tests/tests/utils.spec.ts | 157 +++++++++++++++++++- packages/utils/src/libp2p/index.ts | 52 ++++++- 6 files changed, 247 insertions(+), 18 deletions(-) diff --git a/package-lock.json b/package-lock.json index 8480b34169..f891bac055 100644 --- a/package-lock.json +++ b/package-lock.json @@ -7370,7 +7370,8 @@ }, "node_modules/chai-as-promised": { "version": "7.1.1", - "license": "WTFPL", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", "dependencies": { "check-error": "^1.0.2" }, @@ -27384,6 +27385,7 @@ "@waku/interfaces": "*", "@waku/utils": "*", "app-root-path": "^3.1.0", + "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", "p-timeout": "^6.1.0", @@ -31445,6 +31447,7 @@ "@waku/utils": "*", "app-root-path": "^3.1.0", "chai": "^4.3.7", + "chai-as-promised": "^7.1.1", "cspell": "^7.3.2", "datastore-core": "^9.2.2", "debug": "^4.3.4", @@ -32756,6 +32759,8 @@ }, "chai-as-promised": { "version": "7.1.1", + "resolved": "https://registry.npmjs.org/chai-as-promised/-/chai-as-promised-7.1.1.tgz", + "integrity": "sha512-azL6xMoi+uxu6z4rhWQ1jbdUhOMhis2PvscD/xjLqNMkv3BPPp2JyyuTHOrf9BOosGpNQ11v6BKv/g57RXbiaA==", "requires": { "check-error": "^1.0.2" } diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 30a051aaf7..293c5bb11f 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -340,7 +340,11 @@ export class ConnectionManager void (async () => { const peerId = evt.detail; - this.keepAliveManager.start(peerId, this.libp2p.services.ping); + this.keepAliveManager.start( + peerId, + this.libp2p.services.ping, + this.libp2p.peerStore + ); const isBootstrap = (await this.getTagNamesForPeer(peerId)).includes( Tags.BOOTSTRAP diff --git a/packages/core/src/lib/keep_alive_manager.ts b/packages/core/src/lib/keep_alive_manager.ts index b72c6743a5..6227c08257 100644 --- a/packages/core/src/lib/keep_alive_manager.ts +++ b/packages/core/src/lib/keep_alive_manager.ts @@ -1,10 +1,12 @@ import type { PeerId } from "@libp2p/interface/peer-id"; +import type { PeerStore } from "@libp2p/interface/peer-store"; import type { IRelay } from "@waku/interfaces"; import type { KeepAliveOptions } from "@waku/interfaces"; +import { utf8ToBytes } from "@waku/utils/bytes"; import debug from "debug"; import type { PingService } from "libp2p/ping"; -import { createEncoder } from "../index.js"; +import { createEncoder } from "./message/version_0.js"; export const RelayPingContentTopic = "/relay-ping/1/ping/null"; const log = debug("waku:keep-alive"); @@ -22,8 +24,12 @@ export class KeepAliveManager { this.relay = relay; } - public start(peerId: PeerId, libp2pPing: PingService): void { - // Just in case a timer already exist for this peer + public start( + peerId: PeerId, + libp2pPing: PingService, + peerStore: PeerStore + ): void { + // Just in case a timer already exists for this peer this.stop(peerId); const { pingKeepAlive: pingPeriodSecs, relayKeepAlive: relayPeriodSecs } = @@ -33,10 +39,28 @@ export class KeepAliveManager { if (pingPeriodSecs !== 0) { const interval = setInterval(() => { - libp2pPing.ping(peerId).catch((e) => { - log(`Ping failed (${peerIdStr})`, e); - }); + void (async () => { + try { + // ping the peer for keep alive + // also update the peer store with the latency + const ping = await libp2pPing.ping(peerId); + log(`Ping succeeded (${peerIdStr})`, ping); + + try { + await peerStore.patch(peerId, { + metadata: { + ping: utf8ToBytes(ping.toString()) + } + }); + } catch (e) { + log("Failed to update ping", e); + } + } catch (e) { + log(`Ping failed (${peerIdStr})`, e); + } + })(); }, pingPeriodSecs * 1000); + this.pingKeepAliveTimers.set(peerIdStr, interval); } diff --git a/packages/tests/package.json b/packages/tests/package.json index aa5c510b98..fcc3d61ea5 100644 --- a/packages/tests/package.json +++ b/packages/tests/package.json @@ -57,6 +57,7 @@ "@waku/interfaces": "*", "@waku/utils": "*", "app-root-path": "^3.1.0", + "chai-as-promised": "^7.1.1", "debug": "^4.3.4", "dockerode": "^3.3.5", "p-timeout": "^6.1.0", @@ -66,20 +67,20 @@ }, "devDependencies": { "@libp2p/bootstrap": "^9.0.2", - "@types/sinon": "^10.0.16", "@types/chai": "^4.3.5", "@types/dockerode": "^3.3.19", "@types/mocha": "^10.0.1", + "@types/sinon": "^10.0.16", "@types/tail": "^2.2.1", "@typescript-eslint/eslint-plugin": "^5.57.0", "@typescript-eslint/parser": "^6.6.0", - "@waku/sdk": "*", "@waku/dns-discovery": "*", "@waku/message-encryption": "*", "@waku/peer-exchange": "*", + "@waku/sdk": "*", "chai": "^4.3.7", - "datastore-core": "^9.2.2", "cspell": "^7.3.2", + "datastore-core": "^9.2.2", "debug": "^4.3.4", "interface-datastore": "^8.2.3", "libp2p": "^0.46.9", diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 42935d60ec..fc31cbc8a1 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -1,3 +1,6 @@ +import type { PeerStore } from "@libp2p/interface/peer-store"; +import type { Peer } from "@libp2p/interface/peer-store"; +import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createDecoder, createEncoder, @@ -9,11 +12,16 @@ import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { expect } from "chai"; +import { selectPeerForProtocol } from "@waku/utils/libp2p"; +import chai, { expect } from "chai"; +import chaiAsPromised from "chai-as-promised"; +import sinon from "sinon"; -import { makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; +import { delay, makeLogFileName, NOISE_KEY_1 } from "../src/index.js"; import { NimGoNode } from "../src/node/node.js"; +chai.use(chaiAsPromised); + const TestContentTopic = "/test/1/waku-filter"; const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); const TestDecoder = createDecoder(TestContentTopic); @@ -106,3 +114,148 @@ describe("Util: toAsyncIterator: Filter", () => { expect(result.done).to.eq(true); }); }); + +const TestCodec = "test/1"; + +describe("selectPeerForProtocol", () => { + let peerStore: PeerStore; + const protocols = [TestCodec]; + + let lowPingPeer: Peer, + midPingPeer: Peer, + highPingPeer: Peer, + differentCodecPeer: Peer, + anotherDifferentCodecPeer: Peer; + + beforeEach(async function () { + this.timeout(10000); + const waku = await createLightNode(); + await waku.start(); + await delay(3000); + peerStore = waku.libp2p.peerStore; + + const [ + lowPingPeerId, + midPingPeerId, + highPingPeerId, + differentCodecPeerId, + anotherDifferentCodecPeerId + ] = await Promise.all([ + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId(), + createSecp256k1PeerId() + ]); + + lowPingPeer = { + id: lowPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("50")) + } as Peer; + + midPingPeer = { + id: midPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("100")) + } as Peer; + + highPingPeer = { + id: highPingPeerId, + protocols: [TestCodec], + metadata: new Map().set("ping", utf8ToBytes("500")) + } as Peer; + + differentCodecPeer = { + id: differentCodecPeerId, + protocols: ["DifferentCodec"] + } as Peer; + + anotherDifferentCodecPeer = { + id: anotherDifferentCodecPeerId, + protocols: ["AnotherDifferentCodec"] + } as Peer; + }); + + afterEach(() => { + sinon.restore(); + }); + + it("should return the peer with the lowest ping", async function () { + const mockPeers = [highPingPeer, lowPingPeer, midPingPeer]; + + sinon.stub(peerStore, "get").callsFake(async (peerId) => { + return mockPeers.find((peer) => peer.id.equals(peerId))!; + }); + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + const result = await selectPeerForProtocol(peerStore, protocols); + + expect(result.peer).to.deep.equal(lowPingPeer); + expect(result.protocol).to.equal(TestCodec); + }); + + it("should return the peer with the provided peerId", async function () { + const targetPeer = await createSecp256k1PeerId(); + const mockPeer = { id: targetPeer, protocols: [TestCodec] } as Peer; + sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); + + const result = await selectPeerForProtocol( + peerStore, + protocols, + targetPeer + ); + expect(result.peer).to.deep.equal(mockPeer); + }); + + it("should return a random peer when all peers have the same latency", async function () { + const mockPeers = [highPingPeer, highPingPeer, highPingPeer]; + + sinon.stub(peerStore, "get").callsFake(async (peerId) => { + return mockPeers.find((peer) => peer.id.equals(peerId))!; + }); + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + const result = await selectPeerForProtocol(peerStore, protocols); + + expect(mockPeers).to.deep.include(result.peer); + }); + + it("should throw an error when no peer matches the given protocols", async function () { + const mockPeers = [differentCodecPeer, anotherDifferentCodecPeer]; + + sinon.stub(peerStore, "forEach").callsFake(async (callback) => { + for (const peer of mockPeers) { + callback(peer); + } + }); + + await expect( + selectPeerForProtocol(peerStore, protocols) + ).to.be.rejectedWith( + `Failed to find known peer that registers protocols: ${protocols}` + ); + }); + + it("should throw an error when the selected peer does not register the required protocols", async function () { + const targetPeer = await createSecp256k1PeerId(); + const mockPeer = { id: targetPeer, protocols: ["DifferentCodec"] } as Peer; + sinon.stub(peerStore, "get").withArgs(targetPeer).resolves(mockPeer); + + await expect( + selectPeerForProtocol(peerStore, protocols, targetPeer) + ).to.be.rejectedWith( + `Peer does not register required protocols (${targetPeer.toString()}): ${protocols}` + ); + }); +}); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 6697ff10fe..4f37f15c0f 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -3,6 +3,8 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import debug from "debug"; +import { bytesToUtf8 } from "../bytes/index.js"; + const log = debug("waku:libp2p-utils"); /** @@ -16,6 +18,38 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { return peers[index]; } +/** + * Returns the peer with the lowest latency. + * @param peerStore - The Libp2p PeerStore + * @param peers - The list of peers to choose from + * @returns The peer with the lowest latency, or undefined if no peer could be reached + */ +export async function selectLowestLatencyPeer( + peerStore: PeerStore, + peers: Peer[] +): Promise { + if (peers.length === 0) return; + + const results = await Promise.all( + peers.map(async (peer) => { + const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); + if (!pingBytes) return { peer, ping: Infinity }; + + const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity; + return { peer, ping }; + }) + ); + + const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0]; + if (!lowestLatencyResult) { + return undefined; + } + + return lowestLatencyResult.ping !== Infinity + ? lowestLatencyResult.peer + : undefined; +} + /** * Returns the list of peers that supports the given protocol. */ @@ -35,12 +69,18 @@ export async function getPeersForProtocol( return peers; } +/** + * Returns a peer that supports the given protocol. + * If peerId is provided, the peer with that id is returned. + * Otherwise, the peer with the lowest latency is returned. + * If no peer is found from the above criteria, a random peer is returned. + */ export async function selectPeerForProtocol( peerStore: PeerStore, protocols: string[], peerId?: PeerId ): Promise<{ peer: Peer; protocol: string }> { - let peer; + let peer: Peer | undefined; if (peerId) { peer = await peerStore.get(peerId); if (!peer) { @@ -50,11 +90,13 @@ export async function selectPeerForProtocol( } } else { const peers = await getPeersForProtocol(peerStore, protocols); - peer = selectRandomPeer(peers); + peer = await selectLowestLatencyPeer(peerStore, peers); if (!peer) { - throw new Error( - `Failed to find known peer that registers protocols: ${protocols}` - ); + peer = selectRandomPeer(peers); + if (!peer) + throw new Error( + `Failed to find known peer that registers protocols: ${protocols}` + ); } }