From 789bebed007ba5169849b27a5cca5766b21e251e Mon Sep 17 00:00:00 2001 From: "fryorcraken.eth" Date: Thu, 23 Feb 2023 14:04:27 +1100 Subject: [PATCH] chore: add `BaseProtocol` class to req-resp protocols Each protocol implementation have some common functionality. Implements those in `BaseProtocol`, to be extended by each protocol implementation. --- packages/core/src/lib/base_protocol.ts | 48 ++++++++++++++++++++ packages/core/src/lib/filter/index.ts | 46 ++----------------- packages/core/src/lib/light_push/index.ts | 55 +++-------------------- packages/core/src/lib/store/index.ts | 55 +++++------------------ 4 files changed, 66 insertions(+), 138 deletions(-) create mode 100644 packages/core/src/lib/base_protocol.ts diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts new file mode 100644 index 0000000000..8524cb4069 --- /dev/null +++ b/packages/core/src/lib/base_protocol.ts @@ -0,0 +1,48 @@ +import type { Stream } from "@libp2p/interface-connection"; +import type { Libp2p } from "@libp2p/interface-libp2p"; +import type { PeerId } from "@libp2p/interface-peer-id"; +import { Peer, PeerStore } from "@libp2p/interface-peer-store"; +import { + getPeersForProtocol, + selectConnection, + selectPeerForProtocol, +} from "@waku/utils"; + +/** + * A class with predefined helpers, to be used as a base to implement Waku + * Protocols. + */ +export class BaseProtocol { + constructor(public multicodec: string, public libp2p: Libp2p) {} + + /** + * Returns known peers from the address book (`libp2p.peerStore`) that support + * the class protocol. Waku may or may not be currently connected to these + * peers. + */ + async peers(): Promise { + return getPeersForProtocol(this.peerStore, [this.multicodec]); + } + + get peerStore(): PeerStore { + return this.libp2p.peerStore; + } + + protected async getPeer(peerId?: PeerId): Promise { + const { peer } = await selectPeerForProtocol( + this.peerStore, + [this.multicodec], + peerId + ); + return peer; + } + protected async newStream(peer: Peer): Promise { + const connections = this.libp2p.getConnections(peer.id); + const connection = selectConnection(connections); + if (!connection) { + throw new Error("Failed to get a connection to the peer"); + } + + return connection.newStream(this.multicodec); + } +} diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 336311f3e0..a2d7a08999 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -1,7 +1,4 @@ -import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; -import type { PeerId } from "@libp2p/interface-peer-id"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store"; import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { @@ -14,17 +11,12 @@ import type { ProtocolOptions, } from "@waku/interfaces"; import { WakuMessage as WakuMessageProto } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { groupByContentTopic } from "../group_by.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -46,8 +38,7 @@ export type UnsubscribeFunction = () => Promise; * - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/nwaku/issues/948 */ -class Filter implements IFilter { - multicodec: string; +class Filter extends BaseProtocol implements IFilter { options: ProtocolCreateOptions; private subscriptions: Map>; private decoders: Map< @@ -56,8 +47,8 @@ class Filter implements IFilter { >; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { + super(FilterCodec, libp2p); this.options = options ?? {}; - this.multicodec = FilterCodec; this.subscriptions = new Map(); this.decoders = new Map(); this.libp2p @@ -132,10 +123,6 @@ class Filter implements IFilter { }; } - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } - private onRequest(streamData: IncomingStreamData): void { log("Receiving message push"); try { @@ -256,33 +243,6 @@ class Filter implements IFilter { throw e; } } - - private async newStream(peer: Peer): Promise { - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - if (!connection) { - throw new Error("Failed to get a connection to the peer"); - } - - return connection.newStream(this.multicodec); - } - - private async getPeer(peerId?: PeerId): Promise { - const { peer } = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - peerId - ); - return peer; - } - - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } } export function wakuFilter( diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 4e98917b4f..30a28072e5 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -1,7 +1,5 @@ import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer } from "@libp2p/interface-peer-store"; -import type { PeerStore } from "@libp2p/interface-peer-store"; import type { IEncoder, ILightPush, @@ -11,18 +9,13 @@ import type { SendResult, } from "@waku/interfaces"; import { PushResponse } from "@waku/proto"; -import { - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - selectRandomPeer, -} from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { PushRPC } from "./push_rpc.js"; @@ -35,12 +28,11 @@ export { PushResponse }; /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ -class LightPush implements ILightPush { - multicodec: string; +class LightPush extends BaseProtocol implements ILightPush { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = LightPushCodec; + super(LightPushCodec, libp2p); this.options = options || {}; } @@ -51,23 +43,8 @@ class LightPush implements ILightPush { ): Promise { const { pubSubTopic = DefaultPubSubTopic } = this.options; - const res = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - opts?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; - - const stream = await connection.newStream(this.multicodec); + const peer = await this.getPeer(opts?.peerId); + const stream = await this.newStream(peer); const recipients: PeerId[] = []; @@ -109,28 +86,6 @@ class LightPush implements ILightPush { } return { recipients }; } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * light push protocol. Waku may or may not be currently connected to these - * peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - /** - * Returns a random peer that supports light push protocol from the address - * book (`libp2p.peerStore`). Waku may or may not be currently connected to - * this peer. - */ - async randomPeer(): Promise { - return selectRandomPeer(await this.peers()); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } export function wakuLightPush( diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 9e25aa220b..9e4e3129fd 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -1,7 +1,6 @@ -import type { Connection } from "@libp2p/interface-connection"; +import type { Stream } from "@libp2p/interface-connection"; import type { Libp2p } from "@libp2p/interface-libp2p"; import type { PeerId } from "@libp2p/interface-peer-id"; -import type { Peer, PeerStore } from "@libp2p/interface-peer-store"; import { sha256 } from "@noble/hashes/sha256"; import { Cursor, @@ -12,19 +11,14 @@ import { ProtocolCreateOptions, } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; -import { - concat, - getPeersForProtocol, - selectConnection, - selectPeerForProtocol, - utf8ToBytes, -} from "@waku/utils"; +import { concat, utf8ToBytes } from "@waku/utils"; import debug from "debug"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; import { Uint8ArrayList } from "uint8arraylist"; +import { BaseProtocol } from "../base_protocol.js"; import { DefaultPubSubTopic } from "../constants.js"; import { toProtoMessage } from "../to_proto_message.js"; @@ -84,12 +78,11 @@ export interface QueryOptions { * * The Waku Store protocol can be used to retrieved historical messages. */ -class Store implements IStore { - multicodec: string; +class Store extends BaseProtocol implements IStore { options: ProtocolCreateOptions; constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { - this.multicodec = StoreCodec; + super(StoreCodec, libp2p); this.options = options ?? {}; } @@ -237,25 +230,10 @@ class Store implements IStore { peerId: options?.peerId?.toString(), }); - const res = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - options?.peerId - ); - - if (!res) { - throw new Error("Failed to get a peer"); - } - const { peer, protocol } = res; - - const connections = this.libp2p.getConnections(peer.id); - const connection = selectConnection(connections); - - if (!connection) throw "Failed to get a connection to the peer"; + const peer = await this.getPeer(options?.peerId); for await (const messages of paginate( - connection, - protocol, + this.newStream.bind(this, peer), queryOpts, decodersAsMap, options?.cursor @@ -263,23 +241,10 @@ class Store implements IStore { yield messages; } } - - /** - * Returns known peers from the address book (`libp2p.peerStore`) that support - * store protocol. Waku may or may not be currently connected to these peers. - */ - async peers(): Promise { - return getPeersForProtocol(this.peerStore, [this.multicodec]); - } - - get peerStore(): PeerStore { - return this.libp2p.peerStore; - } } async function* paginate( - connection: Connection, - protocol: string, + streamFactory: () => Promise, queryOpts: Params, decoders: Map>, cursor?: Cursor @@ -297,16 +262,16 @@ async function* paginate( while (true) { queryOpts.cursor = currentCursor; - const stream = await connection.newStream(protocol); const historyRpcQuery = HistoryRPC.createQuery(queryOpts); log( "Querying store peer", - connection.remoteAddr.toString(), `for (${queryOpts.pubSubTopic})`, queryOpts.contentTopics ); + const stream = await streamFactory(); + const res = await pipe( [historyRpcQuery.encode()], lp.encode(),