Implement libp2p bootstrap interface

This commit is contained in:
Franck Royer 2022-06-22 15:12:14 +10:00
parent 081f62b07b
commit 4cf197e54d
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 797 additions and 578 deletions

View File

@ -14,7 +14,10 @@
],
"globals": { "BigInt": true, "console": true, "WebAssembly": true },
"rules": {
"@typescript-eslint/explicit-function-return-type": ["error"],
"@typescript-eslint/explicit-function-return-type": [
"error",
{ "allowExpressions": true }
],
"@typescript-eslint/explicit-module-boundary-types": "off",
"eslint-comments/disable-enable-pair": [
"error",

1143
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -69,10 +69,14 @@
"@chainsafe/libp2p-gossipsub": "^2.0.0",
"@chainsafe/libp2p-noise": "^7.0.0",
"@ethersproject/rlp": "^5.5.0",
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/mplex": "^3.0.0",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/websockets": "^3.0.0",
"@multiformats/multiaddr": "^10.2.0",
"@noble/secp256k1": "^1.3.4",
"debug": "^4.3.4",
@ -83,10 +87,7 @@
"it-pipe": "^1.1.0",
"js-sha3": "^0.8.0",
"libp2p": "^0.37.3",
"libp2p-bootstrap": "^0.14.0",
"libp2p-crypto": "^0.21.2",
"libp2p-mplex": "^0.10.4",
"libp2p-websockets": "^0.16.1",
"multiformats": "^9.6.5",
"protons-runtime": "^1.0.4",
"uint8arrays": "^3.0.0",

View File

@ -1,3 +1,11 @@
import type {
PeerDiscovery,
PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { peerIdFromString } from "@libp2p/peer-id/src";
import { Multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
@ -5,7 +13,7 @@ import { DnsNodeDiscovery, NodeCapabilityCount } from "./dns";
import { getPredefinedBootstrapNodes } from "./predefined";
import { getPseudoRandomSubset } from "./random_subset";
const dbg = debug("waku:discovery:bootstrap");
const log = debug("waku:discovery:bootstrap");
/**
* Setup discovery method used to bootstrap.
@ -35,8 +43,14 @@ export interface BootstrapOptions {
peers?: string[] | Multiaddr[];
/**
* Getter that retrieve multiaddrs of peers to connect to.
* will be called once.
*/
getPeers?: () => Promise<string[] | Multiaddr[]>;
/**
* The interval between emitting addresses in milliseconds.
* Used if [[peers]] is passed or a sync function is passed for [[getPeers]]
*/
interval?: number;
/**
* An EIP-1459 ENR Tree URL. For example:
* "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.nodes.vac.dev"
@ -56,23 +70,50 @@ export interface BootstrapOptions {
*
* @throws if an invalid combination of options is passed, see [[BootstrapOptions]] for details.
*/
export class Bootstrap {
public static DefaultMaxPeers = 1;
export class Bootstrap
extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
static DefaultMaxPeers = 1;
public readonly getBootstrapPeers: (() => Promise<Multiaddr[]>) | undefined;
private readonly asyncGetBootstrapPeers:
| (() => Promise<Multiaddr[]>)
| undefined;
private peers: PeerInfo[];
private timer?: ReturnType<typeof setInterval>;
private readonly interval: number;
constructor(opts: BootstrapOptions) {
super();
const methods = [
!!opts.default,
!!opts.peers,
!!opts.getPeers,
!!opts.enrUrl,
].filter((x) => x);
if (methods.length > 1) {
throw new Error(
"Bootstrap does not support several discovery methods (yet)"
);
}
this.interval = opts.interval ?? 10000;
opts.default =
opts.default ?? (!opts.peers && !opts.getPeers && !opts.enrUrl);
const maxPeers = opts.maxPeers ?? Bootstrap.DefaultMaxPeers;
this.peers = [];
if (opts.default) {
dbg("Use hosted list of peers.");
log("Use hosted list of peers.");
this.getBootstrapPeers = (): Promise<Multiaddr[]> => {
return Promise.resolve(
getPredefinedBootstrapNodes(undefined, maxPeers)
);
};
} else if (opts.peers !== undefined && opts.peers.length > 0) {
this.peers = multiaddrsToPeerInfo(
getPredefinedBootstrapNodes(undefined, maxPeers)
);
return;
}
if (!!opts.peers && opts.peers.length > 0) {
const allPeers: Multiaddr[] = opts.peers.map(
(node: string | Multiaddr) => {
if (typeof node === "string") {
@ -82,41 +123,121 @@ export class Bootstrap {
}
}
);
const peers = getPseudoRandomSubset(allPeers, maxPeers);
dbg(
"Use provided list of peers (reduced to maxPeers)",
allPeers.map((ma) => ma.toString())
this.peers = multiaddrsToPeerInfo(
getPseudoRandomSubset(allPeers, maxPeers)
);
this.getBootstrapPeers = (): Promise<Multiaddr[]> =>
Promise.resolve(peers);
} else if (typeof opts.getPeers === "function") {
dbg("Bootstrap: Use provided getPeers function.");
log(
"Use provided list of peers (reduced to maxPeers)",
this.peers.map((ma) => ma.toString())
);
return;
}
if (typeof opts.getPeers === "function") {
log("Bootstrap: Use provided getPeers function.");
const getPeers = opts.getPeers;
this.getBootstrapPeers = async (): Promise<Multiaddr[]> => {
this.asyncGetBootstrapPeers = async () => {
const allPeers = await getPeers();
return getPseudoRandomSubset<string | Multiaddr>(
allPeers,
maxPeers
).map((node) => new Multiaddr(node));
};
} else if (opts.enrUrl) {
return;
}
if (opts.enrUrl) {
const wantedNodeCapabilityCount = opts.wantedNodeCapabilityCount;
if (!wantedNodeCapabilityCount)
throw "`wantedNodeCapabilityCount` must be defined when using `enrUrl`";
const enrUrl = opts.enrUrl;
dbg("Use provided EIP-1459 ENR Tree URL.");
log("Use provided EIP-1459 ENR Tree URL.");
const dns = DnsNodeDiscovery.dnsOverHttp();
this.getBootstrapPeers = async (): Promise<Multiaddr[]> => {
this.asyncGetBootstrapPeers = async () => {
const enrs = await dns.getPeers([enrUrl], wantedNodeCapabilityCount);
dbg(`Found ${enrs.length} peers`);
log(`Found ${enrs.length} peers`);
return enrs.map((enr) => enr.getFullMultiaddrs()).flat();
};
} else {
dbg("No bootstrap method specified, no peer will be returned");
this.getBootstrapPeers = undefined;
return;
}
}
/**
* Start discovery process
*/
start(): void {
if (this.asyncGetBootstrapPeers) {
// TODO: This should emit the peer as they are discovered instead of having
// to wait for the full DNS discovery process to be done first.
// TODO: PeerInfo should be returned by discovery
this.asyncGetBootstrapPeers().then((peers) => {
this.peers = multiaddrsToPeerInfo(peers);
this._startTimer();
});
} else {
this._startTimer();
}
}
private _startTimer(): void {
if (this.peers) {
log("Starting bootstrap node discovery");
if (this.timer != null) {
return;
}
this.timer = setInterval(() => this._returnPeers(), this.interval);
this._returnPeers();
}
}
_returnPeers(): void {
if (this.timer == null) {
return;
}
this.peers.forEach((peerData) => {
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { detail: peerData })
);
});
}
/**
* Stop emitting events
*/
stop(): void {
if (this.timer != null) {
clearInterval(this.timer);
}
this.timer = undefined;
}
get [symbol](): true {
return true;
}
get [Symbol.toStringTag](): string {
return "@waku/bootstrap";
}
}
function multiaddrsToPeerInfo(mas: Multiaddr[]): PeerInfo[] {
return mas
.map((ma) => {
const peerIdStr = ma.getPeerId();
const protocols: string[] = [];
return {
id: peerIdStr ? peerIdFromString(peerIdStr) : null,
multiaddrs: [ma.decapsulateCode(421)],
protocols,
};
})
.filter((peerInfo): peerInfo is PeerInfo => peerInfo.id !== null);
}

View File

@ -1,19 +1,13 @@
import { Noise } from "@chainsafe/libp2p-noise";
import type { PeerId } from "@libp2p/interface-peer-id";
import { MuxedStream } from "@libp2p/interfaces/stream-muxer/types";
import { Mplex } from "@libp2p/mplex";
import { WebSockets } from "@libp2p/websockets";
import filters from "@libp2p/websockets/filters";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
import { createLibp2p, Libp2p, Libp2pOptions } from "libp2p";
import Libp2pBootstrap from "libp2p-bootstrap";
import { MuxedStream } from "libp2p-interfaces/dist/src/stream-muxer/types";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import Mplex from "libp2p-mplex";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import Websockets from "libp2p-websockets";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import filters from "libp2p-websockets/src/filters";
import PingService from "libp2p/src/ping";
import { Bootstrap, BootstrapOptions } from "./discovery";
@ -24,8 +18,6 @@ import { WakuRelay } from "./waku_relay";
import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants";
import { StoreCodecs, WakuStore } from "./waku_store";
const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag];
export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@ -93,6 +85,35 @@ export interface CreateOptions {
decryptionKeys?: Array<Uint8Array | string>;
}
export async function createWaku(): Promise<Waku> {
const libp2pOpts = {
transports: new WebSockets({ filter: filters.all }),
streamMuxers: [new Mplex()],
pubsub: new WakuRelay(),
connectionEncryption: [new Noise()],
};
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: modules property is correctly set thanks to voodoo
const libp2p = await createLibp2p(libp2pOpts);
const wakuStore = new WakuStore(libp2p, {
pubSubTopic: options?.pubSubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
}
export class Waku {
public libp2p: Libp2p;
public relay: WakuRelay;