diff --git a/CHANGELOG.md b/CHANGELOG.md index c18548745a..7a25e95883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README. - Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README. - Examples (web chat): Persist nick. +- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`; + Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`; + JS-Waku currently supports one, and only, PubSub topic per instance. ## [0.5.0] - 2021-05-21 diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 5cdf4e855c..f46330dfde 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -14,6 +14,20 @@ import { StoreCodec, WakuStore } from './waku_store'; const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * One and only one pubsub topic is used by Waku. This is used by: + * - WakuRelay to receive, route and send messages, + * - WakuLightPush to send messages, + * - WakuStore to retrieve messages. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; /** * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) @@ -71,6 +85,14 @@ export class Waku { options?.libp2p?.config ); + // Pass pubsub topic to relay + if (options?.pubsubTopic) { + libp2pOpts.config.pubsub = Object.assign( + { pubsubTopic: options.pubsubTopic }, + libp2pOpts.config.pubsub + ); + } + libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); // Default transport for libp2p is Websockets @@ -93,7 +115,9 @@ export class Waku { // @ts-ignore: modules property is correctly set thanks to voodoo const libp2p = await Libp2p.create(libp2pOpts); - const wakuStore = new WakuStore(libp2p); + const wakuStore = new WakuStore(libp2p, { + pubsubTopic: options?.pubsubTopic, + }); const wakuLightPush = new WakuLightPush(libp2p); await libp2p.start(); diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index 620dc815b8..ad9e1e062d 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -51,4 +51,44 @@ describe('Waku Light Push', () => { expect(msgs[0].version).to.equal(message.version); expect(msgs[0].payloadAsUtf8).to.equal(messageText); }); + + it('Push on custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true, topics: customPubSubTopic }); + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messageText = 'Light Push works!'; + const message = WakuMessage.fromUtf8String(messageText); + + const pushResponse = await waku.lightPush.push(nimPeerId, message); + expect(pushResponse?.isSuccess).to.be.true; + + let msgs: WakuMessage[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nimWaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(message.contentTopic); + expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); + }); }); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index f2189ec17e..c5a6c58046 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc'; export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; export { PushResponse }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class WakuLightPush { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } async push( peerId: PeerId, message: WakuMessage, - pubsubTopic: string = DefaultPubsubTopic + pubsubTopic: string = this.pubsubTopic ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index bc826744b6..6c1c26e88c 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -142,6 +142,72 @@ describe('Waku Relay', () => { }); }); + describe('Custom pubsub topic', () => { + it('Publish', async function () { + this.timeout(10000); + + const pubsubTopic = '/some/pubsub/topic'; + + // 1 and 2 uses a custom pubsub + const [waku1, waku2, waku3] = await Promise.all([ + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_1, + }), + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, + }), + Waku.create({ + staticNoiseKey: NOISE_KEY_2, + }), + ]); + + waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + + await Promise.all([ + new Promise((resolve) => + waku1.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + new Promise((resolve) => + waku2.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + // No subscription change expected for Waku 3 + ]); + + const messageText = 'Communicating using a custom pubsub topic'; + const message = WakuMessage.fromUtf8String(messageText); + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + waku3.relay.addObserver(reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(message); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText); + }); + }); + describe('Interop: Nim', function () { describe('Nim connects to js', function () { let waku: Waku; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c1dabf8d71..c846b7f85e 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; +import { CreateOptions } from '../waku'; import { WakuMessage } from '../waku_message'; import * as constants from './constants'; @@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat'; export { RelayCodec, DefaultPubsubTopic }; /** - * See {GossipOptions} from libp2p-gossipsub + * See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api). */ -interface GossipOptions { +export interface GossipOptions { emitSelf: boolean; gossipIncoming: boolean; fallbackToFloodsub: boolean; @@ -49,8 +50,6 @@ interface GossipOptions { Dlazy: number; } -export type WakuRelayOptions = GossipOptions; - /** * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. * Must be passed as a `pubsub` module to a {Libp2p} instance. @@ -60,6 +59,7 @@ export type WakuRelayOptions = GossipOptions; */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + pubsubTopic: string; /** * observers called when receiving new message. * Observers under key "" are always called. @@ -68,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub { [contentTopic: string]: Array<(message: WakuMessage) => void>; }; - /** - * - * @param {Libp2p} libp2p - * @param {Partial} [options] - */ - constructor(libp2p: Libp2p, options?: Partial) { + constructor( + libp2p: Libp2p, + options?: Partial + ) { super( libp2p, Object.assign(options, { @@ -88,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub { const multicodecs = [constants.RelayCodec]; Object.assign(this, { multicodecs }); + + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = constants.DefaultPubsubTopic; + } } /** @@ -99,7 +103,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public start(): void { super.start(); - this.subscribe(constants.DefaultPubsubTopic); + this.subscribe(this.pubsubTopic); } /** @@ -110,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public async send(message: WakuMessage): Promise { const msg = message.encode(); - await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg)); + await super.publish(this.pubsubTopic, Buffer.from(msg)); } /** @@ -144,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Return the relay peers we are connected to and we would publish a message to */ getPeers(): Set { - return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => { + return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => { // Filter peers we would not publish to return ( this.score.score(id) >= this._options.scoreThresholds.publishThreshold diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index f27a51d9ae..3caf210352 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -93,4 +93,46 @@ describe('Waku Store', () => { ).to.eq(index); } }); + + it('Retrieves history using custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true, topics: customPubSubTopic }); + + for (let i = 0; i < 2; i++) { + expect( + await nimWaku.sendMessage( + WakuMessage.fromUtf8String(`Message ${i}`), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); + + expect(messages?.length).eq(2); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === 'Message 0'; + }); + expect(result).to.not.eq(-1); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 3a360c4357..bb14790f2d 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,6 +13,18 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + export interface QueryOptions { peerId: PeerId; contentTopics: string[]; @@ -26,24 +38,32 @@ export interface QueryOptions { * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ export class WakuStore { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } /** * Query given peer using Waku Store. * * @param options - * @param options.peerId The peer to query. - * @param options.contentTopics The content topics to retrieve, leave empty to + * @param options.peerId The peer to query.Options + * @param options.contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. - * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes - * use the same pubsub topic. This is reserved for future applications. + * @param options.pubsubTopic The pubsub topic to pass to the query. Defaults + * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ async queryHistory(options: QueryOptions): Promise { const opts = Object.assign( { - pubsubTopic: DefaultPubsubTopic, + pubsubTopic: this.pubsubTopic, direction: Direction.BACKWARD, pageSize: 10, }, diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 0f43c5066a..8b5ac45179 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -40,6 +40,7 @@ export interface Args { logLevel?: LogLevel; persistMessages?: boolean; lightpush?: boolean; + topics?: string; } export enum LogLevel { @@ -155,7 +156,10 @@ export class NimWaku { return this.rpcCall('get_waku_v2_debug_v1_info', []); } - async sendMessage(message: WakuMessage): Promise { + async sendMessage( + message: WakuMessage, + pubsubTopic?: string + ): Promise { this.checkProcess(); if (!message.payload) { @@ -168,7 +172,7 @@ export class NimWaku { }; return this.rpcCall('post_waku_v2_relay_v1_message', [ - DefaultPubsubTopic, + pubsubTopic ? pubsubTopic : DefaultPubsubTopic, rpcMessage, ]); }