From 5ce0717f05c71a5e8eaf2e8424c9c82c36fdb317 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 9 Jun 2021 12:25:56 +1000 Subject: [PATCH] Enable passing a custom pubsub topic Note that we currently only support one, and only one, pubsub topic for a given instance across the codebase. The PubSub topic needs to be set when instantiating the Waku* classes. At this stage, we believe that most DApp will use, and only use, the default PubSub topic. Some application want to use an alternative topic but not use the default one so this behaviour should be fine. See #174 for details. --- CHANGELOG.md | 3 ++ src/lib/waku.ts | 26 ++++++++++- src/lib/waku_light_push/index.spec.ts | 40 ++++++++++++++++ src/lib/waku_light_push/index.ts | 24 +++++++++- src/lib/waku_relay/index.spec.ts | 66 +++++++++++++++++++++++++++ src/lib/waku_relay/index.ts | 30 ++++++------ src/lib/waku_store/index.spec.ts | 42 +++++++++++++++++ src/lib/waku_store/index.ts | 32 ++++++++++--- src/test_utils/nim_waku.ts | 8 +++- 9 files changed, 247 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c18548745a..7a25e95883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README. - Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README. - Examples (web chat): Persist nick. +- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`; + Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`; + JS-Waku currently supports one, and only, PubSub topic per instance. ## [0.5.0] - 2021-05-21 diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 5cdf4e855c..f46330dfde 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -14,6 +14,20 @@ import { StoreCodec, WakuStore } from './waku_store'; const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * One and only one pubsub topic is used by Waku. This is used by: + * - WakuRelay to receive, route and send messages, + * - WakuLightPush to send messages, + * - WakuStore to retrieve messages. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; /** * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) @@ -71,6 +85,14 @@ export class Waku { options?.libp2p?.config ); + // Pass pubsub topic to relay + if (options?.pubsubTopic) { + libp2pOpts.config.pubsub = Object.assign( + { pubsubTopic: options.pubsubTopic }, + libp2pOpts.config.pubsub + ); + } + libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); // Default transport for libp2p is Websockets @@ -93,7 +115,9 @@ export class Waku { // @ts-ignore: modules property is correctly set thanks to voodoo const libp2p = await Libp2p.create(libp2pOpts); - const wakuStore = new WakuStore(libp2p); + const wakuStore = new WakuStore(libp2p, { + pubsubTopic: options?.pubsubTopic, + }); const wakuLightPush = new WakuLightPush(libp2p); await libp2p.start(); diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index 620dc815b8..ad9e1e062d 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -51,4 +51,44 @@ describe('Waku Light Push', () => { expect(msgs[0].version).to.equal(message.version); expect(msgs[0].payloadAsUtf8).to.equal(messageText); }); + + it('Push on custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true, topics: customPubSubTopic }); + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messageText = 'Light Push works!'; + const message = WakuMessage.fromUtf8String(messageText); + + const pushResponse = await waku.lightPush.push(nimPeerId, message); + expect(pushResponse?.isSuccess).to.be.true; + + let msgs: WakuMessage[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nimWaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(message.contentTopic); + expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); + }); }); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index f2189ec17e..c5a6c58046 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc'; export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; export { PushResponse }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class WakuLightPush { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } async push( peerId: PeerId, message: WakuMessage, - pubsubTopic: string = DefaultPubsubTopic + pubsubTopic: string = this.pubsubTopic ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index bc826744b6..6c1c26e88c 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -142,6 +142,72 @@ describe('Waku Relay', () => { }); }); + describe('Custom pubsub topic', () => { + it('Publish', async function () { + this.timeout(10000); + + const pubsubTopic = '/some/pubsub/topic'; + + // 1 and 2 uses a custom pubsub + const [waku1, waku2, waku3] = await Promise.all([ + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_1, + }), + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, + }), + Waku.create({ + staticNoiseKey: NOISE_KEY_2, + }), + ]); + + waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + + await Promise.all([ + new Promise((resolve) => + waku1.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + new Promise((resolve) => + waku2.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + // No subscription change expected for Waku 3 + ]); + + const messageText = 'Communicating using a custom pubsub topic'; + const message = WakuMessage.fromUtf8String(messageText); + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + waku3.relay.addObserver(reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(message); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText); + }); + }); + describe('Interop: Nim', function () { describe('Nim connects to js', function () { let waku: Waku; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c1dabf8d71..c846b7f85e 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; +import { CreateOptions } from '../waku'; import { WakuMessage } from '../waku_message'; import * as constants from './constants'; @@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat'; export { RelayCodec, DefaultPubsubTopic }; /** - * See {GossipOptions} from libp2p-gossipsub + * See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api). */ -interface GossipOptions { +export interface GossipOptions { emitSelf: boolean; gossipIncoming: boolean; fallbackToFloodsub: boolean; @@ -49,8 +50,6 @@ interface GossipOptions { Dlazy: number; } -export type WakuRelayOptions = GossipOptions; - /** * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. * Must be passed as a `pubsub` module to a {Libp2p} instance. @@ -60,6 +59,7 @@ export type WakuRelayOptions = GossipOptions; */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + pubsubTopic: string; /** * observers called when receiving new message. * Observers under key "" are always called. @@ -68,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub { [contentTopic: string]: Array<(message: WakuMessage) => void>; }; - /** - * - * @param {Libp2p} libp2p - * @param {Partial} [options] - */ - constructor(libp2p: Libp2p, options?: Partial) { + constructor( + libp2p: Libp2p, + options?: Partial + ) { super( libp2p, Object.assign(options, { @@ -88,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub { const multicodecs = [constants.RelayCodec]; Object.assign(this, { multicodecs }); + + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = constants.DefaultPubsubTopic; + } } /** @@ -99,7 +103,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public start(): void { super.start(); - this.subscribe(constants.DefaultPubsubTopic); + this.subscribe(this.pubsubTopic); } /** @@ -110,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public async send(message: WakuMessage): Promise { const msg = message.encode(); - await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg)); + await super.publish(this.pubsubTopic, Buffer.from(msg)); } /** @@ -144,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Return the relay peers we are connected to and we would publish a message to */ getPeers(): Set { - return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => { + return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => { // Filter peers we would not publish to return ( this.score.score(id) >= this._options.scoreThresholds.publishThreshold diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index f27a51d9ae..3caf210352 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -93,4 +93,46 @@ describe('Waku Store', () => { ).to.eq(index); } }); + + it('Retrieves history using custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true, topics: customPubSubTopic }); + + for (let i = 0; i < 2; i++) { + expect( + await nimWaku.sendMessage( + WakuMessage.fromUtf8String(`Message ${i}`), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); + + expect(messages?.length).eq(2); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === 'Message 0'; + }); + expect(result).to.not.eq(-1); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 3a360c4357..bb14790f2d 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,6 +13,18 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + export interface QueryOptions { peerId: PeerId; contentTopics: string[]; @@ -26,24 +38,32 @@ export interface QueryOptions { * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ export class WakuStore { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } /** * Query given peer using Waku Store. * * @param options - * @param options.peerId The peer to query. - * @param options.contentTopics The content topics to retrieve, leave empty to + * @param options.peerId The peer to query.Options + * @param options.contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. - * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes - * use the same pubsub topic. This is reserved for future applications. + * @param options.pubsubTopic The pubsub topic to pass to the query. Defaults + * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ async queryHistory(options: QueryOptions): Promise { const opts = Object.assign( { - pubsubTopic: DefaultPubsubTopic, + pubsubTopic: this.pubsubTopic, direction: Direction.BACKWARD, pageSize: 10, }, diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 0f43c5066a..8b5ac45179 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -40,6 +40,7 @@ export interface Args { logLevel?: LogLevel; persistMessages?: boolean; lightpush?: boolean; + topics?: string; } export enum LogLevel { @@ -155,7 +156,10 @@ export class NimWaku { return this.rpcCall('get_waku_v2_debug_v1_info', []); } - async sendMessage(message: WakuMessage): Promise { + async sendMessage( + message: WakuMessage, + pubsubTopic?: string + ): Promise { this.checkProcess(); if (!message.payload) { @@ -168,7 +172,7 @@ export class NimWaku { }; return this.rpcCall('post_waku_v2_relay_v1_message', [ - DefaultPubsubTopic, + pubsubTopic ? pubsubTopic : DefaultPubsubTopic, rpcMessage, ]); }