diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index 9e713fcdb6..55d14012aa 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -3,12 +3,14 @@ import type { PeerStore } from "@libp2p/interface"; import type { ConnectionManager } from "@libp2p/interface-internal"; import { IEnr } from "./enr.js"; -import { IBaseProtocol } from "./protocols.js"; +import { IBaseProtocol, ProtocolResult } from "./protocols.js"; export interface IPeerExchange extends IBaseProtocol { - query(params: PeerExchangeQueryParams): Promise; + query(params: PeerExchangeQueryParams): Promise; } +export type PeerExchangeResult = ProtocolResult<"peerInfos", PeerInfo[]>; + export interface PeerExchangeQueryParams { numPeers: number; peerId: PeerId; diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 9471b72271..babb6bf688 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -1,10 +1,11 @@ import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { EnrDecoder } from "@waku/enr"; -import type { +import { IPeerExchange, Libp2pComponents, PeerExchangeQueryParams, - PeerInfo, + PeerExchangeResult, + ProtocolError, PubsubTopic } from "@waku/interfaces"; import { isDefined } from "@waku/utils"; @@ -34,18 +35,18 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { /** * Make a peer exchange query to a peer */ - async query( - params: PeerExchangeQueryParams - ): Promise { + async query(params: PeerExchangeQueryParams): Promise { const { numPeers } = params; - const rpcQuery = PeerExchangeRPC.createRequest({ numPeers: BigInt(numPeers) }); const peer = await this.peerStore.get(params.peerId); if (!peer) { - throw new Error(`Peer ${params.peerId.toString()} not found`); + return { + peerInfos: null, + error: ProtocolError.NO_PEER_AVAILABLE + }; } const stream = await this.getStream(peer); @@ -65,15 +66,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { }); const { response } = PeerExchangeRPC.decode(bytes); - if (!response) { log.error( "PeerExchangeRPC message did not contains a `response` field" ); - return; + return { + peerInfos: null, + error: ProtocolError.EMPTY_PAYLOAD + }; } - return Promise.all( + const peerInfos = await Promise.all( response.peerInfos .map((peerInfo) => peerInfo.enr) .filter(isDefined) @@ -81,9 +84,17 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { return { ENR: await EnrDecoder.fromRLP(enr) }; }) ); + + return { + peerInfos, + error: null + }; } catch (err) { log.error("Failed to decode push reply", err); - return; + return { + peerInfos: null, + error: ProtocolError.DECODE_FAILED + }; } } } diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index dc49339823..bbf3ca6ff6 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -7,7 +7,12 @@ import type { PeerId, PeerInfo } from "@libp2p/interface"; -import { Libp2pComponents, PubsubTopic, Tags } from "@waku/interfaces"; +import { + Libp2pComponents, + PeerExchangeResult, + PubsubTopic, + Tags +} from "@waku/interfaces"; import { encodeRelayShard, Logger } from "@waku/utils"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; @@ -160,15 +165,15 @@ export class PeerExchangeDiscovery }, queryInterval * currentAttempt); }; - private async query(peerId: PeerId): Promise { - const peerInfos = await this.peerExchange.query({ + private async query(peerId: PeerId): Promise { + const { error, peerInfos } = await this.peerExchange.query({ numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES, peerId }); - if (!peerInfos) { - log.error("Peer exchange query failed, no peer info returned"); - return; + if (error) { + log.error("Peer exchange query failed", error); + return { error, peerInfos: null }; } for (const _peerInfo of peerInfos) { @@ -214,6 +219,8 @@ export class PeerExchangeDiscovery }) ); } + + return { error: null, peerInfos }; } private abortQueriesForPeer(peerIdStr: string): void { diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index a5f6f241f7..3b00f57f84 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -20,8 +20,8 @@ import { IRelay, Libp2p, ProtocolCreateOptions, + ProtocolError, PubsubTopic, - SendError, SendResult } from "@waku/interfaces"; import { isWireSizeUnderCap, toAsyncIterator } from "@waku/utils"; @@ -107,7 +107,7 @@ class Relay implements IRelay { log.error("Failed to send waku relay: topic not configured"); return { recipients, - errors: [SendError.TOPIC_NOT_CONFIGURED] + errors: [ProtocolError.TOPIC_NOT_CONFIGURED] }; } @@ -116,7 +116,7 @@ class Relay implements IRelay { log.error("Failed to encode message, aborting publish"); return { recipients, - errors: [SendError.ENCODE_FAILED] + errors: [ProtocolError.ENCODE_FAILED] }; } @@ -124,7 +124,7 @@ class Relay implements IRelay { log.error("Failed to send waku relay: message is bigger that 1MB"); return { recipients, - errors: [SendError.SIZE_TOO_BIG] + errors: [ProtocolError.SIZE_TOO_BIG] }; }