diff --git a/src/lib/select_peer.ts b/src/lib/select_peer.ts index 3ee0bfd2c5..9c34fbce44 100644 --- a/src/lib/select_peer.ts +++ b/src/lib/select_peer.ts @@ -1,4 +1,8 @@ +import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import debug from "debug"; + +const log = debug("waku:select-peer"); /** * Returns a pseudo-random peer that supports the given protocol. @@ -29,3 +33,45 @@ export async function getPeersForProtocol( }); return peers; } + +export async function selectPeerForProtocol( + peerStore: PeerStore, + protocols: string[], + peerId?: PeerId +): Promise<{ peer: Peer; protocol: string } | undefined> { + let peer; + if (peerId) { + peer = await peerStore.get(peerId); + if (!peer) { + log( + `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` + ); + return; + } + } else { + const peers = await getPeersForProtocol(peerStore, protocols); + peer = selectRandomPeer(peers); + if (!peer) { + log("Failed to find known peer that registers protocols", protocols); + return; + } + } + + let protocol; + for (const codec of protocols) { + if (peer.protocols.includes(codec)) { + protocol = codec; + // Do not break as we want to keep the last value + } + } + log(`Using codec ${protocol}`); + if (!protocol) { + log( + `Peer does not register required protocols: ${peer.id.toString()}`, + protocols + ); + return; + } + + return { peer, protocol }; +} diff --git a/src/lib/waku_filter/index.ts b/src/lib/waku_filter/index.ts index fdcf1abed5..e2fa36ca06 100644 --- a/src/lib/waku_filter/index.ts +++ b/src/lib/waku_filter/index.ts @@ -11,7 +11,11 @@ import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, WakuMessage } from "../waku_message"; @@ -228,23 +232,15 @@ export class WakuFilter { } private async getPeer(peerId?: PeerId): Promise { - let peer; - if (peerId) { - peer = await this.libp2p.peerStore.get(peerId); - if (!peer) { - throw new Error( - `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` - ); - } - } else { - peer = await this.randomPeer(); - if (!peer) { - throw new Error( - "Failed to find known peer that registers waku filter protocol" - ); - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [FilterCodec], + peerId + ); + if (!res) { + throw new Error(`Failed to select peer for ${FilterCodec}`); } - return peer; + return res.peer; } /** diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index 48e5b5a111..0c3db3bf20 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -10,7 +10,11 @@ import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; import { DefaultPubSubTopic } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { + getPeersForProtocol, + selectPeerForProtocol, + selectRandomPeer, +} from "../select_peer"; import { WakuMessage } from "../waku_message"; import { PushRPC } from "./push_rpc"; @@ -51,16 +55,16 @@ export class WakuLightPush { message: WakuMessage, opts?: PushOptions ): Promise { - let peer; - if (opts?.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) throw "Peer is unknown"; - } else { - peer = await this.randomPeer(); + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + [LightPushCodec], + opts?.peerId + ); + + if (!res) { + throw new Error("Failed to get a peer"); } - if (!peer) throw "No peer available"; - if (!peer.protocols.includes(LightPushCodec)) - throw "Peer does not register waku light push protocol"; + const { peer } = res; const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 0304496cbb..a2c89b6f38 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -11,7 +11,7 @@ import * as protoV2Beta4 from "../../proto/store_v2beta4"; import { HistoryResponse } from "../../proto/store_v2beta4"; import { DefaultPubSubTopic, StoreCodecs } from "../constants"; import { selectConnection } from "../select_connection"; -import { getPeersForProtocol, selectRandomPeer } from "../select_peer"; +import { getPeersForProtocol, selectPeerForProtocol } from "../select_peer"; import { hexToBytes } from "../utils"; import { DecryptionMethod, @@ -152,29 +152,17 @@ export class WakuStore { ...options, }); - let peer; - if (opts.peerId) { - peer = await this.libp2p.peerStore.get(opts.peerId); - if (!peer) - throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toString()}`; - } else { - peer = await this.randomPeer(); - if (!peer) - throw "Failed to find known peer that registers waku store protocol"; - } + const res = await selectPeerForProtocol( + this.libp2p.peerStore, + Object.values(StoreCodecs), + opts?.peerId + ); - let storeCodec = ""; - for (const codec of Object.values(StoreCodecs)) { - if (peer.protocols.includes(codec)) { - storeCodec = codec; - // Do not break as we want to keep the last value - } + if (!res) { + throw new Error("Failed to get a peer"); } - log(`Use store codec ${storeCodec}`); - if (!storeCodec) - throw `Peer does not register waku store protocol: ${peer.id.toString()}`; + const { peer, protocol } = res; - Object.assign(opts, { storeCodec }); const connections = this.libp2p.connectionManager.getConnections(peer.id); const connection = selectConnection(connections); @@ -199,7 +187,7 @@ export class WakuStore { const messages: WakuMessage[] = []; let cursor = undefined; while (true) { - const stream = await connection.newStream(storeCodec); + const stream = await connection.newStream(protocol); const queryOpts = Object.assign(opts, { cursor }); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); log("Querying store peer", connections[0].remoteAddr.toString()); @@ -317,13 +305,4 @@ export class WakuStore { return getPeersForProtocol(this.libp2p.peerStore, codecs); } - - /** - * Returns a random peer that supports store protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } }