refactor: adopt dependency injection patter for other Waku protocols

It actually simplies the API as the caller does not need to pass
libp2p to each protocol anymore (when not using a `create*` helper).
This commit is contained in:
fryorcraken.eth 2022-11-17 11:01:27 +11:00
parent 8dfb133cd7
commit a372307a30
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
12 changed files with 142 additions and 71 deletions

18
package-lock.json generated
View File

@ -2479,9 +2479,9 @@
}
},
"node_modules/@libp2p/interface-registrar": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.3.tgz",
"integrity": "sha512-YA/A+o+166/+noXxMFXvZdg9soZSZX2EPOlUwnGXZWR7J5B2sxyP76QxHWXL5npsEMj7suP+Rjb/GJYGz7rDyg==",
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.4.tgz",
"integrity": "sha512-GD5EY+LrtV4v4Mvm/L/ObeMWb96VPZppi7Vl1b1HU5dMzWSnPdOylJZ/N0/Ppryg30CO6yayq9g+/CQN8YEk4g==",
"dependencies": {
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-peer-id": "^1.0.0"
@ -21836,6 +21836,7 @@
"@libp2p/interface-peer-info": "^1.0.4",
"@libp2p/interface-peer-store": "^1.2.3",
"@libp2p/interface-pubsub": "^3.0.1",
"@libp2p/interface-registrar": "^2.0.4",
"@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
@ -22026,7 +22027,7 @@
"@chainsafe/libp2p-gossipsub": "^5.2.1",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^1.0.5",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-peer-store": "^1.2.3",
"@multiformats/multiaddr": "^11.0.6",
"libp2p": "0.40.0"
},
@ -24135,9 +24136,9 @@
}
},
"@libp2p/interface-registrar": {
"version": "2.0.3",
"resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.3.tgz",
"integrity": "sha512-YA/A+o+166/+noXxMFXvZdg9soZSZX2EPOlUwnGXZWR7J5B2sxyP76QxHWXL5npsEMj7suP+Rjb/GJYGz7rDyg==",
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.4.tgz",
"integrity": "sha512-GD5EY+LrtV4v4Mvm/L/ObeMWb96VPZppi7Vl1b1HU5dMzWSnPdOylJZ/N0/Ppryg30CO6yayq9g+/CQN8YEk4g==",
"requires": {
"@libp2p/interface-connection": "^3.0.0",
"@libp2p/interface-peer-id": "^1.0.0"
@ -26066,6 +26067,7 @@
"@libp2p/interface-peer-info": "^1.0.4",
"@libp2p/interface-peer-store": "^1.2.3",
"@libp2p/interface-pubsub": "^3.0.1",
"@libp2p/interface-registrar": "^2.0.4",
"@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
@ -26228,7 +26230,7 @@
"@chainsafe/libp2p-gossipsub": "^5.2.1",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^1.0.5",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-peer-store": "^1.2.3",
"@multiformats/multiaddr": "^11.0.6",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",

View File

@ -82,7 +82,6 @@
"node": ">=16"
},
"dependencies": {
"@waku/byte-utils": "*",
"@chainsafe/libp2p-gossipsub": "^5.2.1",
"@libp2p/interface-connection": "^3.0.3",
"@libp2p/interface-peer-discovery": "^1.0.0",
@ -90,9 +89,11 @@
"@libp2p/interface-peer-info": "^1.0.4",
"@libp2p/interface-peer-store": "^1.2.3",
"@libp2p/interface-pubsub": "^3.0.1",
"@libp2p/interface-registrar": "^2.0.4",
"@libp2p/interfaces": "^3.0.4",
"@libp2p/peer-id": "^1.1.10",
"@multiformats/multiaddr": "^11.0.6",
"@waku/byte-utils": "*",
"@waku/interfaces": "*",
"debug": "^4.3.4",
"it-all": "^1.0.6",

View File

@ -7,17 +7,17 @@ export * as waku from "./lib/waku";
export { WakuNode } from "./lib/waku";
export * as waku_filter from "./lib/waku_filter";
export { WakuFilter } from "./lib/waku_filter";
export { wakuFilter } from "./lib/waku_filter";
export * as waku_light_push from "./lib/waku_light_push";
export {
WakuLightPush,
wakuLightPush,
LightPushCodec,
PushResponse,
} from "./lib/waku_light_push";
export * as waku_relay from "./lib/waku_relay";
export { WakuRelay, wakuRelay } from "./lib/waku_relay";
export { wakuRelay } from "./lib/waku_relay";
export * as waku_store from "./lib/waku_store";
export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store";
export { PageDirection, wakuStore, StoreCodec } from "./lib/waku_store";

View File

@ -94,13 +94,13 @@ async function waitForConnectedPeer(
for (const codec of codecs) {
if (evt.detail.protocols.includes(codec)) {
log("Resolving for", codec, evt.detail.protocols);
waku.libp2p.peerStore.removeEventListener("change:protocols", cb);
waku.peerStore.removeEventListener("change:protocols", cb);
resolve();
break;
}
}
};
waku.libp2p.peerStore.addEventListener("change:protocols", cb);
waku.peerStore.addEventListener("change:protocols", cb);
});
}

View File

@ -2,18 +2,17 @@ import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { Waku } from "@waku/interfaces";
import type { Filter, LightPush, Relay, Store, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import debug from "debug";
import type { Libp2p } from "libp2p";
import { FilterCodec, WakuFilter } from "./waku_filter";
import { LightPushCodec, WakuLightPush } from "./waku_light_push";
import { FilterCodec, FilterComponents } from "./waku_filter";
import { LightPushCodec, LightPushComponents } from "./waku_light_push";
import { EncoderV0 } from "./waku_message/version_0";
import { WakuRelay } from "./waku_relay";
import * as relayConstants from "./waku_relay/constants";
import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants";
import { StoreCodec, WakuStore } from "./waku_store";
import { StoreCodec, StoreComponents } from "./waku_store";
export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
@ -39,10 +38,10 @@ export interface WakuOptions {
export class WakuNode implements Waku {
public libp2p: Libp2p;
public relay?: WakuRelay;
public store?: WakuStore;
public filter?: WakuFilter;
public lightPush?: WakuLightPush;
public relay?: Relay;
public store?: Store;
public filter?: Filter;
public lightPush?: LightPush;
private pingKeepAliveTimers: {
[peer: string]: ReturnType<typeof setInterval>;
@ -54,16 +53,26 @@ export class WakuNode implements Waku {
constructor(
options: WakuOptions,
libp2p: Libp2p,
store?: WakuStore,
lightPush?: WakuLightPush,
filter?: WakuFilter
store?: (components: StoreComponents) => Store,
lightPush?: (components: LightPushComponents) => LightPush,
filter?: (components: FilterComponents) => Filter
) {
this.libp2p = libp2p;
this.store = store;
this.filter = filter;
this.lightPush = lightPush;
if (isWakuRelay(libp2p.pubsub)) {
const { peerStore, connectionManager, registrar } = libp2p;
const components = { peerStore, connectionManager, registrar };
if (store) {
this.store = store(components);
}
if (filter) {
this.filter = filter(components);
}
if (lightPush) {
this.lightPush = lightPush(components);
}
if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
}
@ -233,7 +242,7 @@ export class WakuNode implements Waku {
}
}
function isWakuRelay(pubsub: PubSub): pubsub is WakuRelay {
function isRelay(pubsub: PubSub): pubsub is Relay {
if (pubsub) {
try {
return pubsub.multicodecs.includes(

View File

@ -1,7 +1,10 @@
import type { Stream } from "@libp2p/interface-connection";
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type { Registrar } from "@libp2p/interface-registrar";
import type {
Callback,
DecodedMessage,
@ -14,7 +17,6 @@ import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import type { Libp2p } from "libp2p";
import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants";
@ -28,12 +30,19 @@ import {
import { toProtoMessage } from "../to_proto_message";
import { ContentFilter, FilterRPC } from "./filter_rpc";
export { ContentFilter };
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
export interface FilterComponents {
peerStore: PeerStore;
registrar: Registrar;
connectionManager: ConnectionManager;
}
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
@ -55,7 +64,7 @@ export type UnsubscribeFunction = () => Promise<void>;
* - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948
*/
export class WakuFilter implements Filter {
class WakuFilter implements Filter {
pubSubTopic: string;
private subscriptions: Map<string, Callback<any>>;
private decoders: Map<
@ -63,11 +72,11 @@ export class WakuFilter implements Filter {
Set<Decoder<any>>
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
constructor(public components: FilterComponents, options?: CreateOptions) {
this.subscriptions = new Map();
this.decoders = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.libp2p
this.components.registrar
.handle(FilterCodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
@ -139,6 +148,10 @@ export class WakuFilter implements Filter {
};
}
get peerStore(): PeerStore {
return this.components.peerStore;
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
@ -261,7 +274,9 @@ export class WakuFilter implements Filter {
}
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peer.id);
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
@ -272,7 +287,7 @@ export class WakuFilter implements Filter {
private async getPeer(peerId?: PeerId): Promise<Peer> {
const res = await selectPeerForProtocol(
this.libp2p.peerStore,
this.components.peerStore,
[FilterCodec],
peerId
);
@ -283,10 +298,16 @@ export class WakuFilter implements Filter {
}
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]);
return getPeersForProtocol(this.components.peerStore, [FilterCodec]);
}
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
}
export function wakuFilter(
init: Partial<CreateOptions> = {}
): (components: FilterComponents) => Filter {
return (components: FilterComponents) => new WakuFilter(components, init);
}

View File

@ -1,7 +1,10 @@
import { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type {
Encoder,
LightPush,
Message,
ProtocolOptions,
SendResult,
@ -10,7 +13,6 @@ import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
import { Uint8ArrayList } from "uint8arraylist";
import { PushResponse } from "../../proto/light_push";
@ -29,6 +31,11 @@ const log = debug("waku:light-push");
export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1";
export { PushResponse };
export interface LightPushComponents {
peerStore: PeerStore;
connectionManager: ConnectionManager;
}
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
@ -44,10 +51,10 @@ export interface CreateOptions {
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class WakuLightPush {
class WakuLightPush implements LightPush {
pubSubTopic: string;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
constructor(public components: LightPushComponents, options?: CreateOptions) {
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
}
@ -59,7 +66,7 @@ export class WakuLightPush {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic;
const res = await selectPeerForProtocol(
this.libp2p.peerStore,
this.components.peerStore,
[LightPushCodec],
opts?.peerId
);
@ -69,7 +76,9 @@ export class WakuLightPush {
}
const { peer } = res;
const connections = this.libp2p.connectionManager.getConnections(peer.id);
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
@ -123,7 +132,7 @@ export class WakuLightPush {
* peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p.peerStore, [LightPushCodec]);
return getPeersForProtocol(this.components.peerStore, [LightPushCodec]);
}
/**
@ -134,4 +143,15 @@ export class WakuLightPush {
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
get peerStore(): PeerStore {
return this.components.peerStore;
}
}
export function wakuLightPush(
init: Partial<CreateOptions> = {}
): (components: LightPushComponents) => LightPush {
return (components: LightPushComponents) =>
new WakuLightPush(components, init);
}

View File

@ -56,7 +56,7 @@ export type CreateOptions = {
*
* @implements {require('libp2p-interfaces/src/pubsub')}
*/
export class WakuRelay extends GossipSub implements Relay {
class WakuRelay extends GossipSub implements Relay {
pubSubTopic: string;
defaultDecoder: Decoder<DecodedMessage>;
public static multicodec: string = constants.RelayCodecs[0];

View File

@ -1,12 +1,12 @@
import type { Connection } from "@libp2p/interface-connection";
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import { DecodedMessage, Decoder } from "@waku/interfaces";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { DecodedMessage, Decoder, Store } from "@waku/interfaces";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Libp2p } from "libp2p";
import { Uint8ArrayList } from "uint8arraylist";
import * as proto from "../../proto/store";
@ -27,6 +27,11 @@ export const DefaultPageSize = 10;
export { PageDirection };
export interface StoreComponents {
peerStore: PeerStore;
connectionManager: ConnectionManager;
}
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
@ -82,10 +87,10 @@ export interface QueryOptions {
*
* The Waku Store protocol can be used to retrieved historical messages.
*/
export class WakuStore {
class WakuStore implements Store {
pubSubTopic: string;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
constructor(public components: StoreComponents, options?: CreateOptions) {
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
}
@ -232,7 +237,7 @@ export class WakuStore {
});
const res = await selectPeerForProtocol(
this.libp2p.peerStore,
this.components.peerStore,
[StoreCodec],
options?.peerId
);
@ -242,7 +247,9 @@ export class WakuStore {
}
const { peer, protocol } = res;
const connections = this.libp2p.connectionManager.getConnections(peer.id);
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
@ -262,7 +269,11 @@ export class WakuStore {
* store protocol. Waku may or may not be currently connected to these peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p.peerStore, [StoreCodec]);
return getPeersForProtocol(this.components.peerStore, [StoreCodec]);
}
get peerStore(): PeerStore {
return this.components.peerStore;
}
}
@ -370,3 +381,9 @@ async function* paginate<T extends DecodedMessage>(
export function isDefined<T>(msg: T | undefined): msg is T {
return !!msg;
}
export function wakuStore(
init: Partial<CreateOptions> = {}
): (components: StoreComponents) => Store {
return (components: StoreComponents) => new WakuStore(components, init);
}

View File

@ -7,11 +7,11 @@ import { all as filterAll } from "@libp2p/websockets/filters";
import {
waku,
waku_relay,
WakuFilter,
WakuLightPush,
wakuFilter,
wakuLightPush,
WakuNode,
wakuRelay,
WakuStore,
wakuStore,
} from "@waku/core";
import { getPredefinedBootstrapNodes } from "@waku/core/lib/predefined_bootstrap_nodes";
import type { Relay, WakuFull, WakuLight, WakuPrivacy } from "@waku/interfaces";
@ -74,16 +74,16 @@ export async function createLightNode(
const libp2p = await defaultLibp2p(undefined, libp2pOptions);
const wakuStore = new WakuStore(libp2p, options);
const wakuLightPush = new WakuLightPush(libp2p, options);
const wakuFilter = new WakuFilter(libp2p, options);
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
return new WakuNode(
options ?? {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
store,
lightPush,
filter
) as WakuLight;
}
@ -131,16 +131,16 @@ export async function createFullNode(
const libp2p = await defaultLibp2p(wakuRelay(options), libp2pOptions);
const wakuStore = new WakuStore(libp2p, options);
const wakuLightPush = new WakuLightPush(libp2p, options);
const wakuFilter = new WakuFilter(libp2p, options);
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
return new WakuNode(
options ?? {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
store,
lightPush,
filter
) as WakuFull;
}

View File

@ -51,7 +51,7 @@
"@chainsafe/libp2p-gossipsub": "^5.2.1",
"@libp2p/interface-connection": "^3.0.2",
"@libp2p/interface-peer-id": "^1.0.5",
"@libp2p/interface-peer-store": "^1.2.2",
"@libp2p/interface-peer-store": "^1.2.3",
"@multiformats/multiaddr": "^11.0.6",
"libp2p": "0.40.0"
},

View File

@ -2,6 +2,7 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { Libp2p } from "libp2p";
@ -13,7 +14,7 @@ export enum Protocols {
}
export interface PointToPointProtocol {
libp2p: Libp2p;
peerStore: PeerStore;
peers: () => Promise<Peer[]>;
}