From c85b113df74928b9aeb508df4c3f248934b1e781 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Wed, 25 Jan 2023 17:25:46 +1100 Subject: [PATCH 1/5] chore: use `this.multicodec` over constant To enable extraction of common functions. --- packages/core/src/lib/filter/index.ts | 10 +++++----- packages/core/src/lib/light_push/index.ts | 4 ++-- packages/core/src/lib/store/index.ts | 4 ++-- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 31f66e8f5b..341c68b0fc 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -61,7 +61,7 @@ class Filter implements IFilter { this.subscriptions = new Map(); this.decoders = new Map(); this.libp2p - .handle(FilterCodec, this.onRequest.bind(this)) + .handle(this.multicodec, this.onRequest.bind(this)) .catch((e) => log("Failed to register filter protocol", e)); } @@ -264,23 +264,23 @@ class Filter implements IFilter { throw new Error("Failed to get a connection to the peer"); } - return connection.newStream(FilterCodec); + return connection.newStream(this.multicodec); } private async getPeer(peerId?: PeerId): Promise { const res = await selectPeerForProtocol( this.peerStore, - [FilterCodec], + [this.multicodec], peerId ); if (!res) { - throw new Error(`Failed to select peer for ${FilterCodec}`); + throw new Error(`Failed to select peer for ${this.multicodec}`); } return res.peer; } async peers(): Promise { - return getPeersForProtocol(this.peerStore, [FilterCodec]); + return getPeersForProtocol(this.peerStore, [this.multicodec]); } async randomPeer(): Promise { diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index b5fd967511..4e98917b4f 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -67,7 +67,7 @@ class LightPush implements ILightPush { if (!connection) throw "Failed to get a connection to the peer"; - const stream = await connection.newStream(LightPushCodec); + const stream = await connection.newStream(this.multicodec); const recipients: PeerId[] = []; @@ -116,7 +116,7 @@ class LightPush implements ILightPush { * peers. */ async peers(): Promise { - return getPeersForProtocol(this.peerStore, [LightPushCodec]); + return getPeersForProtocol(this.peerStore, [this.multicodec]); } /** diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index a1f71ce746..9e25aa220b 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -239,7 +239,7 @@ class Store implements IStore { const res = await selectPeerForProtocol( this.peerStore, - [StoreCodec], + [this.multicodec], options?.peerId ); @@ -269,7 +269,7 @@ class Store implements IStore { * store protocol. Waku may or may not be currently connected to these peers. */ async peers(): Promise { - return getPeersForProtocol(this.peerStore, [StoreCodec]); + return getPeersForProtocol(this.peerStore, [this.multicodec]); } get peerStore(): PeerStore { From 7d29ed1d993ca2be8737ef327d546727bd317576 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Wed, 25 Jan 2023 17:34:36 +1100 Subject: [PATCH 2/5] chore: move error throwing within `selectPeerForProtocol` As all callers throw upon undefined result. --- packages/core/src/lib/filter/index.ts | 7 ++----- packages/peer-exchange/src/waku_peer_exchange.ts | 7 ++----- packages/utils/src/libp2p/index.ts | 16 +++++++--------- 3 files changed, 11 insertions(+), 19 deletions(-) diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 341c68b0fc..336311f3e0 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -268,15 +268,12 @@ class Filter implements IFilter { } private async getPeer(peerId?: PeerId): Promise { - const res = await selectPeerForProtocol( + const { peer } = await selectPeerForProtocol( this.peerStore, [this.multicodec], peerId ); - if (!res) { - throw new Error(`Failed to select peer for ${this.multicodec}`); - } - return res.peer; + return peer; } async peers(): Promise { diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index ec66bae934..79c7180c1c 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -119,15 +119,12 @@ export class WakuPeerExchange implements IPeerExchange { * @returns A peer to query */ private async getPeer(peerId?: PeerId): Promise { - const res = await selectPeerForProtocol( + const { peer } = await selectPeerForProtocol( this.peerStore, [PeerExchangeCodec], peerId ); - if (!res) { - throw new Error(`Failed to select peer for ${PeerExchangeCodec}`); - } - return res.peer; + return peer; } /** diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index f760a16953..13536c6ba5 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -42,22 +42,22 @@ export async function selectPeerForProtocol( peerStore: PeerStore, protocols: string[], peerId?: PeerId -): Promise<{ peer: Peer; protocol: string } | undefined> { +): Promise<{ peer: Peer; protocol: string }> { let peer; if (peerId) { peer = await peerStore.get(peerId); if (!peer) { - log( + throw new Error( `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` ); - return; } } else { const peers = await getPeersForProtocol(peerStore, protocols); peer = selectRandomPeer(peers); if (!peer) { - log("Failed to find known peer that registers protocols", protocols); - return; + throw new Error( + `Failed to find known peer that registers protocols: ${protocols}` + ); } } @@ -70,11 +70,9 @@ export async function selectPeerForProtocol( } log(`Using codec ${protocol}`); if (!protocol) { - log( - `Peer does not register required protocols: ${peer.id.toString()}`, - protocols + throw new Error( + `Peer does not register required protocols (${peer.id.toString()}): ${protocols}` ); - return; } return { peer, protocol }; From 789bebed007ba5169849b27a5cca5766b21e251e Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Thu, 23 Feb 2023 14:04:27 +1100 Subject: [PATCH 3/5] chore: add `BaseProtocol` class to req-resp protocols Each protocol implementation have some common functionality. Implements those in `BaseProtocol`, to be extended by each protocol implementation. --- packages/core/src/lib/base_protocol.ts | 48 ++++++++++++++++++++ packages/core/src/lib/filter/index.ts | 46 ++----------------- packages/core/src/lib/light_push/index.ts | 55 +++-------------------- packages/core/src/lib/store/index.ts | 55 +++++------------------ 4 files changed, 66 insertions(+), 138 deletions(-) create mode 100644 packages/core/src/lib/base_protocol.ts diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts new file mode 100644 index 0000000000..8524cb4069 --- /dev/null +++ b/packages/core/src/lib/base_protocol.ts @@ -0,0 +1,48 @@ +import type { Stream } from "@libp2p/interface-connection"; +import type { Libp2p } from "@libp2p/interface-libp2p"; +import type { PeerId } from "@libp2p/interface-peer-id"; +import { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import { + getPeersForProtocol, + selectConnection, + selectPeerForProtocol, +} from "@waku/utils"; + +/** + * A class with predefined helpers, to be used as a base to implement Waku + * Protocols. + */ +export class BaseProtocol { + constructor(public multicodec: string, public libp2p: Libp2p) {} + + /** + * Returns known peers from the address book (`libp2p.peerStore`) that support + * the class protocol. Waku may or may not be currently connected to these + * peers. + */ + async peers(): Promise { + return getPeersForProtocol(this.peerStore, [this.multicodec]); + } + + get peerStore(): PeerStore { + return this.libp2p.peerStore; + } + + protected async getPeer(peerId?: PeerId): Promise { + const { peer } = await selectPeerForProtocol( + this.peerStore, + [this.multicodec], + peerId + ); + return peer; + } + protected async newStream(peer: Peer): Promise { + const connections = this.libp2p.getConnections(peer.id); + const connection = selectConnection(connections); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + return connection.newStream(this.multicodec); + } +} diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 336311f3e0..a2d7a08999 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,7 +1,4 @@ -import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; -import type { PeerId } from "@libp2p/interface-peer-id"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { @@ -14,17 +11,12 @@ import type { ProtocolOptions, } from "@waku/interfaces"; import { WakuMessage as WakuMessageProto } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { groupByContentTopic } from "../group_by.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -46,8 +38,7 @@ export type UnsubscribeFunction = () => Promise; * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -class Filter implements IFilter { - multicodec: string; +class Filter extends BaseProtocol implements IFilter { options: ProtocolCreateOptions; private subscriptions: Map>; private decoders: Map< @@ -56,8 +47,8 @@ class Filter implements IFilter { >; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { + super(FilterCodec, libp2p); this.options = options ?? {}; - this.multicodec = FilterCodec; this.subscriptions = new Map(); this.decoders = new Map(); this.libp2p @@ -132,10 +123,6 @@ class Filter implements IFilter { }; } - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } - private onRequest(streamData: IncomingStreamData): void { log("Receiving message push"); try { @@ -256,33 +243,6 @@ class Filter implements IFilter { throw e; } } - - private async newStream(peer: Peer): Promise { - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - if (!connection) { - throw new Error("Failed to get a connection to the peer"); - } - - return connection.newStream(this.multicodec); - } - - private async getPeer(peerId?: PeerId): Promise { - const { peer } = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - peerId - ); - return peer; - } - - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } } export function wakuFilter( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 4e98917b4f..30a28072e5 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,7 +1,5 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer } from "@libp2p/interface-peer-store"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { IEncoder, ILightPush, @@ -11,18 +9,13 @@ import type { SendResult, } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { PushRPC } from "./push_rpc.js"; @@ -35,12 +28,11 @@ export { PushResponse }; /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ -class LightPush implements ILightPush { - multicodec: string; +class LightPush extends BaseProtocol implements ILightPush { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = LightPushCodec; + super(LightPushCodec, libp2p); this.options = options || {}; } @@ -51,23 +43,8 @@ class LightPush implements ILightPush { ): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; - const res = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - opts?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; - - const stream = await connection.newStream(this.multicodec); + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); const recipients: PeerId[] = []; @@ -109,28 +86,6 @@ class LightPush implements ILightPush { } return { recipients }; } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * light push protocol. Waku may or may not be currently connected to these - * peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - /** - * Returns a random peer that supports light push protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } export function wakuLightPush( diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 9e25aa220b..9e4e3129fd 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -1,7 +1,6 @@ -import type { Connection } from "@libp2p/interface-connection"; +import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { Cursor, @@ -12,19 +11,14 @@ import { ProtocolCreateOptions, } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { - concat, - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - utf8ToBytes, -} from "@waku/utils"; +import { concat, utf8ToBytes } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -84,12 +78,11 @@ export interface QueryOptions { * * The Waku Store protocol can be used to retrieved historical messages. */ -class Store implements IStore { - multicodec: string; +class Store extends BaseProtocol implements IStore { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = StoreCodec; + super(StoreCodec, libp2p); this.options = options ?? {}; } @@ -237,25 +230,10 @@ class Store implements IStore { peerId: options?.peerId?.toString(), }); - const res = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - options?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer, protocol } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; + const peer = await this.getPeer(options?.peerId); for await (const messages of paginate( - connection, - protocol, + this.newStream.bind(this, peer), queryOpts, decodersAsMap, options?.cursor @@ -263,23 +241,10 @@ class Store implements IStore { yield messages; } } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * store protocol. Waku may or may not be currently connected to these peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } async function* paginate( - connection: Connection, - protocol: string, + streamFactory: () => Promise, queryOpts: Params, decoders: Map>, cursor?: Cursor @@ -297,16 +262,16 @@ async function* paginate( while (true) { queryOpts.cursor = currentCursor; - const stream = await connection.newStream(protocol); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); log( "Querying store peer", - connection.remoteAddr.toString(), `for (${queryOpts.pubSubTopic})`, queryOpts.contentTopics ); + const stream = await streamFactory(); + const res = await pipe( [historyRpcQuery.encode()], lp.encode(), From c5546e7eee018977fa2ef8a04185371f9c05d0fc Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Fri, 27 Jan 2023 12:11:10 +1100 Subject: [PATCH 4/5] test: build tools are expected to use dev dependencies --- .eslintrc.json | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/.eslintrc.json b/.eslintrc.json index e96e551a54..d324999ce5 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -30,7 +30,16 @@ "no-constant-condition": ["error", { "checkLoops": false }], "import/no-extraneous-dependencies": [ "error", - { "devDependencies": ["**/*.test.ts", "**/*.spec.ts", "**/tests/**"] } + { + "devDependencies": [ + "**/*.test.ts", + "**/*.spec.ts", + "**/tests/**", + "**/rollup.config.js", + "**/.eslintrc.cjs", + "**/karma.conf.cjs" + ] + } ], "sort-imports": [ "error", From a78d72b7fdbdbf85e3d47b33218e138a707a8491 Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Fri, 27 Jan 2023 13:42:43 +1100 Subject: [PATCH 5/5] chore: re-order package build peer-exchange now depends on core. --- package-lock.json | 2 +- package.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2a3ec1b5e4..0d1b762933 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,8 +10,8 @@ "packages/proto", "packages/interfaces", "packages/enr", - "packages/peer-exchange", "packages/core", + "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", "packages/create", diff --git a/package.json b/package.json index 672d52f6c6..84508d8e74 100644 --- a/package.json +++ b/package.json @@ -7,8 +7,8 @@ "packages/proto", "packages/interfaces", "packages/enr", - "packages/peer-exchange", "packages/core", + "packages/peer-exchange", "packages/dns-discovery", "packages/message-encryption", "packages/create",