diff --git a/CHANGELOG.md b/CHANGELOG.md index 3e7dcc8bcc..7a25e95883 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,12 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Changed +- **Breaking**: Options passed to `Waku.create` used to be passed to `Libp2p.create`; + Now, only the `libp2p` property is passed to `Libp2p.create`, allowing for a cleaner interface. + ### Added - Enable access to `WakuMessage.timestamp`. - Examples (web chat): Use `WakuMessage.timestamp` as unique key for list items. - Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README. - Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README. - Examples (web chat): Persist nick. +- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`; + Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`; + JS-Waku currently supports one, and only, PubSub topic per instance. ## [0.5.0] - 2021-05-21 diff --git a/examples/cli-chat/src/chat.ts b/examples/cli-chat/src/chat.ts index 32d286740e..0f29881e73 100644 --- a/examples/cli-chat/src/chat.ts +++ b/examples/cli-chat/src/chat.ts @@ -27,8 +27,10 @@ export default async function startChat(): Promise { } const waku = await Waku.create({ - listenAddresses: [opts.listenAddr], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: [opts.listenAddr] }, + modules: { transport: [TCP] }, + }, }); console.log('PeerId: ', waku.libp2p.peerId.toB58String()); console.log('Listening on '); diff --git a/examples/web-chat/src/App.tsx b/examples/web-chat/src/App.tsx index db8a8df8db..24115bfeb9 100644 --- a/examples/web-chat/src/App.tsx +++ b/examples/web-chat/src/App.tsx @@ -181,10 +181,12 @@ export default function App() { async function initWaku(setter: (waku: Waku) => void) { try { const waku = await Waku.create({ - config: { - pubsub: { - enabled: true, - emitSelf: true, + libp2p: { + config: { + pubsub: { + enabled: true, + emitSelf: true, + }, }, }, }); diff --git a/package.json b/package.json index b1e07b20ad..2a5e65ed1f 100644 --- a/package.json +++ b/package.json @@ -44,8 +44,8 @@ "cov:send": "run-s cov:lcov && codecov", "cov:check": "nyc report && nyc check-coverage --lines 100 --functions 100 --branches 100", "doc": "run-s doc:html && open-cli build/docs/index.html", - "doc:html": "typedoc --exclude **/*.spec.ts --out build/docs src/", - "doc:json": "typedoc src/ --exclude **/*.spec.ts --json build/docs/typedoc.json", + "doc:html": "typedoc --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --out build/docs src/", + "doc:json": "typedoc src/ --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --json build/docs/typedoc.json", "doc:publish": "gh-pages -m \"[ci skip] Updates\" -d build/docs", "version": "standard-version", "reset-hard": "git clean -dfx && git reset --hard && npm i && npm run build && for d in examples/*; do (cd $d; npm i); done", diff --git a/src/lib/waku.spec.ts b/src/lib/waku.spec.ts index c0fe3b982a..635ace2655 100644 --- a/src/lib/waku.spec.ts +++ b/src/lib/waku.spec.ts @@ -17,7 +17,7 @@ describe('Waku Dial', function () { const [waku1, waku2] = await Promise.all([ Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'], + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, }), Waku.create({ staticNoiseKey: NOISE_KEY_2 }), ]); @@ -39,8 +39,10 @@ describe('Waku Dial', function () { this.timeout(10_000); const waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0'], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, + modules: { transport: [TCP] }, + }, }); const multiAddrWithId = waku.getLocalMultiaddrWithID(); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 4184d092d5..f46330dfde 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -1,4 +1,4 @@ -import Libp2p, { Libp2pConfig, Libp2pModules, Libp2pOptions } from 'libp2p'; +import Libp2p, { Libp2pModules, Libp2pOptions } from 'libp2p'; import Mplex from 'libp2p-mplex'; import { bytes } from 'libp2p-noise/dist/src/@types/basic'; import { Noise } from 'libp2p-noise/dist/src/noise'; @@ -11,16 +11,40 @@ import { WakuLightPush } from './waku_light_push'; import { RelayCodec, WakuRelay } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; -const transportKey = Websockets.prototype[Symbol.toStringTag]; +const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; -export type CreateOptions = - | { - listenAddresses: string[] | undefined; - staticNoiseKey: bytes | undefined; - modules: Partial; - config: Partial; - } - | (Libp2pOptions & import('libp2p').CreateOptions); +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * One and only one pubsub topic is used by Waku. This is used by: + * - WakuRelay to receive, route and send messages, + * - WakuLightPush to send messages, + * - WakuStore to retrieve messages. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; + /** + * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. + * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) + * apart that we made the `modules` property optional and partial, + * allowing its omission and letting Waku set good defaults. + * Notes that some values are overridden by {@link Waku} to ensure it implements the Waku protocol. + */ + libp2p?: Omit & { + modules?: Partial; + }; + /** + * Byte array used as key for the noise protocol used for connection encryption + * by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) + * This is only used for test purposes to not run out of entropy during CI runs. + */ + staticNoiseKey?: bytes; +} export class Waku { public libp2p: Libp2p; @@ -44,53 +68,56 @@ export class Waku { * * @param options Takes the same options than `Libp2p`. */ - static async create(options: Partial): Promise { - const opts = Object.assign( - { - listenAddresses: [], - staticNoiseKey: undefined, - }, - options - ); + static async create(options?: CreateOptions): Promise { + // Get an object in case options or libp2p are undefined + const libp2pOpts = Object.assign({}, options?.libp2p); - opts.config = Object.assign( + // Default for Websocket filter is `all`: + // Returns all TCP and DNS based addresses, both with ws or wss. + libp2pOpts.config = Object.assign( { transport: { - [transportKey]: { + [websocketsTransportKey]: { filter: filters.all, }, }, }, - options.config + options?.libp2p?.config ); - opts.modules = Object.assign({}, options.modules); - - let transport = [Websockets]; - if (opts.modules?.transport) { - transport = transport.concat(opts.modules?.transport); + // Pass pubsub topic to relay + if (options?.pubsubTopic) { + libp2pOpts.config.pubsub = Object.assign( + { pubsubTopic: options.pubsubTopic }, + libp2pOpts.config.pubsub + ); } - // FIXME: By controlling the creation of libp2p we have to think about what - // needs to be exposed and what does not. Ideally, we should be able to let - // the user create the WakuStore, WakuRelay instances and pass them when - // creating the libp2p instance. - const libp2p = await Libp2p.create({ - addresses: { - listen: opts.listenAddresses, + libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); + + // Default transport for libp2p is Websockets + libp2pOpts.modules = Object.assign( + { + transport: [Websockets], }, - modules: { - transport, - streamMuxer: [Mplex], - connEncryption: [new Noise(opts.staticNoiseKey)], - // eslint-disable-next-line @typescript-eslint/ban-ts-comment - // @ts-ignore: Type needs update - pubsub: WakuRelay, - }, - config: opts.config, + options?.libp2p?.modules + ); + + // streamMuxer, connection encryption and pubsub are overridden + // as those are the only ones currently supported by Waku nodes. + libp2pOpts.modules = Object.assign(libp2pOpts.modules, { + streamMuxer: [Mplex], + connEncryption: [new Noise(options?.staticNoiseKey)], + pubsub: WakuRelay, }); - const wakuStore = new WakuStore(libp2p); + // eslint-disable-next-line @typescript-eslint/ban-ts-comment + // @ts-ignore: modules property is correctly set thanks to voodoo + const libp2p = await Libp2p.create(libp2pOpts); + + const wakuStore = new WakuStore(libp2p, { + pubsubTopic: options?.pubsubTopic, + }); const wakuLightPush = new WakuLightPush(libp2p); await libp2p.start(); diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts index a6f0450498..ad9e1e062d 100644 --- a/src/lib/waku_light_push/index.spec.ts +++ b/src/lib/waku_light_push/index.spec.ts @@ -23,7 +23,47 @@ describe('Waku Light Push', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messageText = 'Light Push works!'; + const message = WakuMessage.fromUtf8String(messageText); + + const pushResponse = await waku.lightPush.push(nimPeerId, message); + expect(pushResponse?.isSuccess).to.be.true; + + let msgs: WakuMessage[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nimWaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(message.contentTopic); + expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); + }); + + it('Push on custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true, topics: customPubSubTopic }); + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts index f2189ec17e..c5a6c58046 100644 --- a/src/lib/waku_light_push/index.ts +++ b/src/lib/waku_light_push/index.ts @@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc'; export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; export { PushResponse }; +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + /** * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ export class WakuLightPush { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } async push( peerId: PeerId, message: WakuMessage, - pubsubTopic: string = DefaultPubsubTopic + pubsubTopic: string = this.pubsubTopic ): Promise { const peer = this.libp2p.peerStore.get(peerId); if (!peer) throw 'Peer is unknown'; diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index d05eef324f..6c1c26e88c 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -31,7 +31,7 @@ describe('Waku Relay', () => { Waku.create({ staticNoiseKey: NOISE_KEY_1 }), Waku.create({ staticNoiseKey: NOISE_KEY_2, - listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'], + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, }), ]); @@ -142,6 +142,72 @@ describe('Waku Relay', () => { }); }); + describe('Custom pubsub topic', () => { + it('Publish', async function () { + this.timeout(10000); + + const pubsubTopic = '/some/pubsub/topic'; + + // 1 and 2 uses a custom pubsub + const [waku1, waku2, waku3] = await Promise.all([ + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_1, + }), + Waku.create({ + pubsubTopic, + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } }, + }), + Waku.create({ + staticNoiseKey: NOISE_KEY_2, + }), + ]); + + waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs); + + await Promise.all([ + new Promise((resolve) => + waku1.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + new Promise((resolve) => + waku2.libp2p.pubsub.once('pubsub:subscription-change', () => + resolve(null) + ) + ), + // No subscription change expected for Waku 3 + ]); + + const messageText = 'Communicating using a custom pubsub topic'; + const message = WakuMessage.fromUtf8String(messageText); + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + waku2.relay.addObserver(resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + waku3.relay.addObserver(reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(message); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText); + }); + }); + describe('Interop: Nim', function () { describe('Nim connects to js', function () { let waku: Waku; @@ -153,8 +219,10 @@ describe('Waku Relay', () => { log('Create waku node'); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - listenAddresses: ['/ip4/0.0.0.0/tcp/0'], - modules: { transport: [TCP] }, + libp2p: { + addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] }, + modules: { transport: [TCP] }, + }, }); const multiAddrWithId = waku.getLocalMultiaddrWithID(); @@ -231,7 +299,7 @@ describe('Waku Relay', () => { this.timeout(30_000); waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + ''); @@ -328,11 +396,11 @@ describe('Waku Relay', () => { [waku1, waku2] = await Promise.all([ Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }), Waku.create({ staticNoiseKey: NOISE_KEY_2, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }), ]); diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 368e580fda..c846b7f85e 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; +import { CreateOptions } from '../waku'; import { WakuMessage } from '../waku_message'; import * as constants from './constants'; @@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat'; export { RelayCodec, DefaultPubsubTopic }; /** - * See {GossipOptions} from libp2p-gossipsub + * See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api). */ -interface GossipOptions { +export interface GossipOptions { emitSelf: boolean; gossipIncoming: boolean; fallbackToFloodsub: boolean; @@ -36,7 +37,8 @@ interface GossipOptions { doPX: boolean; msgIdFn: MessageIdFunction; messageCache: MessageCache; - globalSignaturePolicy: string; + // This option is always overridden + // globalSignaturePolicy: string; scoreParams: Partial; scoreThresholds: Partial; directPeers: AddrInfo[]; @@ -57,6 +59,7 @@ interface GossipOptions { */ export class WakuRelay extends Gossipsub implements Pubsub { heartbeat: RelayHeartbeat; + pubsubTopic: string; /** * observers called when receiving new message. * Observers under key "" are always called. @@ -65,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub { [contentTopic: string]: Array<(message: WakuMessage) => void>; }; - /** - * - * @param {Libp2p} libp2p - * @param {Partial} [options] - */ - constructor(libp2p: Libp2p, options?: Partial) { + constructor( + libp2p: Libp2p, + options?: Partial + ) { super( libp2p, Object.assign(options, { @@ -85,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub { const multicodecs = [constants.RelayCodec]; Object.assign(this, { multicodecs }); + + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = constants.DefaultPubsubTopic; + } } /** @@ -95,24 +102,8 @@ export class WakuRelay extends Gossipsub implements Pubsub { * @returns {void} */ public start(): void { - this.on(constants.DefaultPubsubTopic, (event) => { - const wakuMsg = WakuMessage.decode(event.data); - if (this.observers['']) { - this.observers[''].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - if (wakuMsg.contentTopic) { - if (this.observers[wakuMsg.contentTopic]) { - this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { - callbackFn(wakuMsg); - }); - } - } - }); - super.start(); - super.subscribe(constants.DefaultPubsubTopic); + this.subscribe(this.pubsubTopic); } /** @@ -123,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { */ public async send(message: WakuMessage): Promise { const msg = message.encode(); - await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg)); + await super.publish(this.pubsubTopic, Buffer.from(msg)); } /** @@ -157,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub { * Return the relay peers we are connected to and we would publish a message to */ getPeers(): Set { - return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => { + return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => { // Filter peers we would not publish to return ( this.score.score(id) >= this._options.scoreThresholds.publishThreshold @@ -165,12 +156,37 @@ export class WakuRelay extends Gossipsub implements Pubsub { }); } + /** + * Subscribe to a pubsub topic and start emitting Waku messages to observers. + * + * @override + */ + subscribe(pubsubTopic: string): void { + this.on(pubsubTopic, (event) => { + const wakuMsg = WakuMessage.decode(event.data); + if (this.observers['']) { + this.observers[''].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + if (wakuMsg.contentTopic) { + if (this.observers[wakuMsg.contentTopic]) { + this.observers[wakuMsg.contentTopic].forEach((callbackFn) => { + callbackFn(wakuMsg); + }); + } + } + }); + + super.subscribe(pubsubTopic); + } + /** * Join pubsub topic. * This is present to override the behavior of Gossipsub and should not * be used by API Consumers * - * @ignore + * @internal * @param {string} topic * @returns {void} * @override diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts index 81eb4acbe6..3caf210352 100644 --- a/src/lib/waku_store/index.spec.ts +++ b/src/lib/waku_store/index.spec.ts @@ -30,7 +30,7 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); @@ -67,7 +67,7 @@ describe('Waku Store', () => { waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1, - modules: { transport: [TCP] }, + libp2p: { modules: { transport: [TCP] } }, }); await waku.dial(await nimWaku.getMultiaddrWithId()); @@ -93,4 +93,46 @@ describe('Waku Store', () => { ).to.eq(index); } }); + + it('Retrieves history using custom pubsub topic', async function () { + this.timeout(5_000); + + const customPubSubTopic = '/waku/2/custom-dapp/proto'; + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ persistMessages: true, topics: customPubSubTopic }); + + for (let i = 0; i < 2; i++) { + expect( + await nimWaku.sendMessage( + WakuMessage.fromUtf8String(`Message ${i}`), + customPubSubTopic + ) + ).to.be.true; + } + + waku = await Waku.create({ + pubsubTopic: customPubSubTopic, + staticNoiseKey: NOISE_KEY_1, + libp2p: { modules: { transport: [TCP] } }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messages = await waku.store.queryHistory({ + peerId: nimPeerId, + contentTopics: [], + }); + + expect(messages?.length).eq(2); + const result = messages?.findIndex((msg) => { + return msg.payloadAsUtf8 === 'Message 0'; + }); + expect(result).to.not.eq(-1); + }); }); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts index 619f4b8f88..bb14790f2d 100644 --- a/src/lib/waku_store/index.ts +++ b/src/lib/waku_store/index.ts @@ -13,7 +13,19 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3'; export { Direction }; -export interface Options { +export interface CreateOptions { + /** + * The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}. + * + * The usage of the default pubsub topic is recommended. + * See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details. + * + * @default {@link DefaultPubsubTopic} + */ + pubsubTopic?: string; +} + +export interface QueryOptions { peerId: PeerId; contentTopics: string[]; pubsubTopic?: string; @@ -26,24 +38,32 @@ export interface Options { * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). */ export class WakuStore { - constructor(public libp2p: Libp2p) {} + pubsubTopic: string; + + constructor(public libp2p: Libp2p, options?: CreateOptions) { + if (options?.pubsubTopic) { + this.pubsubTopic = options.pubsubTopic; + } else { + this.pubsubTopic = DefaultPubsubTopic; + } + } /** * Query given peer using Waku Store. * * @param options - * @param options.peerId The peer to query. - * @param options.contentTopics The content topics to retrieve, leave empty to + * @param options.peerId The peer to query.Options + * @param options.contentTopics The content topics to pass to the query, leave empty to * retrieve all messages. - * @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes - * use the same pubsub topic. This is reserved for future applications. + * @param options.pubsubTopic The pubsub topic to pass to the query. Defaults + * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/). * @param options.callback Callback called on page of stored messages as they are retrieved * @throws If not able to reach the peer to query. */ - async queryHistory(options: Options): Promise { + async queryHistory(options: QueryOptions): Promise { const opts = Object.assign( { - pubsubTopic: DefaultPubsubTopic, + pubsubTopic: this.pubsubTopic, direction: Direction.BACKWARD, pageSize: 10, }, diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index 0f43c5066a..8b5ac45179 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -40,6 +40,7 @@ export interface Args { logLevel?: LogLevel; persistMessages?: boolean; lightpush?: boolean; + topics?: string; } export enum LogLevel { @@ -155,7 +156,10 @@ export class NimWaku { return this.rpcCall('get_waku_v2_debug_v1_info', []); } - async sendMessage(message: WakuMessage): Promise { + async sendMessage( + message: WakuMessage, + pubsubTopic?: string + ): Promise { this.checkProcess(); if (!message.payload) { @@ -168,7 +172,7 @@ export class NimWaku { }; return this.rpcCall('post_waku_v2_relay_v1_message', [ - DefaultPubsubTopic, + pubsubTopic ? pubsubTopic : DefaultPubsubTopic, rpcMessage, ]); }