chore: add `BaseProtocol` class to req-resp protocols

Each protocol implementation have some common functionality.
Implements those in `BaseProtocol`, to be extended by each protocol
implementation.
This commit is contained in:
fryorcraken.eth 2023-02-23 14:04:27 +11:00
parent 7d29ed1d99
commit 789bebed00
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
4 changed files with 66 additions and 138 deletions

View File

@ -0,0 +1,48 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer, PeerStore } from "@libp2p/interface-peer-store";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
} from "@waku/utils";
/**
* A class with predefined helpers, to be used as a base to implement Waku
* Protocols.
*/
export class BaseProtocol {
constructor(public multicodec: string, public libp2p: Libp2p) {}
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* the class protocol. Waku may or may not be currently connected to these
* peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}
protected async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
}
}

View File

@ -1,7 +1,4 @@
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import type {
@ -14,17 +11,12 @@ import type {
ProtocolOptions,
} from "@waku/interfaces";
import { WakuMessage as WakuMessageProto } from "@waku/proto";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
selectRandomPeer,
} from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import { groupByContentTopic } from "../group_by.js";
import { toProtoMessage } from "../to_proto_message.js";
@ -46,8 +38,7 @@ export type UnsubscribeFunction = () => Promise<void>;
* - https://github.com/status-im/go-waku/issues/245
* - https://github.com/status-im/nwaku/issues/948
*/
class Filter implements IFilter {
multicodec: string;
class Filter extends BaseProtocol implements IFilter {
options: ProtocolCreateOptions;
private subscriptions: Map<string, Callback<any>>;
private decoders: Map<
@ -56,8 +47,8 @@ class Filter implements IFilter {
>;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
super(FilterCodec, libp2p);
this.options = options ?? {};
this.multicodec = FilterCodec;
this.subscriptions = new Map();
this.decoders = new Map();
this.libp2p
@ -132,10 +123,6 @@ class Filter implements IFilter {
};
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
@ -256,33 +243,6 @@ class Filter implements IFilter {
throw e;
}
}
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(this.multicodec);
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
}
export function wakuFilter(

View File

@ -1,7 +1,5 @@
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type {
IEncoder,
ILightPush,
@ -11,18 +9,13 @@ import type {
SendResult,
} from "@waku/interfaces";
import { PushResponse } from "@waku/proto";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
selectRandomPeer,
} from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import { PushRPC } from "./push_rpc.js";
@ -35,12 +28,11 @@ export { PushResponse };
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
class LightPush implements ILightPush {
multicodec: string;
class LightPush extends BaseProtocol implements ILightPush {
options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
this.multicodec = LightPushCodec;
super(LightPushCodec, libp2p);
this.options = options || {};
}
@ -51,23 +43,8 @@ class LightPush implements ILightPush {
): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const res = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
opts?.peerId
);
if (!res) {
throw new Error("Failed to get a peer");
}
const { peer } = res;
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
const stream = await connection.newStream(this.multicodec);
const peer = await this.getPeer(opts?.peerId);
const stream = await this.newStream(peer);
const recipients: PeerId[] = [];
@ -109,28 +86,6 @@ class LightPush implements ILightPush {
}
return { recipients };
}
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* light push protocol. Waku may or may not be currently connected to these
* peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
/**
* Returns a random peer that supports light push protocol from the address
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer.
*/
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
}
export function wakuLightPush(

View File

@ -1,7 +1,6 @@
import type { Connection } from "@libp2p/interface-connection";
import type { Stream } from "@libp2p/interface-connection";
import type { Libp2p } from "@libp2p/interface-libp2p";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import { sha256 } from "@noble/hashes/sha256";
import {
Cursor,
@ -12,19 +11,14 @@ import {
ProtocolCreateOptions,
} from "@waku/interfaces";
import { proto_store as proto } from "@waku/proto";
import {
concat,
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
utf8ToBytes,
} from "@waku/utils";
import { concat, utf8ToBytes } from "@waku/utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { Uint8ArrayList } from "uint8arraylist";
import { BaseProtocol } from "../base_protocol.js";
import { DefaultPubSubTopic } from "../constants.js";
import { toProtoMessage } from "../to_proto_message.js";
@ -84,12 +78,11 @@ export interface QueryOptions {
*
* The Waku Store protocol can be used to retrieved historical messages.
*/
class Store implements IStore {
multicodec: string;
class Store extends BaseProtocol implements IStore {
options: ProtocolCreateOptions;
constructor(public libp2p: Libp2p, options?: ProtocolCreateOptions) {
this.multicodec = StoreCodec;
super(StoreCodec, libp2p);
this.options = options ?? {};
}
@ -237,25 +230,10 @@ class Store implements IStore {
peerId: options?.peerId?.toString(),
});
const res = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
options?.peerId
);
if (!res) {
throw new Error("Failed to get a peer");
}
const { peer, protocol } = res;
const connections = this.libp2p.getConnections(peer.id);
const connection = selectConnection(connections);
if (!connection) throw "Failed to get a connection to the peer";
const peer = await this.getPeer(options?.peerId);
for await (const messages of paginate<T>(
connection,
protocol,
this.newStream.bind(this, peer),
queryOpts,
decodersAsMap,
options?.cursor
@ -263,23 +241,10 @@ class Store implements IStore {
yield messages;
}
}
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* store protocol. Waku may or may not be currently connected to these peers.
*/
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}
get peerStore(): PeerStore {
return this.libp2p.peerStore;
}
}
async function* paginate<T extends IDecodedMessage>(
connection: Connection,
protocol: string,
streamFactory: () => Promise<Stream>,
queryOpts: Params,
decoders: Map<string, IDecoder<T>>,
cursor?: Cursor
@ -297,16 +262,16 @@ async function* paginate<T extends IDecodedMessage>(
while (true) {
queryOpts.cursor = currentCursor;
const stream = await connection.newStream(protocol);
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
log(
"Querying store peer",
connection.remoteAddr.toString(),
`for (${queryOpts.pubSubTopic})`,
queryOpts.contentTopics
);
const stream = await streamFactory();
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),