From f17a0082787ea157b2204c576dc8e495828711f8 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 8 Jun 2021 22:01:48 +1000 Subject: [PATCH 1/6] Separate the libp2p create options from Waku's --- CHANGELOG.md | 4 ++ examples/cli-chat/src/chat.ts | 6 +- examples/web-chat/src/App.tsx | 10 +-- src/lib/waku.spec.ts | 8 ++- src/lib/waku.ts | 89 ++++++++++++++------------- src/lib/waku_light_push/index.spec.ts | 2 +- src/lib/waku_relay/index.spec.ts | 14 +++-- src/lib/waku_store/index.spec.ts | 4 +- 8 files changed, 76 insertions(+), 61 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e7dcc8bcc..c18548745a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed +- **Breaking**: Options passed to `Waku.create` used to be passed to `Libp2p.create`; + Now, only the `libp2p` property is passed to `Libp2p.create`, allowing for a cleaner interface. + ### Added - Enable access to `WakuMessage.timestamp`. - Examples (web chat): Use `WakuMessage.timestamp` as unique key for list items. diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 32d286740e..0f29881e73 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -27,8 +27,10 @@ export default async function startChat(): Promise { } const waku = await Waku.create({ - listenAddresses: [opts.listenAddr], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: [opts.listenAddr] }, + modules: { transport: [TCP] }, + }, }); console.log('PeerId: ', waku.libp2p.peerId.toB58String()); console.log('Listening on '); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index db8a8df8db..24115bfeb9 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -181,10 +181,12 @@ export default function App() { async function initWaku(setter: (waku: Waku) => void) { try { const waku = await Waku.create({ - config: { - pubsub: { - enabled: true, - emitSelf: true, + libp2p: { + config: { + pubsub: { + enabled: true, + emitSelf: true, + }, }, }, }); diff --git a/src/lib/waku.spec.ts b/src/lib/waku.spec.ts index c0fe3b982a..635ace2655 100644 --- a/src/lib/waku.spec.ts +++ b/src/lib/waku.spec.ts @@ -17,7 +17,7 @@ describe('Waku Dial', function () { const [waku1, waku2] = await Promise.all([ Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'], + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, }), Waku.create({ staticNoiseKey: NOISE_KEY_2 }), ]); @@ -39,8 +39,10 @@ describe('Waku Dial', function () { this.timeout(10_000); const waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0'], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, + modules: { transport: [TCP] }, + }, }); const multiAddrWithId = waku.getLocalMultiaddrWithID(); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 4184d092d5..5cdf4e855c 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -1,4 +1,4 @@ -import Libp2p, { Libp2pConfig, Libp2pModules, Libp2pOptions } from 'libp2p'; +import Libp2p, { Libp2pModules, Libp2pOptions } from 'libp2p'; import Mplex from 'libp2p-mplex'; import { bytes } from 'libp2p-noise/dist/src/@types/basic'; import { Noise } from 'libp2p-noise/dist/src/noise'; @@ -11,16 +11,26 @@ import { WakuLightPush } from './waku_light_push'; import { RelayCodec, WakuRelay } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; -const transportKey = Websockets.prototype[Symbol.toStringTag]; +const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; -export type CreateOptions = - | { - listenAddresses: string[] | undefined; - staticNoiseKey: bytes | undefined; - modules: Partial; - config: Partial; - } - | (Libp2pOptions & import('libp2p').CreateOptions); +export interface CreateOptions { + /** + * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. + * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) + * apart that we made the `modules` property optional and partial, + * allowing its omission and letting Waku set good defaults. + * Notes that some values are overridden by {@link Waku} to ensure it implements the Waku protocol. + */ + libp2p?: Omit & { + modules?: Partial; + }; + /** + * Byte array used as key for the noise protocol used for connection encryption + * by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) + * This is only used for test purposes to not run out of entropy during CI runs. + */ + staticNoiseKey?: bytes; +} export class Waku { public libp2p: Libp2p; @@ -44,52 +54,45 @@ export class Waku { * * @param options Takes the same options than `Libp2p`. */ - static async create(options: Partial): Promise { - const opts = Object.assign( - { - listenAddresses: [], - staticNoiseKey: undefined, - }, - options - ); + static async create(options?: CreateOptions): Promise { + // Get an object in case options or libp2p are undefined + const libp2pOpts = Object.assign({}, options?.libp2p); - opts.config = Object.assign( + // Default for Websocket filter is `all`: + // Returns all TCP and DNS based addresses, both with ws or wss. + libp2pOpts.config = Object.assign( { transport: { - [transportKey]: { + [websocketsTransportKey]: { filter: filters.all, }, }, }, - options.config + options?.libp2p?.config ); - opts.modules = Object.assign({}, options.modules); + libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); - let transport = [Websockets]; - if (opts.modules?.transport) { - transport = transport.concat(opts.modules?.transport); - } + // Default transport for libp2p is Websockets + libp2pOpts.modules = Object.assign( + { + transport: [Websockets], + }, + options?.libp2p?.modules + ); - // FIXME: By controlling the creation of libp2p we have to think about what - // needs to be exposed and what does not. Ideally, we should be able to let - // the user create the WakuStore, WakuRelay instances and pass them when - // creating the libp2p instance. - const libp2p = await Libp2p.create({ - addresses: { - listen: opts.listenAddresses, - }, - modules: { - transport, - streamMuxer: [Mplex], - connEncryption: [new Noise(opts.staticNoiseKey)], - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: Type needs update - pubsub: WakuRelay, - }, - config: opts.config, + // streamMuxer, connection encryption and pubsub are overridden + // as those are the only ones currently supported by Waku nodes. + libp2pOpts.modules = Object.assign(libp2pOpts.modules, { + streamMuxer: [Mplex], + connEncryption: [new Noise(options?.staticNoiseKey)], + pubsub: WakuRelay, }); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore: modules property is correctly set thanks to voodoo + const libp2p = await Libp2p.create(libp2pOpts); + const wakuStore = new WakuStore(libp2p); const wakuLightPush = new WakuLightPush(libp2p); diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index a6f0450498..620dc815b8 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -23,7 +23,7 @@ describe('Waku Light Push', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index d05eef324f..bc826744b6 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -31,7 +31,7 @@ describe('Waku Relay', () => { Waku.create({ staticNoiseKey: NOISE_KEY_1 }), Waku.create({ staticNoiseKey: NOISE_KEY_2, - listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'], + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, }), ]); @@ -153,8 +153,10 @@ describe('Waku Relay', () => { log('Create waku node'); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0'], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, + modules: { transport: [TCP] }, + }, }); const multiAddrWithId = waku.getLocalMultiaddrWithID(); @@ -231,7 +233,7 @@ describe('Waku Relay', () => { this.timeout(30_000); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + ''); @@ -328,11 +330,11 @@ describe('Waku Relay', () => { [waku1, waku2] = await Promise.all([ Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }), Waku.create({ staticNoiseKey: NOISE_KEY_2, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }), ]); diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 81eb4acbe6..f27a51d9ae 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -30,7 +30,7 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); @@ -67,7 +67,7 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); From d8629b7fbc47d8137c6f621376a97c104e91e877 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Tue, 8 Jun 2021 22:02:22 +1000 Subject: [PATCH 2/6] Define type for WakuRelay create options --- src/lib/waku_relay/index.ts | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 368e580fda..b11eef2a1d 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -36,7 +36,8 @@ interface GossipOptions { doPX: boolean; msgIdFn: MessageIdFunction; messageCache: MessageCache; - globalSignaturePolicy: string; + // This option is always overridden + // globalSignaturePolicy: string; scoreParams: Partial; scoreThresholds: Partial; directPeers: AddrInfo[]; @@ -48,6 +49,8 @@ interface GossipOptions { Dlazy: number; } +export type WakuRelayOptions = GossipOptions; + /** * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. * Must be passed as a `pubsub` module to a {Libp2p} instance. @@ -70,7 +73,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * @param {Libp2p} libp2p * @param {Partial} [options] */ - constructor(libp2p: Libp2p, options?: Partial) { + constructor(libp2p: Libp2p, options?: Partial) { super( libp2p, Object.assign(options, { From 4366618bda8c266346bdb8d0ecf46e594b6d97b5 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 9 Jun 2021 10:12:45 +1000 Subject: [PATCH 3/6] Exclude internal methods from doc At this stage we don't want developers to use any GossipSub method but iinstead improve the WakuRelay interface. --- package.json | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index b1e07b20ad..2a5e65ed1f 100644 --- a/package.json +++ b/package.json @@ -44,8 +44,8 @@ "cov:send": "run-s cov:lcov && codecov", "cov:check": "nyc report && nyc check-coverage --lines 100 --functions 100 --branches 100", "doc": "run-s doc:html && open-cli build/docs/index.html", - "doc:html": "typedoc --exclude **/*.spec.ts --out build/docs src/", - "doc:json": "typedoc src/ --exclude **/*.spec.ts --json build/docs/typedoc.json", + "doc:html": "typedoc --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --out build/docs src/", + "doc:json": "typedoc src/ --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --json build/docs/typedoc.json", "doc:publish": "gh-pages -m \"[ci skip] Updates\" -d build/docs", "version": "standard-version", "reset-hard": "git clean -dfx && git reset --hard && npm i && npm run build && for d in examples/*; do (cd $d; npm i); done", From eb521b4dbd28e1d6ee644ddf0719008a4773bf1a Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 9 Jun 2021 10:13:41 +1000 Subject: [PATCH 4/6] Extract subscription logic from start method --- src/lib/waku_relay/index.ts | 45 ++++++++++++++++++++++--------------- 1 file changed, 27 insertions(+), 18 deletions(-) diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index b11eef2a1d..c1dabf8d71 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -98,24 +98,8 @@ export class WakuRelay extends Gossipsub implements Pubsub { * @returns {void} */ public start(): void { - this.on(constants.DefaultPubsubTopic, (event) => { - const wakuMsg = WakuMessage.decode(event.data); - if (this.observers['']) { - this.observers[''].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - if (wakuMsg.contentTopic) { - if (this.observers[wakuMsg.contentTopic]) { - this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - } - }); - super.start(); - super.subscribe(constants.DefaultPubsubTopic); + this.subscribe(constants.DefaultPubsubTopic); } /** @@ -168,12 +152,37 @@ export class WakuRelay extends Gossipsub implements Pubsub { }); } + /** + * Subscribe to a pubsub topic and start emitting Waku messages to observers. + * + * @override + */ + subscribe(pubsubTopic: string): void { + this.on(pubsubTopic, (event) => { + const wakuMsg = WakuMessage.decode(event.data); + if (this.observers['']) { + this.observers[''].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + if (wakuMsg.contentTopic) { + if (this.observers[wakuMsg.contentTopic]) { + this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + } + }); + + super.subscribe(pubsubTopic); + } + /** * Join pubsub topic. * This is present to override the behavior of Gossipsub and should not * be used by API Consumers * - * @ignore + * @internal * @param {string} topic * @returns {void} * @override From f0f14f9995e3cae108dc588e75dcf19b62ca1fdf Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 9 Jun 2021 13:30:46 +1000 Subject: [PATCH 5/6] Rename type before introducing CreateOptions --- src/lib/waku_store/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 619f4b8f88..3a360c4357 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,7 +13,7 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; -export interface Options { +export interface QueryOptions { peerId: PeerId; contentTopics: string[]; pubsubTopic?: string; @@ -40,7 +40,7 @@ export class WakuStore { * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ - async queryHistory(options: Options): Promise { + async queryHistory(options: QueryOptions): Promise { const opts = Object.assign( { pubsubTopic: DefaultPubsubTopic, From 5ce0717f05c71a5e8eaf2e8424c9c82c36fdb317 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 9 Jun 2021 12:25:56 +1000 Subject: [PATCH 6/6] Enable passing a custom pubsub topic Note that we currently only support one, and only one, pubsub topic for a given instance across the codebase. The PubSub topic needs to be set when instantiating the Waku* classes. At this stage, we believe that most DApp will use, and only use, the default PubSub topic. Some application want to use an alternative topic but not use the default one so this behaviour should be fine. See #174 for details. --- CHANGELOG.md | 3 ++ src/lib/waku.ts | 26 ++++++++++- src/lib/waku_light_push/index.spec.ts | 40 ++++++++++++++++ src/lib/waku_light_push/index.ts | 24 +++++++++- src/lib/waku_relay/index.spec.ts | 66 +++++++++++++++++++++++++++ src/lib/waku_relay/index.ts | 30 ++++++------ src/lib/waku_store/index.spec.ts | 42 +++++++++++++++++ src/lib/waku_store/index.ts | 32 ++++++++++--- src/test_utils/nim_waku.ts | 8 +++- 9 files changed, 247 insertions(+), 24 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c18548745a..7a25e95883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README. - Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README. - Examples (web chat): Persist nick. +- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`; + Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`; + JS-Waku currently supports one, and only, PubSub topic per instance. ## [0.5.0] - 2021-05-21 diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 5cdf4e855c..f46330dfde 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -14,6 +14,20 @@ import { StoreCodec, WakuStore } from './waku_store'; const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * One and only one pubsub topic is used by Waku. This is used by: + * - WakuRelay to receive, route and send messages, + * - WakuLightPush to send messages, + * - WakuStore to retrieve messages. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; /** * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) @@ -71,6 +85,14 @@ export class Waku { options?.libp2p?.config ); + // Pass pubsub topic to relay + if (options?.pubsubTopic) { + libp2pOpts.config.pubsub = Object.assign( + { pubsubTopic: options.pubsubTopic }, + libp2pOpts.config.pubsub + ); + } + libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); // Default transport for libp2p is Websockets @@ -93,7 +115,9 @@ export class Waku { // @ts-ignore: modules property is correctly set thanks to voodoo const libp2p = await Libp2p.create(libp2pOpts); - const wakuStore = new WakuStore(libp2p); + const wakuStore = new WakuStore(libp2p, { + pubsubTopic: options?.pubsubTopic, + }); const wakuLightPush = new WakuLightPush(libp2p); await libp2p.start(); diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index 620dc815b8..ad9e1e062d 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -51,4 +51,44 @@ describe('Waku Light Push', () => { expect(msgs[0].version).to.equal(message.version); expect(msgs[0].payloadAsUtf8).to.equal(messageText); }); + + it('Push on custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true, topics: customPubSubTopic }); + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messageText = 'Light Push works!'; + const message = WakuMessage.fromUtf8String(messageText); + + const pushResponse = await waku.lightPush.push(nimPeerId, message); + expect(pushResponse?.isSuccess).to.be.true; + + let msgs: WakuMessage[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nimWaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(message.contentTopic); + expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); + }); }); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index f2189ec17e..c5a6c58046 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc'; export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; export { PushResponse }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class WakuLightPush { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } async push( peerId: PeerId, message: WakuMessage, - pubsubTopic: string = DefaultPubsubTopic + pubsubTopic: string = this.pubsubTopic ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index bc826744b6..6c1c26e88c 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -142,6 +142,72 @@ describe('Waku Relay', () => { }); }); + describe('Custom pubsub topic', () => { + it('Publish', async function () { + this.timeout(10000); + + const pubsubTopic = '/some/pubsub/topic'; + + // 1 and 2 uses a custom pubsub + const [waku1, waku2, waku3] = await Promise.all([ + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_1, + }), + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, + }), + Waku.create({ + staticNoiseKey: NOISE_KEY_2, + }), + ]); + + waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + + await Promise.all([ + new Promise((resolve) => + waku1.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + new Promise((resolve) => + waku2.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + // No subscription change expected for Waku 3 + ]); + + const messageText = 'Communicating using a custom pubsub topic'; + const message = WakuMessage.fromUtf8String(messageText); + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + waku3.relay.addObserver(reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(message); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText); + }); + }); + describe('Interop: Nim', function () { describe('Nim connects to js', function () { let waku: Waku; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index c1dabf8d71..c846b7f85e 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; +import { CreateOptions } from '../waku'; import { WakuMessage } from '../waku_message'; import * as constants from './constants'; @@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat'; export { RelayCodec, DefaultPubsubTopic }; /** - * See {GossipOptions} from libp2p-gossipsub + * See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api). */ -interface GossipOptions { +export interface GossipOptions { emitSelf: boolean; gossipIncoming: boolean; fallbackToFloodsub: boolean; @@ -49,8 +50,6 @@ interface GossipOptions { Dlazy: number; } -export type WakuRelayOptions = GossipOptions; - /** * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. * Must be passed as a `pubsub` module to a {Libp2p} instance. @@ -60,6 +59,7 @@ export type WakuRelayOptions = GossipOptions; */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + pubsubTopic: string; /** * observers called when receiving new message. * Observers under key "" are always called. @@ -68,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub { [contentTopic: string]: Array<(message: WakuMessage) => void>; }; - /** - * - * @param {Libp2p} libp2p - * @param {Partial} [options] - */ - constructor(libp2p: Libp2p, options?: Partial) { + constructor( + libp2p: Libp2p, + options?: Partial + ) { super( libp2p, Object.assign(options, { @@ -88,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub { const multicodecs = [constants.RelayCodec]; Object.assign(this, { multicodecs }); + + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = constants.DefaultPubsubTopic; + } } /** @@ -99,7 +103,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public start(): void { super.start(); - this.subscribe(constants.DefaultPubsubTopic); + this.subscribe(this.pubsubTopic); } /** @@ -110,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public async send(message: WakuMessage): Promise { const msg = message.encode(); - await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg)); + await super.publish(this.pubsubTopic, Buffer.from(msg)); } /** @@ -144,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Return the relay peers we are connected to and we would publish a message to */ getPeers(): Set { - return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => { + return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => { // Filter peers we would not publish to return ( this.score.score(id) >= this._options.scoreThresholds.publishThreshold diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index f27a51d9ae..3caf210352 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -93,4 +93,46 @@ describe('Waku Store', () => { ).to.eq(index); } }); + + it('Retrieves history using custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true, topics: customPubSubTopic }); + + for (let i = 0; i < 2; i++) { + expect( + await nimWaku.sendMessage( + WakuMessage.fromUtf8String(`Message ${i}`), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); + + expect(messages?.length).eq(2); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === 'Message 0'; + }); + expect(result).to.not.eq(-1); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 3a360c4357..bb14790f2d 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,6 +13,18 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + export interface QueryOptions { peerId: PeerId; contentTopics: string[]; @@ -26,24 +38,32 @@ export interface QueryOptions { * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ export class WakuStore { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } /** * Query given peer using Waku Store. * * @param options - * @param options.peerId The peer to query. - * @param options.contentTopics The content topics to retrieve, leave empty to + * @param options.peerId The peer to query.Options + * @param options.contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. - * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes - * use the same pubsub topic. This is reserved for future applications. + * @param options.pubsubTopic The pubsub topic to pass to the query. Defaults + * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ async queryHistory(options: QueryOptions): Promise { const opts = Object.assign( { - pubsubTopic: DefaultPubsubTopic, + pubsubTopic: this.pubsubTopic, direction: Direction.BACKWARD, pageSize: 10, }, diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 0f43c5066a..8b5ac45179 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -40,6 +40,7 @@ export interface Args { logLevel?: LogLevel; persistMessages?: boolean; lightpush?: boolean; + topics?: string; } export enum LogLevel { @@ -155,7 +156,10 @@ export class NimWaku { return this.rpcCall('get_waku_v2_debug_v1_info', []); } - async sendMessage(message: WakuMessage): Promise { + async sendMessage( + message: WakuMessage, + pubsubTopic?: string + ): Promise { this.checkProcess(); if (!message.payload) { @@ -168,7 +172,7 @@ export class NimWaku { }; return this.rpcCall('post_waku_v2_relay_v1_message', [ - DefaultPubsubTopic, + pubsubTopic ? pubsubTopic : DefaultPubsubTopic, rpcMessage, ]); }