feat: split bootstrap logic

Split the bootstrap logic in 2 different classes that implement the
libp2p peer discovery class.

This enables better tree shaking when not using the heaviest version
(DNS Discovery).

It also means using libp2p interface directly when customizing the peer
discovery logic.

Finally, the `default` method is still available via the
`defaultBootstrap` option.
This commit is contained in:
fryorcraken.eth 2022-08-03 15:08:12 +10:00 committed by fryorcraken.eth
parent c1b158cbf0
commit 889ec4d45c
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
18 changed files with 210 additions and 284 deletions

View File

@ -6,9 +6,6 @@ export {
getPublicKey,
} from "./lib/crypto";
export { getPredefinedBootstrapNodes } from "./lib/discovery";
export * as discovery from "./lib/discovery";
export * as enr from "./lib/enr";
export * as utils from "./lib/utils";

View File

@ -1,10 +1,13 @@
import { Noise } from "@chainsafe/libp2p-noise";
import type { PeerDiscovery } from "@libp2p/interface-peer-discovery";
import { Mplex } from "@libp2p/mplex";
import { WebSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import { createLibp2p, Libp2pOptions } from "libp2p";
import type { Libp2p } from "libp2p";
import { Bootstrap, BootstrapOptions } from "./discovery";
import { getPredefinedBootstrapNodes } from "./peer_discovery_dns/predefined";
import { PeerDiscoveryStaticPeers } from "./peer_discovery_static_list";
import { Waku, WakuOptions } from "./waku";
import { WakuFilter } from "./waku_filter";
import { WakuLightPush } from "./waku_light_push";
@ -48,29 +51,20 @@ export interface CreateOptions {
* Note: It overrides any other peerDiscovery modules that may have been set via
* {@link CreateOptions.libp2p}.
*/
bootstrap?: BootstrapOptions;
defaultBootstrap?: boolean;
}
export async function createWaku(
options?: CreateOptions & WakuOptions
): Promise<Waku> {
const peerDiscovery = [];
if (options?.bootstrap) {
peerDiscovery.push(new Bootstrap(options?.bootstrap));
const libp2pOptions = options?.libp2p ?? {};
const peerDiscovery = libp2pOptions.peerDiscovery ?? [];
if (options?.defaultBootstrap) {
peerDiscovery.push(defaultPeerDiscovery());
Object.assign(libp2pOptions, { peerDiscovery });
}
const libp2pOpts = Object.assign(
{
transports: [new WebSockets({ filter: filterAll })],
streamMuxers: [new Mplex()],
pubsub: new WakuRelay(options),
connectionEncryption: [new Noise()],
peerDiscovery: peerDiscovery,
},
options?.libp2p ?? {}
);
const libp2p = await createLibp2p(libp2pOpts);
const libp2p = await defaultLibp2p(new WakuRelay(options), libp2pOptions);
const wakuStore = new WakuStore(libp2p, options);
const wakuLightPush = new WakuLightPush(libp2p, options);
@ -78,3 +72,24 @@ export async function createWaku(
return new Waku(options ?? {}, libp2p, wakuStore, wakuLightPush, wakuFilter);
}
export function defaultPeerDiscovery(): PeerDiscovery {
return new PeerDiscoveryStaticPeers(getPredefinedBootstrapNodes());
}
export async function defaultLibp2p(
wakuRelay: WakuRelay,
options?: Partial<Libp2pOptions>
): Promise<Libp2p> {
const libp2pOpts = Object.assign(
{
transports: [new WebSockets({ filter: filterAll })],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
},
{ pubsub: wakuRelay },
options ?? {}
);
return createLibp2p(libp2pOpts);
}

View File

@ -1,244 +0,0 @@
import type {
PeerDiscovery,
PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
import { DnsNodeDiscovery, NodeCapabilityCount } from "./dns";
import { getPredefinedBootstrapNodes } from "./predefined";
import { getPseudoRandomSubset } from "./random_subset";
const log = debug("waku:discovery:bootstrap");
/**
* Setup discovery method used to bootstrap.
*
* Only one method is used. [[default]], [[peers]], [[getPeers]] and [[enrUrl]] options are mutually exclusive.
*/
export interface BootstrapOptions {
/**
* The maximum of peers to connect to as part of the bootstrap process.
* This only applies if [[peers]] or [[getPeers]] is used.
*
* @default [[Bootstrap.DefaultMaxPeers]]
*/
maxPeers?: number;
/**
* Use the default discovery method. Overrides all other options but `maxPeers`
*
* The default discovery method is likely to change overtime as new discovery
* methods are implemented.
*
* @default false
*/
default?: boolean;
/**
* Multiaddrs of peers to connect to.
*/
peers?: string[] | Multiaddr[];
/**
* Getter that retrieve multiaddrs of peers to connect to.
* will be called once.
*/
getPeers?: () => Promise<string[] | Multiaddr[]>;
/**
* The interval between emitting addresses in milliseconds.
* Used if [[peers]] is passed or a sync function is passed for [[getPeers]]
*/
interval?: number;
/**
* An EIP-1459 ENR Tree URL. For example:
* "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.nodes.vac.dev"
*
* [[wantedNodeCapabilityCount]] MUST be passed when using this option.
*/
enrUrl?: string;
/**
* Specifies what node capabilities (protocol) must be returned.
* This only applies when [[enrUrl]] is passed (EIP-1459 DNS Discovery).
*/
wantedNodeCapabilityCount?: Partial<NodeCapabilityCount>;
}
/**
* Parse options and expose function to return bootstrap peer addresses.
*
* @throws if an invalid combination of options is passed, see [[BootstrapOptions]] for details.
*/
export class Bootstrap
extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
static DefaultMaxPeers = 1;
private readonly asyncGetBootstrapPeers:
| (() => Promise<Multiaddr[]>)
| undefined;
private peers: PeerInfo[];
private timer?: ReturnType<typeof setInterval>;
private readonly interval: number;
constructor(opts?: BootstrapOptions) {
super();
opts = opts ?? {};
const methods = [
!!opts.default,
!!opts.peers,
!!opts.getPeers,
!!opts.enrUrl,
].filter((x) => x);
if (methods.length > 1) {
throw new Error(
"Bootstrap does not support several discovery methods (yet)"
);
}
this.interval = opts.interval ?? 10000;
opts.default =
opts.default ?? (!opts.peers && !opts.getPeers && !opts.enrUrl);
const maxPeers = opts.maxPeers ?? Bootstrap.DefaultMaxPeers;
this.peers = [];
if (opts.default) {
log("Use hosted list of peers.");
this.peers = multiaddrsToPeerInfo(
getPredefinedBootstrapNodes(undefined, maxPeers)
);
return;
}
if (!!opts.peers && opts.peers.length > 0) {
const allPeers: Multiaddr[] = opts.peers.map(
(node: string | Multiaddr) => {
if (typeof node === "string") {
return new Multiaddr(node);
} else {
return node;
}
}
);
this.peers = multiaddrsToPeerInfo(
getPseudoRandomSubset(allPeers, maxPeers)
);
log(
"Use provided list of peers (reduced to maxPeers)",
this.peers.map((ma) => ma.toString())
);
return;
}
if (typeof opts.getPeers === "function") {
log("Bootstrap: Use provided getPeers function.");
const getPeers = opts.getPeers;
this.asyncGetBootstrapPeers = async () => {
const allPeers = await getPeers();
return getPseudoRandomSubset<string | Multiaddr>(
allPeers,
maxPeers
).map((node) => new Multiaddr(node));
};
return;
}
if (opts.enrUrl) {
const wantedNodeCapabilityCount = opts.wantedNodeCapabilityCount;
if (!wantedNodeCapabilityCount)
throw "`wantedNodeCapabilityCount` must be defined when using `enrUrl`";
const enrUrl = opts.enrUrl;
log("Use provided EIP-1459 ENR Tree URL.");
const dns = DnsNodeDiscovery.dnsOverHttp();
this.asyncGetBootstrapPeers = async () => {
const enrs = await dns.getPeers([enrUrl], wantedNodeCapabilityCount);
log(`Found ${enrs.length} peers`);
return enrs.map((enr) => enr.getFullMultiaddrs()).flat();
};
return;
}
}
/**
* Start discovery process
*/
start(): void {
if (this.asyncGetBootstrapPeers) {
// TODO: This should emit the peer as they are discovered instead of having
// to wait for the full DNS discovery process to be done first.
// TODO: PeerInfo should be returned by discovery
this.asyncGetBootstrapPeers().then((peers) => {
this.peers = multiaddrsToPeerInfo(peers);
this._startTimer();
});
} else {
this._startTimer();
}
}
private _startTimer(): void {
if (this.peers) {
log("Starting bootstrap node discovery");
if (this.timer != null) {
return;
}
this.timer = setInterval(() => this._returnPeers(), this.interval);
this._returnPeers();
}
}
_returnPeers(): void {
if (this.timer == null) {
return;
}
this.peers.forEach((peerData) => {
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { detail: peerData })
);
});
}
/**
* Stop emitting events
*/
stop(): void {
if (this.timer != null) {
clearInterval(this.timer);
}
this.timer = undefined;
}
get [symbol](): true {
return true;
}
get [Symbol.toStringTag](): string {
return "@waku/bootstrap";
}
}
function multiaddrsToPeerInfo(mas: Multiaddr[]): PeerInfo[] {
return mas
.map((ma) => {
const peerIdStr = ma.getPeerId();
const protocols: string[] = [];
return {
id: peerIdStr ? peerIdFromString(peerIdStr) : null,
multiaddrs: [ma.decapsulateCode(421)],
protocols,
};
})
.filter((peerInfo): peerInfo is PeerInfo => peerInfo.id !== null);
}

View File

@ -1,6 +0,0 @@
export { getPredefinedBootstrapNodes } from "./predefined";
export * as predefined from "./predefined";
export { Bootstrap, BootstrapOptions } from "./bootstrap";
export * as dns from "./dns";
export { DnsOverHttps } from "./dns_over_https";
export { ENRTree, ENRTreeValues, ENRRootValues } from "./enrtree";

View File

@ -4,7 +4,10 @@ import { ENR } from "../enr";
import { DnsOverHttps } from "./dns_over_https";
import { ENRTree } from "./enrtree";
import fetchNodesUntilCapabilitiesFulfilled from "./fetch_nodes";
import {
fetchNodesUntilCapabilitiesFulfilled,
yieldNodesUntilCapabilitiesFulfilled,
} from "./fetch_nodes";
const dbg = debug("waku:discovery:dns");
@ -70,6 +73,30 @@ export class DnsNodeDiscovery {
this.dns = dns;
}
/**
* {@docInherit getPeers}
*/
async *getNextPeer(
enrTreeUrls: string[],
wantedNodeCapabilityCount: Partial<NodeCapabilityCount>
): AsyncGenerator<ENR> {
const networkIndex = Math.floor(Math.random() * enrTreeUrls.length);
const { publicKey, domain } = ENRTree.parseTree(enrTreeUrls[networkIndex]);
const context: SearchContext = {
domain,
publicKey,
visits: {},
};
for await (const peer of yieldNodesUntilCapabilitiesFulfilled(
wantedNodeCapabilityCount,
this._errorTolerance,
() => this._search(domain, context)
)) {
yield peer;
}
}
/**
* Runs a recursive, randomized descent of the DNS tree to retrieve a single
* ENR record as an ENR. Returns null if parsing or DNS resolution fails.

View File

@ -4,7 +4,7 @@ import { expect } from "chai";
import { ENR, Waku2 } from "../enr";
import fetchNodesUntilCapabilitiesFulfilled from "./fetch_nodes";
import { fetchNodesUntilCapabilitiesFulfilled } from "./fetch_nodes";
async function createEnr(waku2: Waku2): Promise<ENR> {
const peerId = await createSecp256k1PeerId();

View File

@ -11,7 +11,7 @@ const dbg = debug("waku:discovery:fetch_nodes");
* fulfilled or the number of [[getNode]] call exceeds the sum of
* [[wantedNodeCapabilityCount]] plus [[errorTolerance]].
*/
export default async function fetchNodesUntilCapabilitiesFulfilled(
export async function fetchNodesUntilCapabilitiesFulfilled(
wantedNodeCapabilityCount: Partial<NodeCapabilityCount>,
errorTolerance: number,
getNode: () => Promise<ENR | null>
@ -57,6 +57,56 @@ export default async function fetchNodesUntilCapabilitiesFulfilled(
return peers;
}
/**
* Fetch nodes using passed [[getNode]] until all wanted capabilities are
* fulfilled or the number of [[getNode]] call exceeds the sum of
* [[wantedNodeCapabilityCount]] plus [[errorTolerance]].
*/
export async function* yieldNodesUntilCapabilitiesFulfilled(
wantedNodeCapabilityCount: Partial<NodeCapabilityCount>,
errorTolerance: number,
getNode: () => Promise<ENR | null>
): AsyncGenerator<ENR> {
const wanted = {
relay: wantedNodeCapabilityCount.relay ?? 0,
store: wantedNodeCapabilityCount.store ?? 0,
filter: wantedNodeCapabilityCount.filter ?? 0,
lightPush: wantedNodeCapabilityCount.lightPush ?? 0,
};
const maxSearches =
wanted.relay + wanted.store + wanted.filter + wanted.lightPush;
const actual = {
relay: 0,
store: 0,
filter: 0,
lightPush: 0,
};
let totalSearches = 0;
const peerNodeIds = new Set();
while (
!isSatisfied(wanted, actual) &&
totalSearches < maxSearches + errorTolerance
) {
const peer = await getNode();
if (peer && peer.nodeId && !peerNodeIds.has(peer.nodeId)) {
peerNodeIds.add(peer.nodeId);
// ENRs without a waku2 key are ignored.
if (peer.waku2) {
if (helpsSatisfyCapabilities(peer.waku2, wanted, actual)) {
addCapabilities(peer.waku2, actual);
yield peer;
}
}
dbg(`got new peer candidate from DNS address=${peer.nodeId}@${peer.ip}`);
}
totalSearches++;
}
}
function isSatisfied(
wanted: NodeCapabilityCount,
actual: NodeCapabilityCount

View File

@ -1,6 +1,6 @@
import { expect } from "chai";
import { getPseudoRandomSubset } from "./random_subset";
import { getPseudoRandomSubset } from "../random_subset";
describe("Discovery", () => {
it("returns all values when wanted number matches available values", function () {

View File

@ -0,0 +1,84 @@
import type {
PeerDiscovery,
PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import debug from "debug";
import { ENR } from "../enr";
import { multiaddrsToPeerInfo } from "../multiaddr_to_peer_info";
import { DnsNodeDiscovery, NodeCapabilityCount } from "./dns";
const log = debug("waku:peer-discovery-dns");
/**
* Parse options and expose function to return bootstrap peer addresses.
*
* @throws if an invalid combination of options is passed, see [[BootstrapOptions]] for details.
*/
export class PeerDiscoveryDns
extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
private readonly nextPeer: () => AsyncGenerator<ENR>;
private _started: boolean;
/**
* @param enrUrl An EIP-1459 ENR Tree URL. For example:
* "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.nodes.vac.dev"
* @param wantedNodeCapabilityCount Specifies what node capabilities
* (protocol) must be returned.
*/
constructor(
enrUrl: string,
wantedNodeCapabilityCount: Partial<NodeCapabilityCount>
) {
super();
this._started = false;
log("Use following EIP-1459 ENR Tree URL: ", enrUrl);
const dns = DnsNodeDiscovery.dnsOverHttp();
this.nextPeer = dns.getNextPeer.bind(
{},
[enrUrl],
wantedNodeCapabilityCount
);
}
/**
* Start discovery process
*/
async start(): Promise<void> {
log("Starting peer discovery via dns");
this._started = true;
for await (const peer of this.nextPeer()) {
if (!this._started) return;
const peerInfos = multiaddrsToPeerInfo(peer.getFullMultiaddrs());
peerInfos.forEach((peerInfo) => {
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { detail: peerInfo })
);
});
}
}
/**
* Stop emitting events
*/
stop(): void {
this._started = false;
}
get [symbol](): true {
return true;
}
get [Symbol.toStringTag](): string {
return "@waku/bootstrap";
}
}

View File

@ -1,6 +1,6 @@
import { Multiaddr } from "@multiformats/multiaddr";
import { getPseudoRandomSubset } from "./random_subset";
import { getPseudoRandomSubset } from "../random_subset";
export const DefaultWantedNumber = 1;

View File

@ -11,7 +11,7 @@ import debug from "debug";
import { multiaddrsToPeerInfo } from "./multiaddr_to_peer_info";
import { getPseudoRandomSubset } from "./random_subset";
const log = debug("waku:discovery:static-list");
const log = debug("waku:peer-discovery-static-list");
export interface Options {
/**
@ -75,7 +75,7 @@ export class PeerDiscoveryStaticPeers
private _startTimer(): void {
if (this.peers) {
log("Starting emitting static peers for boostrap.");
log("Starting to emit static peers.");
if (this.timer != null) {
return;
}

View File

@ -10,6 +10,7 @@ import {
import { createWaku } from "./create_waku";
import { generateSymmetricKey } from "./crypto";
import { PeerDiscoveryStaticPeers } from "./peer_discovery_static_list";
import { waitForRemotePeer } from "./wait_for_remote_peer";
import { Protocols, Waku } from "./waku";
import { WakuMessage } from "./waku_message";
@ -61,7 +62,9 @@ describe("Waku Dial [node only]", function () {
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
bootstrap: { peers: [multiAddrWithId] },
libp2p: {
peerDiscovery: [new PeerDiscoveryStaticPeers([multiAddrWithId])],
},
});
await waku.start();
@ -77,7 +80,7 @@ describe("Waku Dial [node only]", function () {
expect(connectedPeerID.toString()).to.eq(multiAddrWithId.getPeerId());
});
it("Passing a function", async function () {
it("Using a function", async function () {
this.timeout(10_000);
nwaku = new Nwaku(makeLogFileName(this));
@ -85,10 +88,10 @@ describe("Waku Dial [node only]", function () {
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
bootstrap: {
getPeers: async () => {
return [await nwaku.getMultiaddrWithId()];
},
libp2p: {
peerDiscovery: [
new PeerDiscoveryStaticPeers([await nwaku.getMultiaddrWithId()]),
],
},
});
await waku.start();

View File

@ -24,7 +24,7 @@ describe("Waku Dial", function () {
this.timeout(20_000);
waku = await createWaku({
bootstrap: { default: true },
defaultBootstrap: true,
});
await waku.start();