From e0a23bad44e6e829fc235851620af3fbaaad0e05 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 22 Jun 2022 15:52:02 +1000 Subject: [PATCH] Sort out createWaku and waitForRemotePeer --- src/lib/discovery/bootstrap.ts | 3 +- src/lib/waku.ts | 168 ++++++++++++--------------------- src/lib/waku_relay/index.ts | 8 ++ 3 files changed, 70 insertions(+), 109 deletions(-) diff --git a/src/lib/discovery/bootstrap.ts b/src/lib/discovery/bootstrap.ts index e82e9d240a..7f3662d562 100644 --- a/src/lib/discovery/bootstrap.ts +++ b/src/lib/discovery/bootstrap.ts @@ -83,8 +83,9 @@ export class Bootstrap private timer?: ReturnType; private readonly interval: number; - constructor(opts: BootstrapOptions) { + constructor(opts?: BootstrapOptions) { super(); + opts = opts ?? {}; const methods = [ !!opts.default, diff --git a/src/lib/waku.ts b/src/lib/waku.ts index d7df8efc86..aee8d13689 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -2,12 +2,12 @@ import { Noise } from "@chainsafe/libp2p-noise"; import type { PeerId } from "@libp2p/interface-peer-id"; import { MuxedStream } from "@libp2p/interfaces/stream-muxer/types"; import { Mplex } from "@libp2p/mplex"; +import { peerIdFromString } from "@libp2p/peer-id"; import { WebSockets } from "@libp2p/websockets"; import filters from "@libp2p/websockets/filters"; import { Multiaddr, multiaddr } from "@multiformats/multiaddr"; import debug from "debug"; import { createLibp2p, Libp2p, Libp2pOptions } from "libp2p"; -import Libp2pBootstrap from "libp2p-bootstrap"; import PingService from "libp2p/src/ping"; import { Bootstrap, BootstrapOptions } from "./discovery"; @@ -85,7 +85,13 @@ export interface CreateOptions { decryptionKeys?: Array; } -export async function createWaku(): Promise { +export async function createWaku(options?: CreateOptions): Promise { + const peerDiscovery = []; + if (options?.bootstrap) { + peerDiscovery.push(new Bootstrap(options?.bootstrap)); + } + + // TODO: Use options const libp2pOpts = { transports: new WebSockets({ filter: filters.all }), streamMuxers: [new Mplex()], @@ -97,21 +103,13 @@ export async function createWaku(): Promise { // @ts-ignore: modules property is correctly set thanks to voodoo const libp2p = await createLibp2p(libp2pOpts); - const wakuStore = new WakuStore(libp2p, { - pubSubTopic: options?.pubSubTopic, - }); + const wakuStore = new WakuStore(libp2p); const wakuLightPush = new WakuLightPush(libp2p); const wakuFilter = new WakuFilter(libp2p); await libp2p.start(); - return new Waku( - options ? options : {}, - libp2p, - wakuStore, - wakuLightPush, - wakuFilter - ); + return new Waku({}, libp2p, wakuStore, wakuLightPush, wakuFilter); } export class Waku { @@ -128,7 +126,7 @@ export class Waku { [peer: string]: ReturnType; }; - private constructor( + constructor( options: CreateOptions, libp2p: Libp2p, store: WakuStore, @@ -172,80 +170,8 @@ export class Waku { }); } - /** - * Create and start new waku node. - */ - static async create(options?: CreateOptions): Promise { - // Get an object in case options or libp2p are undefined - - // Default for Websocket filter is `all`: - // Returns all TCP and DNS based addresses, both with ws or wss. - const libp2pOpts: Partial | undefined = Object.assign( - { - transport: { - [websocketsTransportKey]: { - filter: filters.all, - }, - }, - }, - options?.libp2p - ); - - // TODO: Pass self-emit? - // Pass pubsub topic to relay - libp2pOpts.pubsub = new WakuRelay({ pubSubTopic: options?.pubSubTopic }); - - // Default transport for libp2p is Websockets - libp2pOpts.transports = options?.libp2p?.transports ?? [Websockets]; - - // streamMuxer, connection encryption and pubsub are overridden - // as those are the only ones currently supported by Waku nodes. - libp2pOpts.streamMuxers = [Mplex]; - libp2pOpts.connectionEncryption = [new Noise(options?.staticNoiseKey)]; - - if (options?.bootstrap) { - const bootstrap = new Bootstrap(options?.bootstrap); - - if (bootstrap.getBootstrapPeers !== undefined) { - try { - const list = await bootstrap.getBootstrapPeers(); - - // Note: this overrides any other peer discover - libp2pOpts.modules = Object.assign(libp2pOpts.modules, { - peerDiscovery: [Libp2pBootstrap], - }); - - libp2pOpts.config.peerDiscovery = { - [Libp2pBootstrap.tag]: { - list, - enabled: true, - }, - }; - } catch (e) { - dbg("Failed to retrieve bootstrap nodes", e); - } - } - } - - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: modules property is correctly set thanks to voodoo - const libp2p = await createLibp2p(libp2pOpts); - - const wakuStore = new WakuStore(libp2p, { - pubSubTopic: options?.pubSubTopic, - }); - const wakuLightPush = new WakuLightPush(libp2p); - const wakuFilter = new WakuFilter(libp2p); - - await libp2p.start(); - - return new Waku( - options ? options : {}, - libp2p, - wakuStore, - wakuLightPush, - wakuFilter - ); + start(): void { + this.libp2p.start(); } /** @@ -291,7 +217,7 @@ export class Waku { ): void { let peer; if (typeof peerId === "string") { - peer = PeerId.createFromB58String(peerId); + peer = peerIdFromString(peerId); } else { peer = peerId; } @@ -344,9 +270,9 @@ export class Waku { * @throws if libp2p is not listening on localhost. */ getLocalMultiaddrWithID(): string { - const localMultiaddr = this.libp2p.multiaddrs.find((addr) => - addr.toString().match(/127\.0\.0\.1/) - ); + const localMultiaddr = this.libp2p + .getMultiaddrs() + .find((addr) => addr.toString().match(/127\.0\.0\.1/)); if (!localMultiaddr || localMultiaddr.toString() === "") { throw "Not listening on localhost"; } @@ -374,21 +300,23 @@ export class Waku { const promises: Promise[] = []; if (protocols.includes(Protocols.Relay)) { - const peers = this.relay.getPeers(); + const peers = this.relay.getMeshPeers(this.relay.pubSubTopic); - if (peers.size == 0) { + if (peers.length == 0) { // No peer yet available, wait for a subscription const promise = new Promise((resolve) => { - this.libp2p.pubsub.once("pubsub:subscription-change", () => { + this.relay.addEventListener("subscription-change", () => { // Remote peer subscribed to topic, now wait for a heartbeat // so that the mesh is updated and the remote peer added to it - this.libp2p.pubsub.once("gossipsub:heartbeat", resolve); + this.relay.addEventListener("gossipsub:heartbeat", () => resolve()); }); }); promises.push(promise); } } + // TODO: This can be factored in one helper function + // Probably need to add a "string" protocol to each class to make it easier if (protocols.includes(Protocols.Store)) { const storePromise = (async (): Promise => { const peers = await this.store.peers(); @@ -398,6 +326,29 @@ export class Waku { return; } + await new Promise((resolve) => { + this.libp2p.peerStore.addEventListener("change:protocols", (evt) => { + for (const codec of Object.values(StoreCodecs)) { + if (evt.detail.protocols.includes(codec)) { + dbg("Resolving for", codec, evt.detail.protocols); + resolve(); + } + } + }); + }); + })(); + promises.push(storePromise); + } + + if (protocols.includes(Protocols.LightPush)) { + const lightPushPromise = (async (): Promise => { + const peers = await this.lightPush.peers(); + + if (peers.length) { + dbg("Light Push peer found: ", peers[0].id.toString()); + return; + } + await new Promise((resolve) => { this.libp2p.peerStore.addEventListener("change:protocols", (evt) => { if (evt.detail.protocols.includes(LightPushCodec)) { @@ -407,25 +358,26 @@ export class Waku { }); }); })(); - promises.push(storePromise); - } - - if (protocols.includes(Protocols.LightPush)) { - const lightPushPromise = (async (): Promise => { - for await (const peer of this.lightPush.peers) { - dbg("Light Push peer found", peer.id.toString()); - break; - } - })(); promises.push(lightPushPromise); } if (protocols.includes(Protocols.Filter)) { const filterPromise = (async (): Promise => { - for await (const peer of this.filter.peers) { - dbg("Filter peer found", peer.id.toString()); - break; + const peers = await this.filter.peers(); + + if (peers.length) { + dbg("Filter peer found: ", peers[0].id.toString()); + return; } + + await new Promise((resolve) => { + this.libp2p.peerStore.addEventListener("change:protocols", (evt) => { + if (evt.detail.protocols.includes(FilterCodec)) { + dbg("Resolving for", FilterCodec, evt.detail.protocols); + resolve(); + } + }); + }); })(); promises.push(filterPromise); } diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 40cb020537..64b01682e2 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -3,6 +3,10 @@ import { GossipsubMessage, GossipsubOpts, } from "@chainsafe/libp2p-gossipsub"; +import { + PeerIdStr, + TopicStr, +} from "@chainsafe/libp2p-gossipsub/dist/src/types"; import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import debug from "debug"; @@ -206,6 +210,10 @@ export class WakuRelay extends GossipSub { super.subscribe(pubSubTopic); } + getMeshPeers(topic?: TopicStr): PeerIdStr[] { + return super.getMeshPeers(topic ?? DefaultPubSubTopic); + } + // TODO: Implement method that uses Relay codec // public async heartbeat(): Promise { }