chore: fix peer discovery peer-exchange (#1069)

* fix: discovery for peer-exchange

use the bootstrap node as a starter to send a
peer-exchange query to, and emit the response
peers received from it for further connection to
libp2p using the peer-discovery interface

* init: test for libp2p bootstrap/discovery for
peer-exchange

* temp-add: console.logs for easier debugging

* add: peer discovery test & rm: console.logs

* chore: rm  and redundant spec test

* add: interval for peer exchange queries
we set an interval to query a peer every 5 minutes
for peer exchange, and add new peers if found

* address: reviews
- add `type` for imports not using values
- better handling for peer-exchange query interval

* chore: fix tsc for peer-exchange
use node16 for module resolution

* chore: add extra exports to fix typedoc warnings
ref: https://github.com/TypeStrong/typedoc/issues/1739
This commit is contained in:
Danish Arora 2023-01-04 14:35:44 +05:30 committed by GitHub
parent d022d8700b
commit e0e8e655f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 1648 additions and 1042 deletions

2457
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -1,4 +1,5 @@
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Registrar } from "@libp2p/interface-registrar";
@ -14,6 +15,7 @@ export interface IPeerExchange extends PointToPointProtocol {
export interface PeerExchangeQueryParams {
numPeers: number;
peerId?: PeerId;
}
export interface PeerExchangeResponse {

View File

@ -1 +1,10 @@
export * from "./waku_peer_exchange.js";
export {
wakuPeerExchange,
PeerExchangeCodec,
WakuPeerExchange,
} from "./waku_peer_exchange.js";
export {
wakuPeerExchangeDiscovery,
PeerExchangeDiscovery,
Options,
} from "./waku_peer_exchange_discovery.js";

View File

@ -26,11 +26,18 @@ export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
const log = debug("waku:peer-exchange");
/**
* Implementation of the Peer Exchange protocol (https://rfc.vac.dev/spec/34/)
*/
export class WakuPeerExchange implements IPeerExchange {
private callback:
| ((response: PeerExchangeResponse) => Promise<void>)
| undefined;
/**
* @param components - libp2p components
* @param createOptions - Options for the protocol
*/
constructor(
public components: PeerExchangeComponents,
public createOptions?: ProtocolOptions
@ -40,6 +47,9 @@ export class WakuPeerExchange implements IPeerExchange {
.catch((e) => log("Failed to register peer exchange protocol", e));
}
/**
* Make a peer exchange query to a peer
*/
async query(
params: PeerExchangeQueryParams,
callback: (response: PeerExchangeResponse) => Promise<void>
@ -52,7 +62,7 @@ export class WakuPeerExchange implements IPeerExchange {
numPeers: BigInt(numPeers),
});
const peer = await this.getPeer();
const peer = await this.getPeer(params.peerId);
const stream = await this.newStream(peer);
@ -65,6 +75,9 @@ export class WakuPeerExchange implements IPeerExchange {
);
}
/**
* Handle a peer exchange query response
*/
private handler(streamData: IncomingStreamData): void {
const { stream } = streamData;
pipe(stream, lp.decode(), async (source) => {
@ -94,6 +107,11 @@ export class WakuPeerExchange implements IPeerExchange {
}).catch((err) => log("Failed to handle peer exchange request", err));
}
/**
*
* @param peerId - Optional peer ID to select a peer
* @returns A peer to query
*/
private async getPeer(peerId?: PeerId): Promise<Peer> {
const res = await selectPeerForProtocol(
this.components.peerStore,
@ -106,6 +124,10 @@ export class WakuPeerExchange implements IPeerExchange {
return res.peer;
}
/**
* @param peer - Peer to open a stream with
* @returns A new stream
*/
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.components.connectionManager.getConnections(
peer.id
@ -118,15 +140,26 @@ export class WakuPeerExchange implements IPeerExchange {
return connection.newStream(PeerExchangeCodec);
}
/**
* @returns All peers that support the peer exchange protocol
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.components.peerStore, [PeerExchangeCodec]);
}
/**
* @returns The libp2p peer store
*/
get peerStore(): PeerStore {
return this.components.peerStore;
}
}
/**
*
* @param init - Options for the protocol
* @returns A function that creates a new peer exchange protocol
*/
export function wakuPeerExchange(
init: Partial<ProtocolOptions> = {}
): (components: PeerExchangeComponents) => WakuPeerExchange {

View File

@ -3,17 +3,21 @@ import {
PeerDiscoveryEvents,
symbol,
} from "@libp2p/interface-peer-discovery";
import type { PeerId } from "@libp2p/interface-peer-id";
import { PeerInfo } from "@libp2p/interface-peer-info";
import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
import { EventEmitter } from "@libp2p/interfaces/events";
import { PeerExchangeComponents } from "@waku/interfaces";
import debug from "debug";
import { PeerExchangeCodec } from "./waku_peer_exchange";
import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js";
const log = debug("waku:peer-exchange-discovery");
interface Options {
const DEFAULT_PEER_EXCHANGE_REQUEST_NODES = 10;
const PEER_EXCHANGE_QUERY_INTERVAL = 5 * 60 * 1000;
export interface Options {
/**
* Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap')
*/
@ -39,22 +43,52 @@ export class PeerExchangeDiscovery
implements PeerDiscovery
{
private readonly components: PeerExchangeComponents;
private readonly peerExchange: WakuPeerExchange;
private readonly options: Options;
private isStarted: boolean;
private intervals: Map<PeerId, NodeJS.Timeout> = new Map();
private readonly eventHandler = async (
event: CustomEvent<PeerProtocolsChangeData>
): Promise<void> => {
const { protocols } = event.detail;
if (!protocols.includes(PeerExchangeCodec)) return;
const { protocols, peerId } = event.detail;
if (!protocols.includes(PeerExchangeCodec) || this.intervals.get(peerId))
return;
const interval = setInterval(async () => {
await this.peerExchange.query(
{
numPeers: DEFAULT_PEER_EXCHANGE_REQUEST_NODES,
peerId,
},
async (response) => {
const { peerInfos } = response;
for (const _peerInfo of peerInfos) {
const { ENR } = _peerInfo;
if (!ENR) {
log("no ENR");
continue;
}
const { peerId, multiaddrs } = ENR;
if (!peerId) {
log("no peerId");
continue;
}
if (!multiaddrs || multiaddrs.length === 0) {
log("no multiaddrs");
continue;
}
// check if peer is already in peerStore
const existingPeer = await this.components.peerStore.get(peerId);
if (existingPeer) {
log("peer already in peerStore");
continue;
}
const { peerId } = event.detail;
const peer = await this.components.peerStore.get(peerId);
const peerInfo = {
id: peerId,
multiaddrs: peer.addresses.map((address) => address.multiaddr),
protocols: [],
};
await this.components.peerStore.tagPeer(
peerId,
DEFAULT_BOOTSTRAP_TAG_NAME,
@ -63,12 +97,28 @@ export class PeerExchangeDiscovery
ttl: this.options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL,
}
);
this.dispatchEvent(new CustomEvent<PeerInfo>("peer", { detail: peerInfo }));
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", {
detail: {
id: peerId,
multiaddrs,
protocols: [],
},
})
);
}
}
);
}, PEER_EXCHANGE_QUERY_INTERVAL);
this.intervals.set(peerId, interval);
};
constructor(components: PeerExchangeComponents, options: Options = {}) {
super();
this.components = components;
this.peerExchange = new WakuPeerExchange(components);
this.options = options;
this.isStarted = false;
}
@ -96,6 +146,7 @@ export class PeerExchangeDiscovery
if (!this.isStarted) return;
log("Stopping peer exchange node discovery");
this.isStarted = false;
this.intervals.forEach((interval) => clearInterval(interval));
this.components.peerStore.removeEventListener(
"change:protocols",
this.eventHandler
@ -110,3 +161,10 @@ export class PeerExchangeDiscovery
return "@waku/peer-exchange";
}
}
export function wakuPeerExchangeDiscovery(): (
components: PeerExchangeComponents
) => PeerExchangeDiscovery {
return (components: PeerExchangeComponents) =>
new PeerExchangeDiscovery(components);
}

View File

@ -1,54 +1,10 @@
{
"extends": "../../tsconfig",
"compilerOptions": {
"incremental": true,
"target": "es2020",
"outDir": "dist",
"outDir": "dist/",
"rootDir": "src",
"moduleResolution": "node",
"module": "es2020",
"declaration": true,
"sourceMap": true,
"esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */,
"resolveJsonModule": true /* Include modules imported with .json extension. */,
"tsBuildInfoFile": "dist/.tsbuildinfo",
"strict": true /* Enable all strict type-checking options. */,
/* Strict Type-Checking Options */
"noImplicitAny": true /* Raise error on expressions and declarations with an implied 'any' type. */,
"strictNullChecks": true /* Enable strict null checks. */,
"strictFunctionTypes": true /* Enable strict checking of function types. */,
"strictPropertyInitialization": true /* Enable strict checking of property initialization in classes. */,
"noImplicitThis": true /* Raise error on 'this' expressions with an implied 'any' type. */,
"alwaysStrict": true /* Parse in strict mode and emit "use strict" for each source file. */,
/* Additional Checks */
"noUnusedLocals": true /* Report errors on unused locals. */,
"noUnusedParameters": true /* Report errors on unused parameters. */,
"noImplicitReturns": true /* Report error when not all code paths in function return a value. */,
"noFallthroughCasesInSwitch": true /* Report errors for fallthrough cases in switch statement. */,
"forceConsistentCasingInFileNames": true,
/* Debugging Options */
"traceResolution": false /* Report module resolution log messages. */,
"listEmittedFiles": false /* Print names of generated files part of the compilation. */,
"listFiles": false /* Print names of files part of the compilation. */,
"pretty": true /* Stylize errors and messages using color and context. */,
// Due to broken types in indirect dependencies
"skipLibCheck": true,
/* Experimental Options */
// "experimentalDecorators": true /* Enables experimental support for ES7 decorators. */,
// "emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */,
"lib": ["es2020", "dom"],
"types": ["node", "mocha"],
"typeRoots": ["node_modules/@types", "src/types"]
"tsBuildInfoFile": "dist/.tsbuildinfo"
},
"include": ["src", ".eslintrc.js"],
"exclude": ["src/**/*.spec.ts", "src/test_utils", "dist", "bundle"],
"compileOnSave": false,
"ts-node": {
"files": true
}
"include": ["src"],
"exclude": ["src/**/*.spec.ts", "src/test_utils"]
}

View File

@ -7,15 +7,38 @@ import {
import { createLightNode } from "@waku/create";
import type { LightNode, PeerExchangeResponse } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { wakuPeerExchangeDiscovery } from "@waku/peer-exchange";
import { expect } from "chai";
describe("Peer Exchange: Node", () => {
import { delay } from "../src/delay.js";
describe("Peer Exchange", () => {
let waku: LightNode;
afterEach(async function () {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Test Fleet: Queries successfully [Live Data]", async function () {
it("Auto discovery", async function () {
this.timeout(120_000);
waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: getPredefinedBootstrapNodes(Fleet.Test) }),
wakuPeerExchangeDiscovery(),
],
},
});
await waku.start();
await delay(1000);
await waitForRemotePeer(waku, [Protocols.PeerExchange]);
const pxPeers = await waku.peerExchange.peers();
expect(pxPeers.length).to.be.greaterThan(0);
});
it("Manual query on test fleet", async function () {
this.timeout(150_000);
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet