diff --git a/package-lock.json b/package-lock.json index 3f778380aa..63237e1889 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2479,9 +2479,9 @@ } }, "node_modules/@libp2p/interface-registrar": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.3.tgz", - "integrity": "sha512-YA/A+o+166/+noXxMFXvZdg9soZSZX2EPOlUwnGXZWR7J5B2sxyP76QxHWXL5npsEMj7suP+Rjb/GJYGz7rDyg==", + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.4.tgz", + "integrity": "sha512-GD5EY+LrtV4v4Mvm/L/ObeMWb96VPZppi7Vl1b1HU5dMzWSnPdOylJZ/N0/Ppryg30CO6yayq9g+/CQN8YEk4g==", "dependencies": { "@libp2p/interface-connection": "^3.0.0", "@libp2p/interface-peer-id": "^1.0.0" @@ -21836,6 +21836,7 @@ "@libp2p/interface-peer-info": "^1.0.4", "@libp2p/interface-peer-store": "^1.2.3", "@libp2p/interface-pubsub": "^3.0.1", + "@libp2p/interface-registrar": "^2.0.4", "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", @@ -22026,7 +22027,7 @@ "@chainsafe/libp2p-gossipsub": "^5.2.1", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^1.0.5", - "@libp2p/interface-peer-store": "^1.2.2", + "@libp2p/interface-peer-store": "^1.2.3", "@multiformats/multiaddr": "^11.0.6", "libp2p": "0.40.0" }, @@ -24135,9 +24136,9 @@ } }, "@libp2p/interface-registrar": { - "version": "2.0.3", - "resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.3.tgz", - "integrity": "sha512-YA/A+o+166/+noXxMFXvZdg9soZSZX2EPOlUwnGXZWR7J5B2sxyP76QxHWXL5npsEMj7suP+Rjb/GJYGz7rDyg==", + "version": "2.0.4", + "resolved": "https://registry.npmjs.org/@libp2p/interface-registrar/-/interface-registrar-2.0.4.tgz", + "integrity": "sha512-GD5EY+LrtV4v4Mvm/L/ObeMWb96VPZppi7Vl1b1HU5dMzWSnPdOylJZ/N0/Ppryg30CO6yayq9g+/CQN8YEk4g==", "requires": { "@libp2p/interface-connection": "^3.0.0", "@libp2p/interface-peer-id": "^1.0.0" @@ -26066,6 +26067,7 @@ "@libp2p/interface-peer-info": "^1.0.4", "@libp2p/interface-peer-store": "^1.2.3", "@libp2p/interface-pubsub": "^3.0.1", + "@libp2p/interface-registrar": "^2.0.4", "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", @@ -26228,7 +26230,7 @@ "@chainsafe/libp2p-gossipsub": "^5.2.1", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^1.0.5", - "@libp2p/interface-peer-store": "^1.2.2", + "@libp2p/interface-peer-store": "^1.2.3", "@multiformats/multiaddr": "^11.0.6", "@typescript-eslint/eslint-plugin": "^5.8.1", "@typescript-eslint/parser": "^5.8.1", diff --git a/packages/core/package.json b/packages/core/package.json index 1869b11450..37e0294353 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -82,7 +82,6 @@ "node": ">=16" }, "dependencies": { - "@waku/byte-utils": "*", "@chainsafe/libp2p-gossipsub": "^5.2.1", "@libp2p/interface-connection": "^3.0.3", "@libp2p/interface-peer-discovery": "^1.0.0", @@ -90,9 +89,11 @@ "@libp2p/interface-peer-info": "^1.0.4", "@libp2p/interface-peer-store": "^1.2.3", "@libp2p/interface-pubsub": "^3.0.1", + "@libp2p/interface-registrar": "^2.0.4", "@libp2p/interfaces": "^3.0.4", "@libp2p/peer-id": "^1.1.10", "@multiformats/multiaddr": "^11.0.6", + "@waku/byte-utils": "*", "@waku/interfaces": "*", "debug": "^4.3.4", "it-all": "^1.0.6", diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 388376bfd8..eb8de3e863 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -7,17 +7,17 @@ export * as waku from "./lib/waku"; export { WakuNode } from "./lib/waku"; export * as waku_filter from "./lib/waku_filter"; -export { WakuFilter } from "./lib/waku_filter"; +export { wakuFilter } from "./lib/waku_filter"; export * as waku_light_push from "./lib/waku_light_push"; export { - WakuLightPush, + wakuLightPush, LightPushCodec, PushResponse, } from "./lib/waku_light_push"; export * as waku_relay from "./lib/waku_relay"; -export { WakuRelay, wakuRelay } from "./lib/waku_relay"; +export { wakuRelay } from "./lib/waku_relay"; export * as waku_store from "./lib/waku_store"; -export { PageDirection, WakuStore, StoreCodec } from "./lib/waku_store"; +export { PageDirection, wakuStore, StoreCodec } from "./lib/waku_store"; diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index caa254239a..a7f9e2d64b 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -94,13 +94,13 @@ async function waitForConnectedPeer( for (const codec of codecs) { if (evt.detail.protocols.includes(codec)) { log("Resolving for", codec, evt.detail.protocols); - waku.libp2p.peerStore.removeEventListener("change:protocols", cb); + waku.peerStore.removeEventListener("change:protocols", cb); resolve(); break; } } }; - waku.libp2p.peerStore.addEventListener("change:protocols", cb); + waku.peerStore.addEventListener("change:protocols", cb); }); } diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 9b88d374f6..7192d2a59d 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -2,18 +2,17 @@ import type { Stream } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { PubSub } from "@libp2p/interface-pubsub"; import type { Multiaddr } from "@multiformats/multiaddr"; -import type { Waku } from "@waku/interfaces"; +import type { Filter, LightPush, Relay, Store, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import debug from "debug"; import type { Libp2p } from "libp2p"; -import { FilterCodec, WakuFilter } from "./waku_filter"; -import { LightPushCodec, WakuLightPush } from "./waku_light_push"; +import { FilterCodec, FilterComponents } from "./waku_filter"; +import { LightPushCodec, LightPushComponents } from "./waku_light_push"; import { EncoderV0 } from "./waku_message/version_0"; -import { WakuRelay } from "./waku_relay"; import * as relayConstants from "./waku_relay/constants"; import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants"; -import { StoreCodec, WakuStore } from "./waku_store"; +import { StoreCodec, StoreComponents } from "./waku_store"; export const DefaultPingKeepAliveValueSecs = 0; export const DefaultRelayKeepAliveValueSecs = 5 * 60; @@ -39,10 +38,10 @@ export interface WakuOptions { export class WakuNode implements Waku { public libp2p: Libp2p; - public relay?: WakuRelay; - public store?: WakuStore; - public filter?: WakuFilter; - public lightPush?: WakuLightPush; + public relay?: Relay; + public store?: Store; + public filter?: Filter; + public lightPush?: LightPush; private pingKeepAliveTimers: { [peer: string]: ReturnType; @@ -54,16 +53,26 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, libp2p: Libp2p, - store?: WakuStore, - lightPush?: WakuLightPush, - filter?: WakuFilter + store?: (components: StoreComponents) => Store, + lightPush?: (components: LightPushComponents) => LightPush, + filter?: (components: FilterComponents) => Filter ) { this.libp2p = libp2p; - this.store = store; - this.filter = filter; - this.lightPush = lightPush; - if (isWakuRelay(libp2p.pubsub)) { + const { peerStore, connectionManager, registrar } = libp2p; + const components = { peerStore, connectionManager, registrar }; + + if (store) { + this.store = store(components); + } + if (filter) { + this.filter = filter(components); + } + if (lightPush) { + this.lightPush = lightPush(components); + } + + if (isRelay(libp2p.pubsub)) { this.relay = libp2p.pubsub; } @@ -233,7 +242,7 @@ export class WakuNode implements Waku { } } -function isWakuRelay(pubsub: PubSub): pubsub is WakuRelay { +function isRelay(pubsub: PubSub): pubsub is Relay { if (pubsub) { try { return pubsub.multicodecs.includes( diff --git a/packages/core/src/lib/waku_filter/index.ts b/packages/core/src/lib/waku_filter/index.ts index e09b9249ea..9a0ccb7cb9 100644 --- a/packages/core/src/lib/waku_filter/index.ts +++ b/packages/core/src/lib/waku_filter/index.ts @@ -1,7 +1,10 @@ import type { Stream } from "@libp2p/interface-connection"; +import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { PeerId } from "@libp2p/interface-peer-id"; +import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; +import type { Registrar } from "@libp2p/interface-registrar"; import type { Callback, DecodedMessage, @@ -14,7 +17,6 @@ import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import type { Libp2p } from "libp2p"; import { WakuMessage as WakuMessageProto } from "../../proto/message"; import { DefaultPubSubTopic } from "../constants"; @@ -28,12 +30,19 @@ import { import { toProtoMessage } from "../to_proto_message"; import { ContentFilter, FilterRPC } from "./filter_rpc"; + export { ContentFilter }; export const FilterCodec = "/vac/waku/filter/2.0.0-beta1"; const log = debug("waku:filter"); +export interface FilterComponents { + peerStore: PeerStore; + registrar: Registrar; + connectionManager: ConnectionManager; +} + export interface CreateOptions { /** * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. @@ -55,7 +64,7 @@ export type UnsubscribeFunction = () => Promise; * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -export class WakuFilter implements Filter { +class WakuFilter implements Filter { pubSubTopic: string; private subscriptions: Map>; private decoders: Map< @@ -63,11 +72,11 @@ export class WakuFilter implements Filter { Set> >; - constructor(public libp2p: Libp2p, options?: CreateOptions) { + constructor(public components: FilterComponents, options?: CreateOptions) { this.subscriptions = new Map(); this.decoders = new Map(); this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; - this.libp2p + this.components.registrar .handle(FilterCodec, this.onRequest.bind(this)) .catch((e) => log("Failed to register filter protocol", e)); } @@ -139,6 +148,10 @@ export class WakuFilter implements Filter { }; } + get peerStore(): PeerStore { + return this.components.peerStore; + } + private onRequest(streamData: IncomingStreamData): void { log("Receiving message push"); try { @@ -261,7 +274,9 @@ export class WakuFilter implements Filter { } private async newStream(peer: Peer): Promise { - const connections = this.libp2p.connectionManager.getConnections(peer.id); + const connections = this.components.connectionManager.getConnections( + peer.id + ); const connection = selectConnection(connections); if (!connection) { throw new Error("Failed to get a connection to the peer"); @@ -272,7 +287,7 @@ export class WakuFilter implements Filter { private async getPeer(peerId?: PeerId): Promise { const res = await selectPeerForProtocol( - this.libp2p.peerStore, + this.components.peerStore, [FilterCodec], peerId ); @@ -283,10 +298,16 @@ export class WakuFilter implements Filter { } async peers(): Promise { - return getPeersForProtocol(this.libp2p.peerStore, [FilterCodec]); + return getPeersForProtocol(this.components.peerStore, [FilterCodec]); } async randomPeer(): Promise { return selectRandomPeer(await this.peers()); } } + +export function wakuFilter( + init: Partial = {} +): (components: FilterComponents) => Filter { + return (components: FilterComponents) => new WakuFilter(components, init); +} diff --git a/packages/core/src/lib/waku_light_push/index.ts b/packages/core/src/lib/waku_light_push/index.ts index 78600fe9d2..2ddaccaff8 100644 --- a/packages/core/src/lib/waku_light_push/index.ts +++ b/packages/core/src/lib/waku_light_push/index.ts @@ -1,7 +1,10 @@ +import { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer } from "@libp2p/interface-peer-store"; +import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Encoder, + LightPush, Message, ProtocolOptions, SendResult, @@ -10,7 +13,6 @@ import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import { Libp2p } from "libp2p"; import { Uint8ArrayList } from "uint8arraylist"; import { PushResponse } from "../../proto/light_push"; @@ -29,6 +31,11 @@ const log = debug("waku:light-push"); export const LightPushCodec = "/vac/waku/lightpush/2.0.0-beta1"; export { PushResponse }; +export interface LightPushComponents { + peerStore: PeerStore; + connectionManager: ConnectionManager; +} + export interface CreateOptions { /** * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. @@ -44,10 +51,10 @@ export interface CreateOptions { /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ -export class WakuLightPush { +class WakuLightPush implements LightPush { pubSubTopic: string; - constructor(public libp2p: Libp2p, options?: CreateOptions) { + constructor(public components: LightPushComponents, options?: CreateOptions) { this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; } @@ -59,7 +66,7 @@ export class WakuLightPush { const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; const res = await selectPeerForProtocol( - this.libp2p.peerStore, + this.components.peerStore, [LightPushCodec], opts?.peerId ); @@ -69,7 +76,9 @@ export class WakuLightPush { } const { peer } = res; - const connections = this.libp2p.connectionManager.getConnections(peer.id); + const connections = this.components.connectionManager.getConnections( + peer.id + ); const connection = selectConnection(connections); if (!connection) throw "Failed to get a connection to the peer"; @@ -123,7 +132,7 @@ export class WakuLightPush { * peers. */ async peers(): Promise { - return getPeersForProtocol(this.libp2p.peerStore, [LightPushCodec]); + return getPeersForProtocol(this.components.peerStore, [LightPushCodec]); } /** @@ -134,4 +143,15 @@ export class WakuLightPush { async randomPeer(): Promise { return selectRandomPeer(await this.peers()); } + + get peerStore(): PeerStore { + return this.components.peerStore; + } +} + +export function wakuLightPush( + init: Partial = {} +): (components: LightPushComponents) => LightPush { + return (components: LightPushComponents) => + new WakuLightPush(components, init); } diff --git a/packages/core/src/lib/waku_relay/index.ts b/packages/core/src/lib/waku_relay/index.ts index a2fac9b0d7..f1832e9e3d 100644 --- a/packages/core/src/lib/waku_relay/index.ts +++ b/packages/core/src/lib/waku_relay/index.ts @@ -56,7 +56,7 @@ export type CreateOptions = { * * @implements {require('libp2p-interfaces/src/pubsub')} */ -export class WakuRelay extends GossipSub implements Relay { +class WakuRelay extends GossipSub implements Relay { pubSubTopic: string; defaultDecoder: Decoder; public static multicodec: string = constants.RelayCodecs[0]; diff --git a/packages/core/src/lib/waku_store/index.ts b/packages/core/src/lib/waku_store/index.ts index efcbf10b72..c93005bced 100644 --- a/packages/core/src/lib/waku_store/index.ts +++ b/packages/core/src/lib/waku_store/index.ts @@ -1,12 +1,12 @@ import type { Connection } from "@libp2p/interface-connection"; +import type { ConnectionManager } from "@libp2p/interface-connection-manager"; import type { PeerId } from "@libp2p/interface-peer-id"; -import { Peer } from "@libp2p/interface-peer-store"; -import { DecodedMessage, Decoder } from "@waku/interfaces"; +import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import { DecodedMessage, Decoder, Store } from "@waku/interfaces"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; -import { Libp2p } from "libp2p"; import { Uint8ArrayList } from "uint8arraylist"; import * as proto from "../../proto/store"; @@ -27,6 +27,11 @@ export const DefaultPageSize = 10; export { PageDirection }; +export interface StoreComponents { + peerStore: PeerStore; + connectionManager: ConnectionManager; +} + export interface CreateOptions { /** * The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}. @@ -82,10 +87,10 @@ export interface QueryOptions { * * The Waku Store protocol can be used to retrieved historical messages. */ -export class WakuStore { +class WakuStore implements Store { pubSubTopic: string; - constructor(public libp2p: Libp2p, options?: CreateOptions) { + constructor(public components: StoreComponents, options?: CreateOptions) { this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; } @@ -232,7 +237,7 @@ export class WakuStore { }); const res = await selectPeerForProtocol( - this.libp2p.peerStore, + this.components.peerStore, [StoreCodec], options?.peerId ); @@ -242,7 +247,9 @@ export class WakuStore { } const { peer, protocol } = res; - const connections = this.libp2p.connectionManager.getConnections(peer.id); + const connections = this.components.connectionManager.getConnections( + peer.id + ); const connection = selectConnection(connections); if (!connection) throw "Failed to get a connection to the peer"; @@ -262,7 +269,11 @@ export class WakuStore { * store protocol. Waku may or may not be currently connected to these peers. */ async peers(): Promise { - return getPeersForProtocol(this.libp2p.peerStore, [StoreCodec]); + return getPeersForProtocol(this.components.peerStore, [StoreCodec]); + } + + get peerStore(): PeerStore { + return this.components.peerStore; } } @@ -370,3 +381,9 @@ async function* paginate( export function isDefined(msg: T | undefined): msg is T { return !!msg; } + +export function wakuStore( + init: Partial = {} +): (components: StoreComponents) => Store { + return (components: StoreComponents) => new WakuStore(components, init); +} diff --git a/packages/create/src/index.ts b/packages/create/src/index.ts index 309c81f750..d7f33bb321 100644 --- a/packages/create/src/index.ts +++ b/packages/create/src/index.ts @@ -7,11 +7,11 @@ import { all as filterAll } from "@libp2p/websockets/filters"; import { waku, waku_relay, - WakuFilter, - WakuLightPush, + wakuFilter, + wakuLightPush, WakuNode, wakuRelay, - WakuStore, + wakuStore, } from "@waku/core"; import { getPredefinedBootstrapNodes } from "@waku/core/lib/predefined_bootstrap_nodes"; import type { Relay, WakuFull, WakuLight, WakuPrivacy } from "@waku/interfaces"; @@ -74,16 +74,16 @@ export async function createLightNode( const libp2p = await defaultLibp2p(undefined, libp2pOptions); - const wakuStore = new WakuStore(libp2p, options); - const wakuLightPush = new WakuLightPush(libp2p, options); - const wakuFilter = new WakuFilter(libp2p, options); + const store = wakuStore(options); + const lightPush = wakuLightPush(options); + const filter = wakuFilter(options); return new WakuNode( options ?? {}, libp2p, - wakuStore, - wakuLightPush, - wakuFilter + store, + lightPush, + filter ) as WakuLight; } @@ -131,16 +131,16 @@ export async function createFullNode( const libp2p = await defaultLibp2p(wakuRelay(options), libp2pOptions); - const wakuStore = new WakuStore(libp2p, options); - const wakuLightPush = new WakuLightPush(libp2p, options); - const wakuFilter = new WakuFilter(libp2p, options); + const store = wakuStore(options); + const lightPush = wakuLightPush(options); + const filter = wakuFilter(options); return new WakuNode( options ?? {}, libp2p, - wakuStore, - wakuLightPush, - wakuFilter + store, + lightPush, + filter ) as WakuFull; } diff --git a/packages/interfaces/package.json b/packages/interfaces/package.json index 82c3850f0b..696954ab28 100644 --- a/packages/interfaces/package.json +++ b/packages/interfaces/package.json @@ -51,7 +51,7 @@ "@chainsafe/libp2p-gossipsub": "^5.2.1", "@libp2p/interface-connection": "^3.0.2", "@libp2p/interface-peer-id": "^1.0.5", - "@libp2p/interface-peer-store": "^1.2.2", + "@libp2p/interface-peer-store": "^1.2.3", "@multiformats/multiaddr": "^11.0.6", "libp2p": "0.40.0" }, diff --git a/packages/interfaces/src/index.ts b/packages/interfaces/src/index.ts index 8c3c025bc3..f32176ff00 100644 --- a/packages/interfaces/src/index.ts +++ b/packages/interfaces/src/index.ts @@ -2,6 +2,7 @@ import type { GossipSub } from "@chainsafe/libp2p-gossipsub"; import type { Stream } from "@libp2p/interface-connection"; import type { PeerId } from "@libp2p/interface-peer-id"; import type { Peer } from "@libp2p/interface-peer-store"; +import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Multiaddr } from "@multiformats/multiaddr"; import type { Libp2p } from "libp2p"; @@ -13,7 +14,7 @@ export enum Protocols { } export interface PointToPointProtocol { - libp2p: Libp2p; + peerStore: PeerStore; peers: () => Promise; }