Sort out createWaku and waitForRemotePeer

This commit is contained in:
Franck Royer 2022-06-22 15:52:02 +10:00
parent 4cf197e54d
commit e0a23bad44
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 70 additions and 109 deletions

View File

@ -83,8 +83,9 @@ export class Bootstrap
private timer?: ReturnType<typeof setInterval>;
private readonly interval: number;
constructor(opts: BootstrapOptions) {
constructor(opts?: BootstrapOptions) {
super();
opts = opts ?? {};
const methods = [
!!opts.default,

View File

@ -2,12 +2,12 @@ import { Noise } from "@chainsafe/libp2p-noise";
import type { PeerId } from "@libp2p/interface-peer-id";
import { MuxedStream } from "@libp2p/interfaces/stream-muxer/types";
import { Mplex } from "@libp2p/mplex";
import { peerIdFromString } from "@libp2p/peer-id";
import { WebSockets } from "@libp2p/websockets";
import filters from "@libp2p/websockets/filters";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
import { createLibp2p, Libp2p, Libp2pOptions } from "libp2p";
import Libp2pBootstrap from "libp2p-bootstrap";
import PingService from "libp2p/src/ping";
import { Bootstrap, BootstrapOptions } from "./discovery";
@ -85,7 +85,13 @@ export interface CreateOptions {
decryptionKeys?: Array<Uint8Array | string>;
}
export async function createWaku(): Promise<Waku> {
export async function createWaku(options?: CreateOptions): Promise<Waku> {
const peerDiscovery = [];
if (options?.bootstrap) {
peerDiscovery.push(new Bootstrap(options?.bootstrap));
}
// TODO: Use options
const libp2pOpts = {
transports: new WebSockets({ filter: filters.all }),
streamMuxers: [new Mplex()],
@ -97,21 +103,13 @@ export async function createWaku(): Promise<Waku> {
// @ts-ignore: modules property is correctly set thanks to voodoo
const libp2p = await createLibp2p(libp2pOpts);
const wakuStore = new WakuStore(libp2p, {
pubSubTopic: options?.pubSubTopic,
});
const wakuStore = new WakuStore(libp2p);
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
return new Waku({}, libp2p, wakuStore, wakuLightPush, wakuFilter);
}
export class Waku {
@ -128,7 +126,7 @@ export class Waku {
[peer: string]: ReturnType<typeof setInterval>;
};
private constructor(
constructor(
options: CreateOptions,
libp2p: Libp2p,
store: WakuStore,
@ -172,80 +170,8 @@ export class Waku {
});
}
/**
* Create and start new waku node.
*/
static async create(options?: CreateOptions): Promise<Waku> {
// Get an object in case options or libp2p are undefined
// Default for Websocket filter is `all`:
// Returns all TCP and DNS based addresses, both with ws or wss.
const libp2pOpts: Partial<Libp2pOptions> | undefined = Object.assign(
{
transport: {
[websocketsTransportKey]: {
filter: filters.all,
},
},
},
options?.libp2p
);
// TODO: Pass self-emit?
// Pass pubsub topic to relay
libp2pOpts.pubsub = new WakuRelay({ pubSubTopic: options?.pubSubTopic });
// Default transport for libp2p is Websockets
libp2pOpts.transports = options?.libp2p?.transports ?? [Websockets];
// streamMuxer, connection encryption and pubsub are overridden
// as those are the only ones currently supported by Waku nodes.
libp2pOpts.streamMuxers = [Mplex];
libp2pOpts.connectionEncryption = [new Noise(options?.staticNoiseKey)];
if (options?.bootstrap) {
const bootstrap = new Bootstrap(options?.bootstrap);
if (bootstrap.getBootstrapPeers !== undefined) {
try {
const list = await bootstrap.getBootstrapPeers();
// Note: this overrides any other peer discover
libp2pOpts.modules = Object.assign(libp2pOpts.modules, {
peerDiscovery: [Libp2pBootstrap],
});
libp2pOpts.config.peerDiscovery = {
[Libp2pBootstrap.tag]: {
list,
enabled: true,
},
};
} catch (e) {
dbg("Failed to retrieve bootstrap nodes", e);
}
}
}
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: modules property is correctly set thanks to voodoo
const libp2p = await createLibp2p(libp2pOpts);
const wakuStore = new WakuStore(libp2p, {
pubSubTopic: options?.pubSubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
start(): void {
this.libp2p.start();
}
/**
@ -291,7 +217,7 @@ export class Waku {
): void {
let peer;
if (typeof peerId === "string") {
peer = PeerId.createFromB58String(peerId);
peer = peerIdFromString(peerId);
} else {
peer = peerId;
}
@ -344,9 +270,9 @@ export class Waku {
* @throws if libp2p is not listening on localhost.
*/
getLocalMultiaddrWithID(): string {
const localMultiaddr = this.libp2p.multiaddrs.find((addr) =>
addr.toString().match(/127\.0\.0\.1/)
);
const localMultiaddr = this.libp2p
.getMultiaddrs()
.find((addr) => addr.toString().match(/127\.0\.0\.1/));
if (!localMultiaddr || localMultiaddr.toString() === "") {
throw "Not listening on localhost";
}
@ -374,21 +300,23 @@ export class Waku {
const promises: Promise<void>[] = [];
if (protocols.includes(Protocols.Relay)) {
const peers = this.relay.getPeers();
const peers = this.relay.getMeshPeers(this.relay.pubSubTopic);
if (peers.size == 0) {
if (peers.length == 0) {
// No peer yet available, wait for a subscription
const promise = new Promise<void>((resolve) => {
this.libp2p.pubsub.once("pubsub:subscription-change", () => {
this.relay.addEventListener("subscription-change", () => {
// Remote peer subscribed to topic, now wait for a heartbeat
// so that the mesh is updated and the remote peer added to it
this.libp2p.pubsub.once("gossipsub:heartbeat", resolve);
this.relay.addEventListener("gossipsub:heartbeat", () => resolve());
});
});
promises.push(promise);
}
}
// TODO: This can be factored in one helper function
// Probably need to add a "string" protocol to each class to make it easier
if (protocols.includes(Protocols.Store)) {
const storePromise = (async (): Promise<void> => {
const peers = await this.store.peers();
@ -398,6 +326,29 @@ export class Waku {
return;
}
await new Promise<void>((resolve) => {
this.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
for (const codec of Object.values(StoreCodecs)) {
if (evt.detail.protocols.includes(codec)) {
dbg("Resolving for", codec, evt.detail.protocols);
resolve();
}
}
});
});
})();
promises.push(storePromise);
}
if (protocols.includes(Protocols.LightPush)) {
const lightPushPromise = (async (): Promise<void> => {
const peers = await this.lightPush.peers();
if (peers.length) {
dbg("Light Push peer found: ", peers[0].id.toString());
return;
}
await new Promise<void>((resolve) => {
this.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
if (evt.detail.protocols.includes(LightPushCodec)) {
@ -407,25 +358,26 @@ export class Waku {
});
});
})();
promises.push(storePromise);
}
if (protocols.includes(Protocols.LightPush)) {
const lightPushPromise = (async (): Promise<void> => {
for await (const peer of this.lightPush.peers) {
dbg("Light Push peer found", peer.id.toString());
break;
}
})();
promises.push(lightPushPromise);
}
if (protocols.includes(Protocols.Filter)) {
const filterPromise = (async (): Promise<void> => {
for await (const peer of this.filter.peers) {
dbg("Filter peer found", peer.id.toString());
break;
const peers = await this.filter.peers();
if (peers.length) {
dbg("Filter peer found: ", peers[0].id.toString());
return;
}
await new Promise<void>((resolve) => {
this.libp2p.peerStore.addEventListener("change:protocols", (evt) => {
if (evt.detail.protocols.includes(FilterCodec)) {
dbg("Resolving for", FilterCodec, evt.detail.protocols);
resolve();
}
});
});
})();
promises.push(filterPromise);
}

View File

@ -3,6 +3,10 @@ import {
GossipsubMessage,
GossipsubOpts,
} from "@chainsafe/libp2p-gossipsub";
import {
PeerIdStr,
TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import debug from "debug";
@ -206,6 +210,10 @@ export class WakuRelay extends GossipSub {
super.subscribe(pubSubTopic);
}
getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? DefaultPubSubTopic);
}
// TODO: Implement method that uses Relay codec
// public async heartbeat(): Promise<void> {
}