Merge pull request #1137 from waku-org/chore/remove-dupe-code

chore: reduce cross duplication across req-resp protocols
This commit is contained in:
fryorcraken.eth 2023-02-27 10:05:16 +11:00 committed by GitHub
commit 6cdcc0546b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 88 additions and 159 deletions

View File

@ -30,7 +30,16 @@
"no-constant-condition": ["error", { "checkLoops": false }], "no-constant-condition": ["error", { "checkLoops": false }],
"import/no-extraneous-dependencies": [ "import/no-extraneous-dependencies": [
"error", "error",
{ "devDependencies": ["**/*.test.ts", "**/*.spec.ts", "**/tests/**"] } {
"devDependencies": [
"**/*.test.ts",
"**/*.spec.ts",
"**/tests/**",
"**/rollup.config.js",
"**/.eslintrc.cjs",
"**/karma.conf.cjs"
]
}
], ],
"sort-imports": [ "sort-imports": [
"error", "error",

2
package-lock.json generated
View File

@ -10,8 +10,8 @@
"packages/proto", "packages/proto",
"packages/interfaces", "packages/interfaces",
"packages/enr", "packages/enr",
"packages/peer-exchange",
"packages/core", "packages/core",
"packages/peer-exchange",
"packages/dns-discovery", "packages/dns-discovery",
"packages/message-encryption", "packages/message-encryption",
"packages/create", "packages/create",

View File

@ -7,8 +7,8 @@
"packages/proto", "packages/proto",
"packages/interfaces", "packages/interfaces",
"packages/enr", "packages/enr",
"packages/peer-exchange",
"packages/core", "packages/core",
"packages/peer-exchange",
"packages/dns-discovery", "packages/dns-discovery",
"packages/message-encryption", "packages/message-encryption",
"packages/create", "packages/create",

View File

@ -0,0 +1,48 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer, PeerStore } from "@libp2p/interface-peer-store";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
} from "@waku/utils";
/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol {
constructor(public multicodec: string, public libp2p: Libp2p) {}
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}
protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
}
}

View File

@ -1,7 +1,4 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p"; import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Peer } from "@libp2p/interface-peer-store"; import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar"; import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type { import type {
@ -14,17 +11,12 @@ import type {
ProtocolOptions, ProtocolOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { WakuMessage as WakuMessageProto } from "@waku/proto"; import { WakuMessage as WakuMessageProto } from "@waku/proto";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
selectRandomPeer,
} from "@waku/utils";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js"; import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js"; import { groupByContentTopic } from "../group_by.js";
import { toProtoMessage } from "../to_proto_message.js"; import { toProtoMessage } from "../to_proto_message.js";
@ -46,8 +38,7 @@ export type UnsubscribeFunction = () => Promise<void>;
* - https://github.com/status-im/go-waku/issues/245 * - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948 * - https://github.com/status-im/nwaku/issues/948
*/ */
class Filter implements IFilter { class Filter extends BaseProtocol implements IFilter {
multicodec: string;
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
private subscriptions: Map<string, Callback<any>>; private subscriptions: Map<string, Callback<any>>;
private decoders: Map< private decoders: Map<
@ -56,12 +47,12 @@ class Filter implements IFilter {
>; >;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p);
this.options = options ?? {}; this.options = options ?? {};
this.multicodec = FilterCodec;
this.subscriptions = new Map(); this.subscriptions = new Map();
this.decoders = new Map(); this.decoders = new Map();
this.libp2p this.libp2p
.handle(FilterCodec, this.onRequest.bind(this)) .handle(this.multicodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e)); .catch((e) => log("Failed to register filter protocol", e));
} }
@ -132,10 +123,6 @@ class Filter implements IFilter {
}; };
} }
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
private onRequest(streamData: IncomingStreamData): void { private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push"); log("Receiving message push");
try { try {
@ -256,36 +243,6 @@ class Filter implements IFilter {
throw e; throw e;
} }
} }
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(FilterCodec);
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
const res = await selectPeerForProtocol(
this.peerStore,
[FilterCodec],
peerId
);
if (!res) {
throw new Error(`Failed to select peer for ${FilterCodec}`);
}
return res.peer;
}
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [FilterCodec]);
}
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
} }
export function wakuFilter( export function wakuFilter(

View File

@ -1,7 +1,5 @@
import type { Libp2p } from "@libp2p/interface-libp2p"; import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { import type {
IEncoder, IEncoder,
ILightPush, ILightPush,
@ -11,18 +9,13 @@ import type {
SendResult, SendResult,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { PushResponse } from "@waku/proto"; import { PushResponse } from "@waku/proto";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
selectRandomPeer,
} from "@waku/utils";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist"; import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js"; import { DefaultPubSubTopic } from "../constants.js";
import { PushRPC } from "./push_rpc.js"; import { PushRPC } from "./push_rpc.js";
@ -35,12 +28,11 @@ export { PushResponse };
/** /**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/ */
class LightPush implements ILightPush { class LightPush extends BaseProtocol implements ILightPush {
multicodec: string;
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
this.multicodec = LightPushCodec; super(LightPushCodec, libp2p);
this.options = options || {}; this.options = options || {};
} }
@ -51,23 +43,8 @@ class LightPush implements ILightPush {
): Promise<SendResult> { ): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options; const { pubSubTopic = DefaultPubSubTopic } = this.options;
const res = await selectPeerForProtocol( const peer = await this.getPeer(opts?.peerId);
this.peerStore, const stream = await this.newStream(peer);
[this.multicodec],
opts?.peerId
);
if (!res) {
throw new Error("Failed to get a peer");
}
const { peer } = res;
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
const stream = await connection.newStream(LightPushCodec);
const recipients: PeerId[] = []; const recipients: PeerId[] = [];
@ -109,28 +86,6 @@ class LightPush implements ILightPush {
} }
return { recipients }; return { recipients };
} }
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* light push protocol. Waku may or may not be currently connected to these
* peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [LightPushCodec]);
}
/**
* Returns a random peer that supports light push protocol from the address
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer.
*/
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
} }
export function wakuLightPush( export function wakuLightPush(

View File

@ -1,7 +1,6 @@
import type { Connection } from "@libp2p/interface-connection"; import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p"; import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id"; import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { sha256 } from "@noble/hashes/sha256"; import { sha256 } from "@noble/hashes/sha256";
import { import {
Cursor, Cursor,
@ -12,19 +11,14 @@ import {
ProtocolCreateOptions, ProtocolCreateOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto"; import { proto_store as proto } from "@waku/proto";
import { import { concat, utf8ToBytes } from "@waku/utils";
concat,
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
utf8ToBytes,
} from "@waku/utils";
import debug from "debug"; import debug from "debug";
import all from "it-all"; import all from "it-all";
import * as lp from "it-length-prefixed"; import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe"; import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist"; import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js"; import { DefaultPubSubTopic } from "../constants.js";
import { toProtoMessage } from "../to_proto_message.js"; import { toProtoMessage } from "../to_proto_message.js";
@ -84,12 +78,11 @@ export interface QueryOptions {
* *
* The Waku Store protocol can be used to retrieved historical messages. * The Waku Store protocol can be used to retrieved historical messages.
*/ */
class Store implements IStore { class Store extends BaseProtocol implements IStore {
multicodec: string;
options: ProtocolCreateOptions; options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) { constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
this.multicodec = StoreCodec; super(StoreCodec, libp2p);
this.options = options ?? {}; this.options = options ?? {};
} }
@ -237,25 +230,10 @@ class Store implements IStore {
peerId: options?.peerId?.toString(), peerId: options?.peerId?.toString(),
}); });
const res = await selectPeerForProtocol( const peer = await this.getPeer(options?.peerId);
this.peerStore,
[StoreCodec],
options?.peerId
);
if (!res) {
throw new Error("Failed to get a peer");
}
const { peer, protocol } = res;
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
for await (const messages of paginate<T>( for await (const messages of paginate<T>(
connection, this.newStream.bind(this, peer),
protocol,
queryOpts, queryOpts,
decodersAsMap, decodersAsMap,
options?.cursor options?.cursor
@ -263,23 +241,10 @@ class Store implements IStore {
yield messages; yield messages;
} }
} }
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* store protocol. Waku may or may not be currently connected to these peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [StoreCodec]);
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
} }
async function* paginate<T extends IDecodedMessage>( async function* paginate<T extends IDecodedMessage>(
connection: Connection, streamFactory: () => Promise<Stream>,
protocol: string,
queryOpts: Params, queryOpts: Params,
decoders: Map<string, IDecoder<T>>, decoders: Map<string, IDecoder<T>>,
cursor?: Cursor cursor?: Cursor
@ -297,16 +262,16 @@ async function* paginate<T extends IDecodedMessage>(
while (true) { while (true) {
queryOpts.cursor = currentCursor; queryOpts.cursor = currentCursor;
const stream = await connection.newStream(protocol);
const historyRpcQuery = HistoryRPC.createQuery(queryOpts); const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
log( log(
"Querying store peer", "Querying store peer",
connection.remoteAddr.toString(),
`for (${queryOpts.pubSubTopic})`, `for (${queryOpts.pubSubTopic})`,
queryOpts.contentTopics queryOpts.contentTopics
); );
const stream = await streamFactory();
const res = await pipe( const res = await pipe(
[historyRpcQuery.encode()], [historyRpcQuery.encode()],
lp.encode(), lp.encode(),

View File

@ -119,15 +119,12 @@ export class WakuPeerExchange implements IPeerExchange {
* @returns A peer to query * @returns A peer to query
*/ */
private async getPeer(peerId?: PeerId): Promise<Peer> { private async getPeer(peerId?: PeerId): Promise<Peer> {
const res = await selectPeerForProtocol( const { peer } = await selectPeerForProtocol(
this.peerStore, this.peerStore,
[PeerExchangeCodec], [PeerExchangeCodec],
peerId peerId
); );
if (!res) { return peer;
throw new Error(`Failed to select peer for ${PeerExchangeCodec}`);
}
return res.peer;
} }
/** /**

View File

@ -42,22 +42,22 @@ export async function selectPeerForProtocol(
peerStore: PeerStore, peerStore: PeerStore,
protocols: string[], protocols: string[],
peerId?: PeerId peerId?: PeerId
): Promise<{ peer: Peer; protocol: string } | undefined> { ): Promise<{ peer: Peer; protocol: string }> {
let peer; let peer;
if (peerId) { if (peerId) {
peer = await peerStore.get(peerId); peer = await peerStore.get(peerId);
if (!peer) { if (!peer) {
log( throw new Error(
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}`
); );
return;
} }
} else { } else {
const peers = await getPeersForProtocol(peerStore, protocols); const peers = await getPeersForProtocol(peerStore, protocols);
peer = selectRandomPeer(peers); peer = selectRandomPeer(peers);
if (!peer) { if (!peer) {
log("Failed to find known peer that registers protocols", protocols); throw new Error(
return; `Failed to find known peer that registers protocols: ${protocols}`
);
} }
} }
@ -70,11 +70,9 @@ export async function selectPeerForProtocol(
} }
log(`Using codec ${protocol}`); log(`Using codec ${protocol}`);
if (!protocol) { if (!protocol) {
log( throw new Error(
`Peer does not register required protocols: ${peer.id.toString()}`, `Peer does not register required protocols (${peer.id.toString()}): ${protocols}`
protocols
); );
return;
} }
return { peer, protocol }; return { peer, protocol };