chore: move protocols CreateOptions into interfaces (#1145)

* move protocols `CreateOptions` into interfaces and
add possible TODO

* remove: createOptions for PeerExchange

* update test with new API
This commit is contained in:
Danish Arora 2023-02-02 08:02:06 +05:30 committed by GitHub
parent 11819fc7b1
commit 10b3898762
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 73 additions and 95 deletions

View File

@ -11,6 +11,7 @@ import type {
IDecoder, IDecoder,
IFilter, IFilter,
IMessage, IMessage,
ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import {
@ -43,18 +44,6 @@ export interface FilterComponents {
connectionManager: ConnectionManager; connectionManager: ConnectionManager;
} }
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
}
export type UnsubscribeFunction = () => Promise<void>; export type UnsubscribeFunction = () => Promise<void>;
/** /**
@ -66,18 +55,21 @@ export type UnsubscribeFunction = () => Promise<void>;
*/ */
class Filter implements IFilter { class Filter implements IFilter {
multicodec: string; multicodec: string;
pubSubTopic: string; options: ProtocolCreateOptions;
private subscriptions: Map<string, Callback<any>>; private subscriptions: Map<string, Callback<any>>;
private decoders: Map< private decoders: Map<
string, // content topic string, // content topic
Set<IDecoder<any>> Set<IDecoder<any>>
>; >;
constructor(public components: FilterComponents, options?: CreateOptions) { constructor(
public components: FilterComponents,
options?: ProtocolCreateOptions
) {
this.options = options ?? {};
this.multicodec = FilterCodec; this.multicodec = FilterCodec;
this.subscriptions = new Map(); this.subscriptions = new Map();
this.decoders = new Map(); this.decoders = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.components.registrar this.components.registrar
.handle(FilterCodec, this.onRequest.bind(this)) .handle(FilterCodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e)); .catch((e) => log("Failed to register filter protocol", e));
@ -94,7 +86,7 @@ class Filter implements IFilter {
callback: Callback<T>, callback: Callback<T>,
opts?: ProtocolOptions opts?: ProtocolOptions
): Promise<UnsubscribeFunction> { ): Promise<UnsubscribeFunction> {
const topic = opts?.pubSubTopic ?? this.pubSubTopic; const { pubSubTopic = DefaultPubSubTopic } = this.options;
const groupedDecoders = groupByContentTopic(decoders); const groupedDecoders = groupByContentTopic(decoders);
const contentTopics = Array.from(groupedDecoders.keys()); const contentTopics = Array.from(groupedDecoders.keys());
@ -103,7 +95,7 @@ class Filter implements IFilter {
contentTopic, contentTopic,
})); }));
const request = FilterRPC.createRequest( const request = FilterRPC.createRequest(
topic, pubSubTopic,
contentFilters, contentFilters,
undefined, undefined,
true true
@ -144,7 +136,7 @@ class Filter implements IFilter {
this.addCallback(requestId, callback); this.addCallback(requestId, callback);
return async () => { return async () => {
await this.unsubscribe(topic, contentFilters, requestId, peer); await this.unsubscribe(pubSubTopic, contentFilters, requestId, peer);
this.deleteDecoders(groupedDecoders); this.deleteDecoders(groupedDecoders);
this.deleteCallback(requestId); this.deleteCallback(requestId);
}; };
@ -309,7 +301,7 @@ class Filter implements IFilter {
} }
export function wakuFilter( export function wakuFilter(
init: Partial<CreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (components: FilterComponents) => IFilter { ): (components: FilterComponents) => IFilter {
return (components: FilterComponents) => new Filter(components, init); return (components: FilterComponents) => new Filter(components, init);
} }

View File

@ -6,6 +6,7 @@ import type {
IEncoder, IEncoder,
ILightPush, ILightPush,
IMessage, IMessage,
ProtocolCreateOptions,
ProtocolOptions, ProtocolOptions,
SendResult, SendResult,
} from "@waku/interfaces"; } from "@waku/interfaces";
@ -36,28 +37,19 @@ export interface LightPushComponents {
connectionManager: ConnectionManager; connectionManager: ConnectionManager;
} }
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
}
/** /**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/ */
class LightPush implements ILightPush { class LightPush implements ILightPush {
multicodec: string; multicodec: string;
pubSubTopic: string; options: ProtocolCreateOptions;
constructor(public components: LightPushComponents, options?: CreateOptions) { constructor(
public components: LightPushComponents,
options?: ProtocolCreateOptions
) {
this.multicodec = LightPushCodec; this.multicodec = LightPushCodec;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; this.options = options || {};
} }
async push( async push(
@ -65,11 +57,11 @@ class LightPush implements ILightPush {
message: IMessage, message: IMessage,
opts?: ProtocolOptions opts?: ProtocolOptions
): Promise<SendResult> { ): Promise<SendResult> {
const pubSubTopic = opts?.pubSubTopic ? opts.pubSubTopic : this.pubSubTopic; const { pubSubTopic = DefaultPubSubTopic } = this.options;
const res = await selectPeerForProtocol( const res = await selectPeerForProtocol(
this.components.peerStore, this.components.peerStore,
[LightPushCodec], [this.multicodec],
opts?.peerId opts?.peerId
); );
@ -152,7 +144,7 @@ class LightPush implements ILightPush {
} }
export function wakuLightPush( export function wakuLightPush(
init: Partial<CreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (components: LightPushComponents) => ILightPush { ): (components: LightPushComponents) => ILightPush {
return (components: LightPushComponents) => new LightPush(components, init); return (components: LightPushComponents) => new LightPush(components, init);
} }

View File

@ -12,6 +12,7 @@ import type {
IEncoder, IEncoder,
IMessage, IMessage,
IRelay, IRelay,
ProtocolCreateOptions,
SendResult, SendResult,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { IDecodedMessage } from "@waku/interfaces"; import { IDecodedMessage } from "@waku/interfaces";
@ -30,22 +31,7 @@ export type Observer<T extends IDecodedMessage> = {
callback: Callback<T>; callback: Callback<T>;
}; };
export interface RelayCreateOptions extends GossipsubOpts { export type RelayCreateOptions = ProtocolCreateOptions & GossipsubOpts;
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
*
* One and only one pubsub topic is used by Waku. This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
}
/** /**
* Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/). * Implements the [Waku v2 Relay protocol](https://rfc.vac.dev/spec/11/).
@ -54,7 +40,7 @@ export interface RelayCreateOptions extends GossipsubOpts {
* @implements {require('libp2p-interfaces/src/pubsub')} * @implements {require('libp2p-interfaces/src/pubsub')}
*/ */
class Relay extends GossipSub implements IRelay { class Relay extends GossipSub implements IRelay {
pubSubTopic: string; options: Partial<RelayCreateOptions>;
defaultDecoder: IDecoder<IDecodedMessage>; defaultDecoder: IDecoder<IDecodedMessage>;
public static multicodec: string = constants.RelayCodecs[0]; public static multicodec: string = constants.RelayCodecs[0];
@ -73,12 +59,13 @@ class Relay extends GossipSub implements IRelay {
globalSignaturePolicy: SignaturePolicy.StrictNoSign, globalSignaturePolicy: SignaturePolicy.StrictNoSign,
fallbackToFloodsub: false, fallbackToFloodsub: false,
}); });
super(components, options); super(components, options);
this.multicodecs = constants.RelayCodecs; this.multicodecs = constants.RelayCodecs;
this.observers = new Map(); this.observers = new Map();
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; this.options = options ?? {};
// TODO: User might want to decide what decoder should be used (e.g. for RLN) // TODO: User might want to decide what decoder should be used (e.g. for RLN)
this.defaultDecoder = new TopicOnlyDecoder(); this.defaultDecoder = new TopicOnlyDecoder();
@ -92,20 +79,24 @@ class Relay extends GossipSub implements IRelay {
* @returns {void} * @returns {void}
*/ */
public async start(): Promise<void> { public async start(): Promise<void> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
await super.start(); await super.start();
this.subscribe(this.pubSubTopic); this.subscribe(pubSubTopic);
} }
/** /**
* Send Waku message. * Send Waku message.
*/ */
public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> { public async send(encoder: IEncoder, message: IMessage): Promise<SendResult> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
const msg = await encoder.toWire(message); const msg = await encoder.toWire(message);
if (!msg) { if (!msg) {
log("Failed to encode message, aborting publish"); log("Failed to encode message, aborting publish");
return { recipients: [] }; return { recipients: [] };
} }
return this.publish(this.pubSubTopic, msg);
return this.publish(pubSubTopic, msg);
} }
/** /**
@ -181,7 +172,8 @@ class Relay extends GossipSub implements IRelay {
} }
getMeshPeers(topic?: TopicStr): PeerIdStr[] { getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic); const { pubSubTopic = DefaultPubSubTopic } = this.options;
return super.getMeshPeers(topic ?? pubSubTopic);
} }
} }

View File

@ -10,6 +10,7 @@ import {
IDecoder, IDecoder,
Index, Index,
IStore, IStore,
ProtocolCreateOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import {
getPeersForProtocol, getPeersForProtocol,
@ -43,18 +44,6 @@ export interface StoreComponents {
connectionManager: ConnectionManager; connectionManager: ConnectionManager;
} }
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
}
export interface TimeFilter { export interface TimeFilter {
startTime: Date; startTime: Date;
endTime: Date; endTime: Date;
@ -65,11 +54,6 @@ export interface QueryOptions {
* The peer to query. If undefined, a pseudo-random peer is selected from the connected Waku Store peers. * The peer to query. If undefined, a pseudo-random peer is selected from the connected Waku Store peers.
*/ */
peerId?: PeerId; peerId?: PeerId;
/**
* The pubsub topic to pass to the query.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/).
*/
pubSubTopic?: string;
/** /**
* The direction in which pages are retrieved: * The direction in which pages are retrieved:
* - { @link PageDirection.BACKWARD }: Most recent page first. * - { @link PageDirection.BACKWARD }: Most recent page first.
@ -106,11 +90,14 @@ export interface QueryOptions {
*/ */
class Store implements IStore { class Store implements IStore {
multicodec: string; multicodec: string;
pubSubTopic: string; options: ProtocolCreateOptions;
constructor(public components: StoreComponents, options?: CreateOptions) { constructor(
public components: StoreComponents,
options?: ProtocolCreateOptions
) {
this.multicodec = StoreCodec; this.multicodec = StoreCodec;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic; this.options = options ?? {};
} }
/** /**
@ -221,6 +208,8 @@ class Store implements IStore {
decoders: IDecoder<T>[], decoders: IDecoder<T>[],
options?: QueryOptions options?: QueryOptions
): AsyncGenerator<Promise<T | undefined>[]> { ): AsyncGenerator<Promise<T | undefined>[]> {
const { pubSubTopic = DefaultPubSubTopic } = this.options;
let startTime, endTime; let startTime, endTime;
if (options?.timeFilter) { if (options?.timeFilter) {
@ -242,7 +231,7 @@ class Store implements IStore {
const queryOpts = Object.assign( const queryOpts = Object.assign(
{ {
pubSubTopic: this.pubSubTopic, pubSubTopic: pubSubTopic,
pageDirection: PageDirection.BACKWARD, pageDirection: PageDirection.BACKWARD,
pageSize: DefaultPageSize, pageSize: DefaultPageSize,
}, },
@ -433,7 +422,7 @@ export async function createCursor(
} }
export function wakuStore( export function wakuStore(
init: Partial<CreateOptions> = {} init: Partial<ProtocolCreateOptions> = {}
): (components: StoreComponents) => IStore { ): (components: StoreComponents) => IStore {
return (components: StoreComponents) => new Store(components, init); return (components: StoreComponents) => new Store(components, init);
} }

View File

@ -88,7 +88,8 @@ export async function createLightNode(
const store = wakuStore(options); const store = wakuStore(options);
const lightPush = wakuLightPush(options); const lightPush = wakuLightPush(options);
const filter = wakuFilter(options); const filter = wakuFilter(options);
const peerExchange = wakuPeerExchange(options);
const peerExchange = wakuPeerExchange();
return new WakuNode( return new WakuNode(
options ?? {}, options ?? {},
@ -155,7 +156,8 @@ export async function createFullNode(
const store = wakuStore(options); const store = wakuStore(options);
const lightPush = wakuLightPush(options); const lightPush = wakuLightPush(options);
const filter = wakuFilter(options); const filter = wakuFilter(options);
const peerExchange = wakuPeerExchange(options);
const peerExchange = wakuPeerExchange();
return new WakuNode( return new WakuNode(
options ?? {}, options ?? {},

View File

@ -17,8 +17,26 @@ export interface PointToPointProtocol {
peers: () => Promise<Peer[]>; peers: () => Promise<Peer[]>;
} }
export type ProtocolOptions = { export type ProtocolCreateOptions = {
/**
* The PubSub Topic to use. Defaults to {@link @waku/core/DefaultPubSubTopic }.
*
* One and only one pubsub topic is used by Waku. This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
*/
pubSubTopic?: string; pubSubTopic?: string;
};
//TODO
// we can probably move `peerId` into `ProtocolCreateOptions` and remove `ProtocolOptions` and pass it in the constructor
// however, filter protocol can use multiple peers, so we need to think about this
export type ProtocolOptions = {
/** /**
* Optionally specify an PeerId for the protocol request. If not included, will use a random peer. * Optionally specify an PeerId for the protocol request. If not included, will use a random peer.
*/ */

View File

@ -8,7 +8,6 @@ import type {
PeerExchangeComponents, PeerExchangeComponents,
PeerExchangeQueryParams, PeerExchangeQueryParams,
PeerExchangeResponse, PeerExchangeResponse,
ProtocolOptions,
} from "@waku/interfaces"; } from "@waku/interfaces";
import { import {
getPeersForProtocol, getPeersForProtocol,
@ -37,12 +36,8 @@ export class WakuPeerExchange implements IPeerExchange {
/** /**
* @param components - libp2p components * @param components - libp2p components
* @param createOptions - Options for the protocol
*/ */
constructor( constructor(public components: PeerExchangeComponents) {
public components: PeerExchangeComponents,
public createOptions?: ProtocolOptions
) {
this.multicodec = PeerExchangeCodec; this.multicodec = PeerExchangeCodec;
this.components.registrar this.components.registrar
.handle(PeerExchangeCodec, this.handler.bind(this)) .handle(PeerExchangeCodec, this.handler.bind(this))
@ -159,12 +154,11 @@ export class WakuPeerExchange implements IPeerExchange {
/** /**
* *
* @param init - Options for the protocol
* @returns A function that creates a new peer exchange protocol * @returns A function that creates a new peer exchange protocol
*/ */
export function wakuPeerExchange( export function wakuPeerExchange(): (
init: Partial<ProtocolOptions> = {} components: PeerExchangeComponents
): (components: PeerExchangeComponents) => WakuPeerExchange { ) => WakuPeerExchange {
return (components: PeerExchangeComponents) => return (components: PeerExchangeComponents) =>
new WakuPeerExchange(components, init); new WakuPeerExchange(components);
} }

View File

@ -89,7 +89,6 @@ describe("Waku Light Push [node only]", () => {
{ payload: utf8ToBytes(messageText) }, { payload: utf8ToBytes(messageText) },
{ {
peerId: nimPeerId, peerId: nimPeerId,
pubSubTopic: customPubSubTopic,
} }
); );
log("Ack received", pushResponse); log("Ack received", pushResponse);