Merge pull request #1244 from waku-org/chore/peer-exchange

This commit is contained in:
fryorcraken.eth 2023-03-21 10:00:29 +11:00 committed by GitHub
commit b8f7db21c7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 35 additions and 31 deletions

2
package-lock.json generated
View File

@ -29327,6 +29327,7 @@
"@libp2p/interfaces": "^3.3.1", "@libp2p/interfaces": "^3.3.1",
"@waku/enr": "0.0.6", "@waku/enr": "0.0.6",
"@waku/proto": "0.0.3", "@waku/proto": "0.0.3",
"@waku/utils": "*",
"debug": "^4.3.4", "debug": "^4.3.4",
"it-all": "^2.0.0", "it-all": "^2.0.0",
"it-length-prefixed": "^8.0.4", "it-length-prefixed": "^8.0.4",
@ -34240,6 +34241,7 @@
"@waku/enr": "0.0.6", "@waku/enr": "0.0.6",
"@waku/interfaces": "0.0.8", "@waku/interfaces": "0.0.8",
"@waku/proto": "0.0.3", "@waku/proto": "0.0.3",
"@waku/utils": "*",
"chai": "^4.3.7", "chai": "^4.3.7",
"cspell": "^6.28.0", "cspell": "^6.28.0",
"debug": "^4.3.4", "debug": "^4.3.4",

View File

@ -10,6 +10,7 @@ import {
ProtocolCreateOptions, ProtocolCreateOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto"; import { proto_store as proto } from "@waku/proto";
import { isDefined } from "@waku/utils";
import { concat, utf8ToBytes } from "@waku/utils/bytes"; import { concat, utf8ToBytes } from "@waku/utils/bytes";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
@ -345,10 +346,6 @@ async function* paginate<T extends IDecodedMessage>(
} }
} }
export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg;
}
export async function createCursor( export async function createCursor(
message: IDecodedMessage, message: IDecodedMessage,
pubsubTopic: string = DefaultPubSubTopic pubsubTopic: string = DefaultPubSubTopic

View File

@ -6,7 +6,7 @@ import { IEnr } from "./enr.js";
import { PointToPointProtocol } from "./protocols.js"; import { PointToPointProtocol } from "./protocols.js";
export interface IPeerExchange extends PointToPointProtocol { export interface IPeerExchange extends PointToPointProtocol {
query(params: PeerExchangeQueryParams): Promise<PeerInfo[]>; query(params: PeerExchangeQueryParams): Promise<PeerInfo[] | undefined>;
} }
export interface PeerExchangeQueryParams { export interface PeerExchangeQueryParams {

View File

@ -54,6 +54,7 @@
"@libp2p/interfaces": "^3.3.1", "@libp2p/interfaces": "^3.3.1",
"@waku/enr": "0.0.6", "@waku/enr": "0.0.6",
"@waku/proto": "0.0.3", "@waku/proto": "0.0.3",
"@waku/utils": "*",
"debug": "^4.3.4", "debug": "^4.3.4",
"it-all": "^2.0.0", "it-all": "^2.0.0",
"it-length-prefixed": "^8.0.4", "it-length-prefixed": "^8.0.4",

View File

@ -7,6 +7,7 @@ import type {
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerInfo, PeerInfo,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { isDefined } from "@waku/utils";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
@ -47,7 +48,9 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
/** /**
* Make a peer exchange query to a peer * Make a peer exchange query to a peer
*/ */
async query(params: PeerExchangeQueryParams): Promise<PeerInfo[]> { async query(
params: PeerExchangeQueryParams
): Promise<PeerInfo[] | undefined> {
const { numPeers } = params; const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({ const rpcQuery = PeerExchangeRPC.createRequest({
@ -72,28 +75,24 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
bytes.append(chunk); bytes.append(chunk);
}); });
const decoded = PeerExchangeRPC.decode(bytes).response; const { response } = PeerExchangeRPC.decode(bytes);
if (!decoded) { if (!response) {
throw new Error("Failed to decode response"); log("PeerExchangeRPC message did not contains a `response` field");
return;
} }
const enrs = await Promise.all( return Promise.all(
decoded.peerInfos.map( response.peerInfos
(peerInfo) => peerInfo.enr && EnrDecoder.fromRLP(peerInfo.enr) .map((peerInfo) => peerInfo.enr)
) .filter(isDefined)
.map(async (enr) => {
return { ENR: await EnrDecoder.fromRLP(enr) };
})
); );
const peerInfos = enrs.map((enr) => {
return {
ENR: enr,
};
});
return peerInfos;
} catch (err) { } catch (err) {
log("Failed to decode push reply", err); log("Failed to decode push reply", err);
throw new Error("Failed to decode push reply"); return;
} }
} }
} }

View File

@ -155,10 +155,15 @@ export class PeerExchangeDiscovery
peerId, peerId,
}); });
if (!peerInfos) {
log("Peer exchange query failed, no peer info returned");
return;
}
for (const _peerInfo of peerInfos) { for (const _peerInfo of peerInfos) {
const { ENR } = _peerInfo; const { ENR } = _peerInfo;
if (!ENR) { if (!ENR) {
log("no ENR"); log("No ENR in peerInfo object, skipping");
continue; continue;
} }

View File

@ -4,6 +4,7 @@ import type { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromString } from "@libp2p/peer-id"; import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import { DefaultPubSubTopic } from "@waku/core"; import { DefaultPubSubTopic } from "@waku/core";
import { isDefined } from "@waku/utils";
import { bytesToHex, hexToBytes } from "@waku/utils/bytes"; import { bytesToHex, hexToBytes } from "@waku/utils/bytes";
import appRoot from "app-root-path"; import appRoot from "app-root-path";
import debug from "debug"; import debug from "debug";
@ -272,12 +273,6 @@ export class Nwaku {
): Promise<MessageRpcResponse[]> { ): Promise<MessageRpcResponse[]> {
this.checkProcess(); this.checkProcess();
const isDefined = (
msg: MessageRpcResponse | undefined
): msg is MessageRpcResponse => {
return !!msg;
};
const msgs = await this.rpcCall<MessageRpcResponse[]>( const msgs = await this.rpcCall<MessageRpcResponse[]>(
"get_waku_v2_relay_v1_messages", "get_waku_v2_relay_v1_messages",
[pubsubTopic] [pubsubTopic]

View File

@ -4,6 +4,7 @@ import {
getPredefinedBootstrapNodes, getPredefinedBootstrapNodes,
} from "@waku/core/lib/predefined_bootstrap_nodes"; } from "@waku/core/lib/predefined_bootstrap_nodes";
import { createLightNode } from "@waku/create"; import { createLightNode } from "@waku/create";
import { PeerInfo } from "@waku/interfaces";
import type { LightNode } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces";
import { import {
PeerExchangeCodec, PeerExchangeCodec,
@ -117,9 +118,9 @@ describe("Peer Exchange", () => {
const numPeersToRequest = 1; const numPeersToRequest = 1;
const peerInfos = await peerExchange.query({ const peerInfos = (await peerExchange.query({
numPeers: numPeersToRequest, numPeers: numPeersToRequest,
}); })) as PeerInfo[];
expect(peerInfos.length).to.be.greaterThan(0); expect(peerInfos.length).to.be.greaterThan(0);
expect(peerInfos.length).to.be.lessThanOrEqual(numPeersToRequest); expect(peerInfos.length).to.be.lessThanOrEqual(numPeersToRequest);

View File

@ -1 +1,2 @@
export * from "./is_defined.js";
export * from "./random_subset.js"; export * from "./random_subset.js";

View File

@ -0,0 +1,3 @@
export function isDefined<T>(value: T | undefined): value is T {
return Boolean(value);
}