fix: libp2p upgrade (no more autodial + constructor dependency injection)

This commit is contained in:
fryorcraken.eth 2022-11-16 20:30:48 +11:00
parent 910fc5a6b2
commit 8dfb133cd7
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
7 changed files with 45 additions and 50 deletions

View File

@ -17,7 +17,7 @@ export {
} from "./lib/waku_light_push";
export * as waku_relay from "./lib/waku_relay";
export { WakuRelay } from "./lib/waku_relay";
export { WakuRelay, wakuRelay } from "./lib/waku_relay";
export * as waku_store from "./lib/waku_store";
export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store";

View File

@ -1,9 +1,7 @@
import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import { peerIdFromString } from "@libp2p/peer-id";
import type { Multiaddr } from "@multiformats/multiaddr";
import { multiaddr } from "@multiformats/multiaddr";
import type { Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import debug from "debug";
@ -103,6 +101,15 @@ export class WakuNode implements Waku {
libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => {
this.stopKeepAlive(evt.detail.remotePeer);
});
// Trivial handling of discovered peers, to be refined.
libp2p.addEventListener("peer:discovery", (evt) => {
const peerId = evt.detail.id;
log(`Found peer ${peerId.toString()}, dialing.`);
libp2p.dial(peerId).catch((err) => {
log(`Fail to dial ${peerId}`, err);
});
});
}
/**
@ -141,29 +148,6 @@ export class WakuNode implements Waku {
return this.libp2p.dialProtocol(peer, codecs);
}
/**
* Add peer to address book, it will be auto-dialed in the background.
*/
async addPeerToAddressBook(
peerId: PeerId | string,
multiaddrs: Multiaddr[] | string[]
): Promise<void> {
let peer;
if (typeof peerId === "string") {
peer = peerIdFromString(peerId);
} else {
peer = peerId;
}
const addresses = multiaddrs.map((addr: Multiaddr | string) => {
if (typeof addr === "string") {
return multiaddr(addr);
} else {
return addr;
}
});
await this.libp2p.peerStore.addressBook.set(peer, addresses);
}
async start(): Promise<void> {
await this.libp2p.start();
}

View File

@ -1,5 +1,6 @@
import {
GossipSub,
GossipSubComponents,
GossipsubMessage,
GossipsubOpts,
} from "@chainsafe/libp2p-gossipsub";
@ -66,13 +67,16 @@ export class WakuRelay extends GossipSub implements Relay {
*/
public observers: Map<string, Set<Observer<any>>>;
constructor(options?: Partial<CreateOptions>) {
constructor(
components: GossipSubComponents,
options?: Partial<CreateOptions>
) {
options = Object.assign(options ?? {}, {
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false,
});
super(options);
super(components, options);
this.multicodecs = constants.RelayCodecs;
this.observers = new Map();
@ -188,3 +192,9 @@ export class WakuRelay extends GossipSub implements Relay {
}
WakuRelay.multicodec = constants.RelayCodecs[constants.RelayCodecs.length - 1];
export function wakuRelay(
init: Partial<CreateOptions> = {}
): (components: GossipSubComponents) => Relay {
return (components: GossipSubComponents) => new WakuRelay(components, init);
}

View File

@ -1,9 +1,8 @@
import { Noise } from "@chainsafe/libp2p-noise";
import { noise } from "@chainsafe/libp2p-noise";
import { bootstrap } from "@libp2p/bootstrap";
import type { BootstrapComponents } from "@libp2p/bootstrap";
import type { PeerDiscovery } from "@libp2p/interface-peer-discovery";
import { Mplex } from "@libp2p/mplex";
import { WebSockets } from "@libp2p/websockets";
import { mplex } from "@libp2p/mplex";
import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import {
waku,
@ -11,13 +10,14 @@ import {
WakuFilter,
WakuLightPush,
WakuNode,
WakuRelay,
wakuRelay,
WakuStore,
} from "@waku/core";
import { getPredefinedBootstrapNodes } from "@waku/core/lib/predefined_bootstrap_nodes";
import type { WakuFull, WakuLight, WakuPrivacy } from "@waku/interfaces";
import type { Relay, WakuFull, WakuLight, WakuPrivacy } from "@waku/interfaces";
import type { Libp2p } from "libp2p";
import { createLibp2p, Libp2pOptions } from "libp2p";
import type { Components } from "libp2p/components";
type WakuOptions = waku.WakuOptions;
type RelayCreateOptions = waku_relay.CreateOptions;
@ -101,7 +101,7 @@ export async function createPrivacyNode(
Object.assign(libp2pOptions, { peerDiscovery });
}
const libp2p = await defaultLibp2p(new WakuRelay(options), libp2pOptions);
const libp2p = await defaultLibp2p(wakuRelay(options), libp2pOptions);
return new WakuNode(options ?? {}, libp2p) as WakuPrivacy;
}
@ -129,7 +129,7 @@ export async function createFullNode(
Object.assign(libp2pOptions, { peerDiscovery });
}
const libp2p = await defaultLibp2p(new WakuRelay(options), libp2pOptions);
const libp2p = await defaultLibp2p(wakuRelay(options), libp2pOptions);
const wakuStore = new WakuStore(libp2p, options);
const wakuLightPush = new WakuLightPush(libp2p, options);
@ -145,20 +145,20 @@ export async function createFullNode(
}
export function defaultPeerDiscovery(): (
components: BootstrapComponents
components: Components
) => PeerDiscovery {
return bootstrap({ list: getPredefinedBootstrapNodes() });
}
export async function defaultLibp2p(
wakuRelay?: WakuRelay,
wakuRelay?: (components: Components) => Relay,
options?: Partial<Libp2pOptions>
): Promise<Libp2p> {
const libp2pOpts = Object.assign(
{
transports: [new WebSockets({ filter: filterAll })],
streamMuxers: [new Mplex()],
connectionEncryption: [new Noise()],
transports: [webSockets({ filter: filterAll })],
streamMuxers: [mplex()],
connectionEncryption: [noise()],
},
wakuRelay ? { pubsub: wakuRelay } : {},
options ?? {}

View File

@ -112,11 +112,6 @@ export interface Waku {
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
addPeerToAddressBook(
peerId: PeerId | string,
multiaddrs: Multiaddr[] | string[]
): void;
start(): Promise<void>;
stop(): Promise<void>;

View File

@ -64,10 +64,11 @@ describe("Waku Relay [node only]", () => {
}).then((waku) => waku.start().then(() => waku)),
]);
log("Instances started, adding waku2 to waku1's address book");
waku1.addPeerToAddressBook(
await waku1.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await waku1.dial(waku2.libp2p.peerId);
log("Wait for mutual pubsub subscription");
await Promise.all([
@ -281,14 +282,18 @@ describe("Waku Relay [node only]", () => {
}).then((waku) => waku.start().then(() => waku)),
]);
waku1.addPeerToAddressBook(
await waku1.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
waku3.addPeerToAddressBook(
await waku3.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await Promise.all([
waku1.dial(waku2.libp2p.peerId),
waku3.dial(waku2.libp2p.peerId),
]);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),

View File

@ -140,10 +140,11 @@ describe("Decryption Keys", () => {
}).then((waku) => waku.start().then(() => waku)),
]);
waku1.addPeerToAddressBook(
await waku1.libp2p.peerStore.addressBook.set(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await waku1.dial(waku2.libp2p.peerId);
await Promise.all([
waitForRemotePeer(waku1, [Protocols.Relay]),