diff --git a/.eslintrc.json b/.eslintrc.json index e96e551a54..d324999ce5 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -30,7 +30,16 @@ "no-constant-condition": ["error", { "checkLoops": false }], "import/no-extraneous-dependencies": [ "error", - { "devDependencies": ["**/*.test.ts", "**/*.spec.ts", "**/tests/**"] } + { + "devDependencies": [ + "**/*.test.ts", + "**/*.spec.ts", + "**/tests/**", + "**/rollup.config.js", + "**/.eslintrc.cjs", + "**/karma.conf.cjs" + ] + } ], "sort-imports": [ "error", diff --git a/package-lock.json b/package-lock.json index 2a3ec1b5e4..0d1b762933 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,8 +10,8 @@ "packages/proto", "packages/interfaces", "packages/enr", - "packages/peer-exchange", "packages/core", + "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", "packages/create", diff --git a/package.json b/package.json index 672d52f6c6..84508d8e74 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,8 @@ "packages/proto", "packages/interfaces", "packages/enr", - "packages/peer-exchange", "packages/core", + "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", "packages/create", diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts new file mode 100644 index 0000000000..8524cb4069 --- /dev/null +++ b/packages/core/src/lib/base_protocol.ts @@ -0,0 +1,48 @@ +import type { Stream } from "@libp2p/interface-connection"; +import type { Libp2p } from "@libp2p/interface-libp2p"; +import type { PeerId } from "@libp2p/interface-peer-id"; +import { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import { + getPeersForProtocol, + selectConnection, + selectPeerForProtocol, +} from "@waku/utils"; + +/** + * A class with predefined helpers, to be used as a base to implement Waku + * Protocols. + */ +export class BaseProtocol { + constructor(public multicodec: string, public libp2p: Libp2p) {} + + /** + * Returns known peers from the address book (`libp2p.peerStore`) that support + * the class protocol. Waku may or may not be currently connected to these + * peers. + */ + async peers(): Promise { + return getPeersForProtocol(this.peerStore, [this.multicodec]); + } + + get peerStore(): PeerStore { + return this.libp2p.peerStore; + } + + protected async getPeer(peerId?: PeerId): Promise { + const { peer } = await selectPeerForProtocol( + this.peerStore, + [this.multicodec], + peerId + ); + return peer; + } + protected async newStream(peer: Peer): Promise { + const connections = this.libp2p.getConnections(peer.id); + const connection = selectConnection(connections); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + return connection.newStream(this.multicodec); + } +} diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 31f66e8f5b..a2d7a08999 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,7 +1,4 @@ -import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; -import type { PeerId } from "@libp2p/interface-peer-id"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { @@ -14,17 +11,12 @@ import type { ProtocolOptions, } from "@waku/interfaces"; import { WakuMessage as WakuMessageProto } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { groupByContentTopic } from "../group_by.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -46,8 +38,7 @@ export type UnsubscribeFunction = () => Promise; * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -class Filter implements IFilter { - multicodec: string; +class Filter extends BaseProtocol implements IFilter { options: ProtocolCreateOptions; private subscriptions: Map>; private decoders: Map< @@ -56,12 +47,12 @@ class Filter implements IFilter { >; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { + super(FilterCodec, libp2p); this.options = options ?? {}; - this.multicodec = FilterCodec; this.subscriptions = new Map(); this.decoders = new Map(); this.libp2p - .handle(FilterCodec, this.onRequest.bind(this)) + .handle(this.multicodec, this.onRequest.bind(this)) .catch((e) => log("Failed to register filter protocol", e)); } @@ -132,10 +123,6 @@ class Filter implements IFilter { }; } - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } - private onRequest(streamData: IncomingStreamData): void { log("Receiving message push"); try { @@ -256,36 +243,6 @@ class Filter implements IFilter { throw e; } } - - private async newStream(peer: Peer): Promise { - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - if (!connection) { - throw new Error("Failed to get a connection to the peer"); - } - - return connection.newStream(FilterCodec); - } - - private async getPeer(peerId?: PeerId): Promise { - const res = await selectPeerForProtocol( - this.peerStore, - [FilterCodec], - peerId - ); - if (!res) { - throw new Error(`Failed to select peer for ${FilterCodec}`); - } - return res.peer; - } - - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [FilterCodec]); - } - - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } } export function wakuFilter( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index b5fd967511..30a28072e5 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,7 +1,5 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer } from "@libp2p/interface-peer-store"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { IEncoder, ILightPush, @@ -11,18 +9,13 @@ import type { SendResult, } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { PushRPC } from "./push_rpc.js"; @@ -35,12 +28,11 @@ export { PushResponse }; /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ -class LightPush implements ILightPush { - multicodec: string; +class LightPush extends BaseProtocol implements ILightPush { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = LightPushCodec; + super(LightPushCodec, libp2p); this.options = options || {}; } @@ -51,23 +43,8 @@ class LightPush implements ILightPush { ): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; - const res = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - opts?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; - - const stream = await connection.newStream(LightPushCodec); + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); const recipients: PeerId[] = []; @@ -109,28 +86,6 @@ class LightPush implements ILightPush { } return { recipients }; } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * light push protocol. Waku may or may not be currently connected to these - * peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [LightPushCodec]); - } - - /** - * Returns a random peer that supports light push protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } export function wakuLightPush( diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index a1f71ce746..9e4e3129fd 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -1,7 +1,6 @@ -import type { Connection } from "@libp2p/interface-connection"; +import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { Cursor, @@ -12,19 +11,14 @@ import { ProtocolCreateOptions, } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { - concat, - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - utf8ToBytes, -} from "@waku/utils"; +import { concat, utf8ToBytes } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -84,12 +78,11 @@ export interface QueryOptions { * * The Waku Store protocol can be used to retrieved historical messages. */ -class Store implements IStore { - multicodec: string; +class Store extends BaseProtocol implements IStore { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = StoreCodec; + super(StoreCodec, libp2p); this.options = options ?? {}; } @@ -237,25 +230,10 @@ class Store implements IStore { peerId: options?.peerId?.toString(), }); - const res = await selectPeerForProtocol( - this.peerStore, - [StoreCodec], - options?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer, protocol } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; + const peer = await this.getPeer(options?.peerId); for await (const messages of paginate( - connection, - protocol, + this.newStream.bind(this, peer), queryOpts, decodersAsMap, options?.cursor @@ -263,23 +241,10 @@ class Store implements IStore { yield messages; } } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * store protocol. Waku may or may not be currently connected to these peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [StoreCodec]); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } async function* paginate( - connection: Connection, - protocol: string, + streamFactory: () => Promise, queryOpts: Params, decoders: Map>, cursor?: Cursor @@ -297,16 +262,16 @@ async function* paginate( while (true) { queryOpts.cursor = currentCursor; - const stream = await connection.newStream(protocol); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); log( "Querying store peer", - connection.remoteAddr.toString(), `for (${queryOpts.pubSubTopic})`, queryOpts.contentTopics ); + const stream = await streamFactory(); + const res = await pipe( [historyRpcQuery.encode()], lp.encode(), diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index ec66bae934..79c7180c1c 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -119,15 +119,12 @@ export class WakuPeerExchange implements IPeerExchange { * @returns A peer to query */ private async getPeer(peerId?: PeerId): Promise { - const res = await selectPeerForProtocol( + const { peer } = await selectPeerForProtocol( this.peerStore, [PeerExchangeCodec], peerId ); - if (!res) { - throw new Error(`Failed to select peer for ${PeerExchangeCodec}`); - } - return res.peer; + return peer; } /** diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index f760a16953..13536c6ba5 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -42,22 +42,22 @@ export async function selectPeerForProtocol( peerStore: PeerStore, protocols: string[], peerId?: PeerId -): Promise<{ peer: Peer; protocol: string } | undefined> { +): Promise<{ peer: Peer; protocol: string }> { let peer; if (peerId) { peer = await peerStore.get(peerId); if (!peer) { - log( + throw new Error( `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` ); - return; } } else { const peers = await getPeersForProtocol(peerStore, protocols); peer = selectRandomPeer(peers); if (!peer) { - log("Failed to find known peer that registers protocols", protocols); - return; + throw new Error( + `Failed to find known peer that registers protocols: ${protocols}` + ); } } @@ -70,11 +70,9 @@ export async function selectPeerForProtocol( } log(`Using codec ${protocol}`); if (!protocol) { - log( - `Peer does not register required protocols: ${peer.id.toString()}`, - protocols + throw new Error( + `Peer does not register required protocols (${peer.id.toString()}): ${protocols}` ); - return; } return { peer, protocol };