Enable passing a custom pubsub topic

Note that we currently only support one, and only one, pubsub topic for
a given instance across the codebase. The PubSub topic needs to be set
when instantiating the Waku* classes.

At this stage, we believe that most DApp will use, and only use, the
default PubSub topic. Some application want to use an alternative topic
but not use the default one so this behaviour should be fine. See #174
for details.
This commit is contained in:
Franck Royer 2021-06-09 12:25:56 +10:00
parent f0f14f9995
commit 5ce0717f05
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
9 changed files with 247 additions and 24 deletions

View File

@ -17,6 +17,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README. - Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README.
- Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README. - Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README.
- Examples (web chat): Persist nick. - Examples (web chat): Persist nick.
- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`;
Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`;
JS-Waku currently supports one, and only, PubSub topic per instance.
## [0.5.0] - 2021-05-21 ## [0.5.0] - 2021-05-21

View File

@ -14,6 +14,20 @@ import { StoreCodec, WakuStore } from './waku_store';
const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag]; const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag];
export interface CreateOptions { export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
*
* One and only one pubsub topic is used by Waku. This is used by:
* - WakuRelay to receive, route and send messages,
* - WakuLightPush to send messages,
* - WakuStore to retrieve messages.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubsubTopic}
*/
pubsubTopic?: string;
/** /**
* You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property. * You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property.
* This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create) * This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
@ -71,6 +85,14 @@ export class Waku {
options?.libp2p?.config options?.libp2p?.config
); );
// Pass pubsub topic to relay
if (options?.pubsubTopic) {
libp2pOpts.config.pubsub = Object.assign(
{ pubsubTopic: options.pubsubTopic },
libp2pOpts.config.pubsub
);
}
libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules); libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules);
// Default transport for libp2p is Websockets // Default transport for libp2p is Websockets
@ -93,7 +115,9 @@ export class Waku {
// @ts-ignore: modules property is correctly set thanks to voodoo // @ts-ignore: modules property is correctly set thanks to voodoo
const libp2p = await Libp2p.create(libp2pOpts); const libp2p = await Libp2p.create(libp2pOpts);
const wakuStore = new WakuStore(libp2p); const wakuStore = new WakuStore(libp2p, {
pubsubTopic: options?.pubsubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p); const wakuLightPush = new WakuLightPush(libp2p);
await libp2p.start(); await libp2p.start();

View File

@ -51,4 +51,44 @@ describe('Waku Light Push', () => {
expect(msgs[0].version).to.equal(message.version); expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText); expect(msgs[0].payloadAsUtf8).to.equal(messageText);
}); });
it('Push on custom pubsub topic', async function () {
this.timeout(5_000);
const customPubSubTopic = '/waku/2/custom-dapp/proto';
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ lightpush: true, topics: customPubSubTopic });
waku = await Waku.create({
pubsubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
libp2p: { modules: { transport: [TCP] } },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message);
expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = [];
while (msgs.length === 0) {
await delay(200);
msgs = await nimWaku.messages();
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
});
}); });

View File

@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc';
export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1';
export { PushResponse }; export { PushResponse };
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubsubTopic}
*/
pubsubTopic?: string;
}
/** /**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/ */
export class WakuLightPush { export class WakuLightPush {
constructor(public libp2p: Libp2p) {} pubsubTopic: string;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
if (options?.pubsubTopic) {
this.pubsubTopic = options.pubsubTopic;
} else {
this.pubsubTopic = DefaultPubsubTopic;
}
}
async push( async push(
peerId: PeerId, peerId: PeerId,
message: WakuMessage, message: WakuMessage,
pubsubTopic: string = DefaultPubsubTopic pubsubTopic: string = this.pubsubTopic
): Promise<PushResponse | null> { ): Promise<PushResponse | null> {
const peer = this.libp2p.peerStore.get(peerId); const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown'; if (!peer) throw 'Peer is unknown';

View File

@ -142,6 +142,72 @@ describe('Waku Relay', () => {
}); });
}); });
describe('Custom pubsub topic', () => {
it('Publish', async function () {
this.timeout(10000);
const pubsubTopic = '/some/pubsub/topic';
// 1 and 2 uses a custom pubsub
const [waku1, waku2, waku3] = await Promise.all([
Waku.create({
pubsubTopic,
staticNoiseKey: NOISE_KEY_1,
}),
Waku.create({
pubsubTopic,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } },
}),
Waku.create({
staticNoiseKey: NOISE_KEY_2,
}),
]);
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('pubsub:subscription-change', () =>
resolve(null)
)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('pubsub:subscription-change', () =>
resolve(null)
)
),
// No subscription change expected for Waku 3
]);
const messageText = 'Communicating using a custom pubsub topic';
const message = WakuMessage.fromUtf8String(messageText);
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
(resolve) => {
waku2.relay.addObserver(resolve);
}
);
// The promise **fails** if we receive a message on the default
// pubsub topic.
const waku3NoMsgPromise: Promise<WakuMessage> = new Promise(
(resolve, reject) => {
waku3.relay.addObserver(reject);
setTimeout(resolve, 1000);
}
);
await waku1.relay.send(message);
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
await waku3NoMsgPromise;
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText);
});
});
describe('Interop: Nim', function () { describe('Interop: Nim', function () {
describe('Nim connects to js', function () { describe('Nim connects to js', function () {
let waku: Waku; let waku: Waku;

View File

@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { CreateOptions } from '../waku';
import { WakuMessage } from '../waku_message'; import { WakuMessage } from '../waku_message';
import * as constants from './constants'; import * as constants from './constants';
@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat';
export { RelayCodec, DefaultPubsubTopic }; export { RelayCodec, DefaultPubsubTopic };
/** /**
* See {GossipOptions} from libp2p-gossipsub * See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api).
*/ */
interface GossipOptions { export interface GossipOptions {
emitSelf: boolean; emitSelf: boolean;
gossipIncoming: boolean; gossipIncoming: boolean;
fallbackToFloodsub: boolean; fallbackToFloodsub: boolean;
@ -49,8 +50,6 @@ interface GossipOptions {
Dlazy: number; Dlazy: number;
} }
export type WakuRelayOptions = GossipOptions;
/** /**
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}. * Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
* Must be passed as a `pubsub` module to a {Libp2p} instance. * Must be passed as a `pubsub` module to a {Libp2p} instance.
@ -60,6 +59,7 @@ export type WakuRelayOptions = GossipOptions;
*/ */
export class WakuRelay extends Gossipsub implements Pubsub { export class WakuRelay extends Gossipsub implements Pubsub {
heartbeat: RelayHeartbeat; heartbeat: RelayHeartbeat;
pubsubTopic: string;
/** /**
* observers called when receiving new message. * observers called when receiving new message.
* Observers under key "" are always called. * Observers under key "" are always called.
@ -68,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub {
[contentTopic: string]: Array<(message: WakuMessage) => void>; [contentTopic: string]: Array<(message: WakuMessage) => void>;
}; };
/** constructor(
* libp2p: Libp2p,
* @param {Libp2p} libp2p options?: Partial<CreateOptions & GossipOptions>
* @param {Partial<GossipOptions>} [options] ) {
*/
constructor(libp2p: Libp2p, options?: Partial<WakuRelayOptions>) {
super( super(
libp2p, libp2p,
Object.assign(options, { Object.assign(options, {
@ -88,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub {
const multicodecs = [constants.RelayCodec]; const multicodecs = [constants.RelayCodec];
Object.assign(this, { multicodecs }); Object.assign(this, { multicodecs });
if (options?.pubsubTopic) {
this.pubsubTopic = options.pubsubTopic;
} else {
this.pubsubTopic = constants.DefaultPubsubTopic;
}
} }
/** /**
@ -99,7 +103,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
*/ */
public start(): void { public start(): void {
super.start(); super.start();
this.subscribe(constants.DefaultPubsubTopic); this.subscribe(this.pubsubTopic);
} }
/** /**
@ -110,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
*/ */
public async send(message: WakuMessage): Promise<void> { public async send(message: WakuMessage): Promise<void> {
const msg = message.encode(); const msg = message.encode();
await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg)); await super.publish(this.pubsubTopic, Buffer.from(msg));
} }
/** /**
@ -144,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
* Return the relay peers we are connected to and we would publish a message to * Return the relay peers we are connected to and we would publish a message to
*/ */
getPeers(): Set<string> { getPeers(): Set<string> {
return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => { return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => {
// Filter peers we would not publish to // Filter peers we would not publish to
return ( return (
this.score.score(id) >= this._options.scoreThresholds.publishThreshold this.score.score(id) >= this._options.scoreThresholds.publishThreshold

View File

@ -93,4 +93,46 @@ describe('Waku Store', () => {
).to.eq(index); ).to.eq(index);
} }
}); });
it('Retrieves history using custom pubsub topic', async function () {
this.timeout(5_000);
const customPubSubTopic = '/waku/2/custom-dapp/proto';
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ persistMessages: true, topics: customPubSubTopic });
for (let i = 0; i < 2; i++) {
expect(
await nimWaku.sendMessage(
WakuMessage.fromUtf8String(`Message ${i}`),
customPubSubTopic
)
).to.be.true;
}
waku = await Waku.create({
pubsubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
libp2p: { modules: { transport: [TCP] } },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messages = await waku.store.queryHistory({
peerId: nimPeerId,
contentTopics: [],
});
expect(messages?.length).eq(2);
const result = messages?.findIndex((msg) => {
return msg.payloadAsUtf8 === 'Message 0';
});
expect(result).to.not.eq(-1);
});
}); });

View File

@ -13,6 +13,18 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
export { Direction }; export { Direction };
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubsubTopic}
*/
pubsubTopic?: string;
}
export interface QueryOptions { export interface QueryOptions {
peerId: PeerId; peerId: PeerId;
contentTopics: string[]; contentTopics: string[];
@ -26,24 +38,32 @@ export interface QueryOptions {
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/). * Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
*/ */
export class WakuStore { export class WakuStore {
constructor(public libp2p: Libp2p) {} pubsubTopic: string;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
if (options?.pubsubTopic) {
this.pubsubTopic = options.pubsubTopic;
} else {
this.pubsubTopic = DefaultPubsubTopic;
}
}
/** /**
* Query given peer using Waku Store. * Query given peer using Waku Store.
* *
* @param options * @param options
* @param options.peerId The peer to query. * @param options.peerId The peer to query.Options
* @param options.contentTopics The content topics to retrieve, leave empty to * @param options.contentTopics The content topics to pass to the query, leave empty to
* retrieve all messages. * retrieve all messages.
* @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes * @param options.pubsubTopic The pubsub topic to pass to the query. Defaults
* use the same pubsub topic. This is reserved for future applications. * to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/).
* @param options.callback Callback called on page of stored messages as they are retrieved * @param options.callback Callback called on page of stored messages as they are retrieved
* @throws If not able to reach the peer to query. * @throws If not able to reach the peer to query.
*/ */
async queryHistory(options: QueryOptions): Promise<WakuMessage[] | null> { async queryHistory(options: QueryOptions): Promise<WakuMessage[] | null> {
const opts = Object.assign( const opts = Object.assign(
{ {
pubsubTopic: DefaultPubsubTopic, pubsubTopic: this.pubsubTopic,
direction: Direction.BACKWARD, direction: Direction.BACKWARD,
pageSize: 10, pageSize: 10,
}, },

View File

@ -40,6 +40,7 @@ export interface Args {
logLevel?: LogLevel; logLevel?: LogLevel;
persistMessages?: boolean; persistMessages?: boolean;
lightpush?: boolean; lightpush?: boolean;
topics?: string;
} }
export enum LogLevel { export enum LogLevel {
@ -155,7 +156,10 @@ export class NimWaku {
return this.rpcCall<RpcInfoResponse>('get_waku_v2_debug_v1_info', []); return this.rpcCall<RpcInfoResponse>('get_waku_v2_debug_v1_info', []);
} }
async sendMessage(message: WakuMessage): Promise<boolean> { async sendMessage(
message: WakuMessage,
pubsubTopic?: string
): Promise<boolean> {
this.checkProcess(); this.checkProcess();
if (!message.payload) { if (!message.payload) {
@ -168,7 +172,7 @@ export class NimWaku {
}; };
return this.rpcCall<boolean>('post_waku_v2_relay_v1_message', [ return this.rpcCall<boolean>('post_waku_v2_relay_v1_message', [
DefaultPubsubTopic, pubsubTopic ? pubsubTopic : DefaultPubsubTopic,
rpcMessage, rpcMessage,
]); ]);
} }