mirror of https://github.com/waku-org/js-waku.git
Merge pull request #201 from status-im/174-custom-pubsub-topic
This commit is contained in:
commit
0136720b48
|
@ -7,12 +7,19 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Changed
|
||||
- **Breaking**: Options passed to `Waku.create` used to be passed to `Libp2p.create`;
|
||||
Now, only the `libp2p` property is passed to `Libp2p.create`, allowing for a cleaner interface.
|
||||
|
||||
### Added
|
||||
- Enable access to `WakuMessage.timestamp`.
|
||||
- Examples (web chat): Use `WakuMessage.timestamp` as unique key for list items.
|
||||
- Doc: Link to new [topic guidelines](https://rfc.vac.dev/spec/23/) in README.
|
||||
- Doc: Link to [Waku v2 Toy Chat specs](https://rfc.vac.dev/spec/22/) in README.
|
||||
- Examples (web chat): Persist nick.
|
||||
- Support for custom PubSub Topics to `Waku`, `WakuRelay`, `WakuStore` and `WakuLightPush`;
|
||||
Passing a PubSub Topic is optional and still defaults to `/waku/2/default-waku/proto`;
|
||||
JS-Waku currently supports one, and only, PubSub topic per instance.
|
||||
|
||||
## [0.5.0] - 2021-05-21
|
||||
|
||||
|
|
|
@ -27,8 +27,10 @@ export default async function startChat(): Promise<void> {
|
|||
}
|
||||
|
||||
const waku = await Waku.create({
|
||||
listenAddresses: [opts.listenAddr],
|
||||
libp2p: {
|
||||
addresses: { listen: [opts.listenAddr] },
|
||||
modules: { transport: [TCP] },
|
||||
},
|
||||
});
|
||||
console.log('PeerId: ', waku.libp2p.peerId.toB58String());
|
||||
console.log('Listening on ');
|
||||
|
|
|
@ -181,12 +181,14 @@ export default function App() {
|
|||
async function initWaku(setter: (waku: Waku) => void) {
|
||||
try {
|
||||
const waku = await Waku.create({
|
||||
libp2p: {
|
||||
config: {
|
||||
pubsub: {
|
||||
enabled: true,
|
||||
emitSelf: true,
|
||||
},
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
setter(waku);
|
||||
|
|
|
@ -44,8 +44,8 @@
|
|||
"cov:send": "run-s cov:lcov && codecov",
|
||||
"cov:check": "nyc report && nyc check-coverage --lines 100 --functions 100 --branches 100",
|
||||
"doc": "run-s doc:html && open-cli build/docs/index.html",
|
||||
"doc:html": "typedoc --exclude **/*.spec.ts --out build/docs src/",
|
||||
"doc:json": "typedoc src/ --exclude **/*.spec.ts --json build/docs/typedoc.json",
|
||||
"doc:html": "typedoc --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --out build/docs src/",
|
||||
"doc:json": "typedoc src/ --excludeInternal --listInvalidSymbolLinks --exclude **/*.spec.ts --json build/docs/typedoc.json",
|
||||
"doc:publish": "gh-pages -m \"[ci skip] Updates\" -d build/docs",
|
||||
"version": "standard-version",
|
||||
"reset-hard": "git clean -dfx && git reset --hard && npm i && npm run build && for d in examples/*; do (cd $d; npm i); done",
|
||||
|
|
|
@ -17,7 +17,7 @@ describe('Waku Dial', function () {
|
|||
const [waku1, waku2] = await Promise.all([
|
||||
Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'],
|
||||
libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } },
|
||||
}),
|
||||
Waku.create({ staticNoiseKey: NOISE_KEY_2 }),
|
||||
]);
|
||||
|
@ -39,8 +39,10 @@ describe('Waku Dial', function () {
|
|||
this.timeout(10_000);
|
||||
const waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
listenAddresses: ['/ip4/0.0.0.0/tcp/0'],
|
||||
libp2p: {
|
||||
addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] },
|
||||
modules: { transport: [TCP] },
|
||||
},
|
||||
});
|
||||
|
||||
const multiAddrWithId = waku.getLocalMultiaddrWithID();
|
||||
|
|
109
src/lib/waku.ts
109
src/lib/waku.ts
|
@ -1,4 +1,4 @@
|
|||
import Libp2p, { Libp2pConfig, Libp2pModules, Libp2pOptions } from 'libp2p';
|
||||
import Libp2p, { Libp2pModules, Libp2pOptions } from 'libp2p';
|
||||
import Mplex from 'libp2p-mplex';
|
||||
import { bytes } from 'libp2p-noise/dist/src/@types/basic';
|
||||
import { Noise } from 'libp2p-noise/dist/src/noise';
|
||||
|
@ -11,16 +11,40 @@ import { WakuLightPush } from './waku_light_push';
|
|||
import { RelayCodec, WakuRelay } from './waku_relay';
|
||||
import { StoreCodec, WakuStore } from './waku_store';
|
||||
|
||||
const transportKey = Websockets.prototype[Symbol.toStringTag];
|
||||
const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag];
|
||||
|
||||
export type CreateOptions =
|
||||
| {
|
||||
listenAddresses: string[] | undefined;
|
||||
staticNoiseKey: bytes | undefined;
|
||||
modules: Partial<Libp2pModules>;
|
||||
config: Partial<Libp2pConfig>;
|
||||
}
|
||||
| (Libp2pOptions & import('libp2p').CreateOptions);
|
||||
export interface CreateOptions {
|
||||
/**
|
||||
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
|
||||
*
|
||||
* One and only one pubsub topic is used by Waku. This is used by:
|
||||
* - WakuRelay to receive, route and send messages,
|
||||
* - WakuLightPush to send messages,
|
||||
* - WakuStore to retrieve messages.
|
||||
*
|
||||
* The usage of the default pubsub topic is recommended.
|
||||
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
|
||||
*
|
||||
* @default {@link DefaultPubsubTopic}
|
||||
*/
|
||||
pubsubTopic?: string;
|
||||
/**
|
||||
* You can pass options to the `Libp2p` instance used by {@link Waku} using the {@link CreateOptions.libp2p} property.
|
||||
* This property is the same type than the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
|
||||
* apart that we made the `modules` property optional and partial,
|
||||
* allowing its omission and letting Waku set good defaults.
|
||||
* Notes that some values are overridden by {@link Waku} to ensure it implements the Waku protocol.
|
||||
*/
|
||||
libp2p?: Omit<Libp2pOptions & import('libp2p').CreateOptions, 'modules'> & {
|
||||
modules?: Partial<Libp2pModules>;
|
||||
};
|
||||
/**
|
||||
* Byte array used as key for the noise protocol used for connection encryption
|
||||
* by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
|
||||
* This is only used for test purposes to not run out of entropy during CI runs.
|
||||
*/
|
||||
staticNoiseKey?: bytes;
|
||||
}
|
||||
|
||||
export class Waku {
|
||||
public libp2p: Libp2p;
|
||||
|
@ -44,53 +68,56 @@ export class Waku {
|
|||
*
|
||||
* @param options Takes the same options than `Libp2p`.
|
||||
*/
|
||||
static async create(options: Partial<CreateOptions>): Promise<Waku> {
|
||||
const opts = Object.assign(
|
||||
{
|
||||
listenAddresses: [],
|
||||
staticNoiseKey: undefined,
|
||||
},
|
||||
options
|
||||
);
|
||||
static async create(options?: CreateOptions): Promise<Waku> {
|
||||
// Get an object in case options or libp2p are undefined
|
||||
const libp2pOpts = Object.assign({}, options?.libp2p);
|
||||
|
||||
opts.config = Object.assign(
|
||||
// Default for Websocket filter is `all`:
|
||||
// Returns all TCP and DNS based addresses, both with ws or wss.
|
||||
libp2pOpts.config = Object.assign(
|
||||
{
|
||||
transport: {
|
||||
[transportKey]: {
|
||||
[websocketsTransportKey]: {
|
||||
filter: filters.all,
|
||||
},
|
||||
},
|
||||
},
|
||||
options.config
|
||||
options?.libp2p?.config
|
||||
);
|
||||
|
||||
opts.modules = Object.assign({}, options.modules);
|
||||
|
||||
let transport = [Websockets];
|
||||
if (opts.modules?.transport) {
|
||||
transport = transport.concat(opts.modules?.transport);
|
||||
// Pass pubsub topic to relay
|
||||
if (options?.pubsubTopic) {
|
||||
libp2pOpts.config.pubsub = Object.assign(
|
||||
{ pubsubTopic: options.pubsubTopic },
|
||||
libp2pOpts.config.pubsub
|
||||
);
|
||||
}
|
||||
|
||||
// FIXME: By controlling the creation of libp2p we have to think about what
|
||||
// needs to be exposed and what does not. Ideally, we should be able to let
|
||||
// the user create the WakuStore, WakuRelay instances and pass them when
|
||||
// creating the libp2p instance.
|
||||
const libp2p = await Libp2p.create({
|
||||
addresses: {
|
||||
listen: opts.listenAddresses,
|
||||
libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules);
|
||||
|
||||
// Default transport for libp2p is Websockets
|
||||
libp2pOpts.modules = Object.assign(
|
||||
{
|
||||
transport: [Websockets],
|
||||
},
|
||||
modules: {
|
||||
transport,
|
||||
options?.libp2p?.modules
|
||||
);
|
||||
|
||||
// streamMuxer, connection encryption and pubsub are overridden
|
||||
// as those are the only ones currently supported by Waku nodes.
|
||||
libp2pOpts.modules = Object.assign(libp2pOpts.modules, {
|
||||
streamMuxer: [Mplex],
|
||||
connEncryption: [new Noise(opts.staticNoiseKey)],
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore: Type needs update
|
||||
connEncryption: [new Noise(options?.staticNoiseKey)],
|
||||
pubsub: WakuRelay,
|
||||
},
|
||||
config: opts.config,
|
||||
});
|
||||
|
||||
const wakuStore = new WakuStore(libp2p);
|
||||
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
|
||||
// @ts-ignore: modules property is correctly set thanks to voodoo
|
||||
const libp2p = await Libp2p.create(libp2pOpts);
|
||||
|
||||
const wakuStore = new WakuStore(libp2p, {
|
||||
pubsubTopic: options?.pubsubTopic,
|
||||
});
|
||||
const wakuLightPush = new WakuLightPush(libp2p);
|
||||
|
||||
await libp2p.start();
|
||||
|
|
|
@ -23,7 +23,47 @@ describe('Waku Light Push', () => {
|
|||
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messageText = 'Light Push works!';
|
||||
const message = WakuMessage.fromUtf8String(messageText);
|
||||
|
||||
const pushResponse = await waku.lightPush.push(nimPeerId, message);
|
||||
expect(pushResponse?.isSuccess).to.be.true;
|
||||
|
||||
let msgs: WakuMessage[] = [];
|
||||
|
||||
while (msgs.length === 0) {
|
||||
await delay(200);
|
||||
msgs = await nimWaku.messages();
|
||||
}
|
||||
|
||||
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
|
||||
expect(msgs[0].version).to.equal(message.version);
|
||||
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
|
||||
});
|
||||
|
||||
it('Push on custom pubsub topic', async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
const customPubSubTopic = '/waku/2/custom-dapp/proto';
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ lightpush: true, topics: customPubSubTopic });
|
||||
|
||||
waku = await Waku.create({
|
||||
pubsubTopic: customPubSubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
|
|
|
@ -13,16 +13,36 @@ import { PushRPC } from './push_rpc';
|
|||
export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1';
|
||||
export { PushResponse };
|
||||
|
||||
export interface CreateOptions {
|
||||
/**
|
||||
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
|
||||
*
|
||||
* The usage of the default pubsub topic is recommended.
|
||||
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
|
||||
*
|
||||
* @default {@link DefaultPubsubTopic}
|
||||
*/
|
||||
pubsubTopic?: string;
|
||||
}
|
||||
|
||||
/**
|
||||
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
|
||||
*/
|
||||
export class WakuLightPush {
|
||||
constructor(public libp2p: Libp2p) {}
|
||||
pubsubTopic: string;
|
||||
|
||||
constructor(public libp2p: Libp2p, options?: CreateOptions) {
|
||||
if (options?.pubsubTopic) {
|
||||
this.pubsubTopic = options.pubsubTopic;
|
||||
} else {
|
||||
this.pubsubTopic = DefaultPubsubTopic;
|
||||
}
|
||||
}
|
||||
|
||||
async push(
|
||||
peerId: PeerId,
|
||||
message: WakuMessage,
|
||||
pubsubTopic: string = DefaultPubsubTopic
|
||||
pubsubTopic: string = this.pubsubTopic
|
||||
): Promise<PushResponse | null> {
|
||||
const peer = this.libp2p.peerStore.get(peerId);
|
||||
if (!peer) throw 'Peer is unknown';
|
||||
|
|
|
@ -31,7 +31,7 @@ describe('Waku Relay', () => {
|
|||
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
|
||||
Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
listenAddresses: ['/ip4/0.0.0.0/tcp/0/wss'],
|
||||
libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } },
|
||||
}),
|
||||
]);
|
||||
|
||||
|
@ -142,6 +142,72 @@ describe('Waku Relay', () => {
|
|||
});
|
||||
});
|
||||
|
||||
describe('Custom pubsub topic', () => {
|
||||
it('Publish', async function () {
|
||||
this.timeout(10000);
|
||||
|
||||
const pubsubTopic = '/some/pubsub/topic';
|
||||
|
||||
// 1 and 2 uses a custom pubsub
|
||||
const [waku1, waku2, waku3] = await Promise.all([
|
||||
Waku.create({
|
||||
pubsubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
}),
|
||||
Waku.create({
|
||||
pubsubTopic,
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
libp2p: { addresses: { listen: ['/ip4/0.0.0.0/tcp/0/wss'] } },
|
||||
}),
|
||||
Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
}),
|
||||
]);
|
||||
|
||||
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
|
||||
waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
|
||||
|
||||
await Promise.all([
|
||||
new Promise((resolve) =>
|
||||
waku1.libp2p.pubsub.once('pubsub:subscription-change', () =>
|
||||
resolve(null)
|
||||
)
|
||||
),
|
||||
new Promise((resolve) =>
|
||||
waku2.libp2p.pubsub.once('pubsub:subscription-change', () =>
|
||||
resolve(null)
|
||||
)
|
||||
),
|
||||
// No subscription change expected for Waku 3
|
||||
]);
|
||||
|
||||
const messageText = 'Communicating using a custom pubsub topic';
|
||||
const message = WakuMessage.fromUtf8String(messageText);
|
||||
|
||||
const waku2ReceivedMsgPromise: Promise<WakuMessage> = new Promise(
|
||||
(resolve) => {
|
||||
waku2.relay.addObserver(resolve);
|
||||
}
|
||||
);
|
||||
|
||||
// The promise **fails** if we receive a message on the default
|
||||
// pubsub topic.
|
||||
const waku3NoMsgPromise: Promise<WakuMessage> = new Promise(
|
||||
(resolve, reject) => {
|
||||
waku3.relay.addObserver(reject);
|
||||
setTimeout(resolve, 1000);
|
||||
}
|
||||
);
|
||||
|
||||
await waku1.relay.send(message);
|
||||
|
||||
const waku2ReceivedMsg = await waku2ReceivedMsgPromise;
|
||||
await waku3NoMsgPromise;
|
||||
|
||||
expect(waku2ReceivedMsg.payloadAsUtf8).to.eq(messageText);
|
||||
});
|
||||
});
|
||||
|
||||
describe('Interop: Nim', function () {
|
||||
describe('Nim connects to js', function () {
|
||||
let waku: Waku;
|
||||
|
@ -153,8 +219,10 @@ describe('Waku Relay', () => {
|
|||
log('Create waku node');
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
listenAddresses: ['/ip4/0.0.0.0/tcp/0'],
|
||||
libp2p: {
|
||||
addresses: { listen: ['/ip4/0.0.0.0/tcp/0'] },
|
||||
modules: { transport: [TCP] },
|
||||
},
|
||||
});
|
||||
|
||||
const multiAddrWithId = waku.getLocalMultiaddrWithID();
|
||||
|
@ -231,7 +299,7 @@ describe('Waku Relay', () => {
|
|||
this.timeout(30_000);
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
|
||||
nimWaku = new NimWaku(this.test?.ctx?.currentTest?.title + '');
|
||||
|
@ -328,11 +396,11 @@ describe('Waku Relay', () => {
|
|||
[waku1, waku2] = await Promise.all([
|
||||
Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
}),
|
||||
Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_2,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
}),
|
||||
]);
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@ import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub';
|
|||
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { CreateOptions } from '../waku';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
|
||||
import * as constants from './constants';
|
||||
|
@ -26,9 +27,9 @@ import { RelayHeartbeat } from './relay_heartbeat';
|
|||
export { RelayCodec, DefaultPubsubTopic };
|
||||
|
||||
/**
|
||||
* See {GossipOptions} from libp2p-gossipsub
|
||||
* See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api).
|
||||
*/
|
||||
interface GossipOptions {
|
||||
export interface GossipOptions {
|
||||
emitSelf: boolean;
|
||||
gossipIncoming: boolean;
|
||||
fallbackToFloodsub: boolean;
|
||||
|
@ -36,7 +37,8 @@ interface GossipOptions {
|
|||
doPX: boolean;
|
||||
msgIdFn: MessageIdFunction;
|
||||
messageCache: MessageCache;
|
||||
globalSignaturePolicy: string;
|
||||
// This option is always overridden
|
||||
// globalSignaturePolicy: string;
|
||||
scoreParams: Partial<PeerScoreParams>;
|
||||
scoreThresholds: Partial<PeerScoreThresholds>;
|
||||
directPeers: AddrInfo[];
|
||||
|
@ -57,6 +59,7 @@ interface GossipOptions {
|
|||
*/
|
||||
export class WakuRelay extends Gossipsub implements Pubsub {
|
||||
heartbeat: RelayHeartbeat;
|
||||
pubsubTopic: string;
|
||||
/**
|
||||
* observers called when receiving new message.
|
||||
* Observers under key "" are always called.
|
||||
|
@ -65,12 +68,10 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
[contentTopic: string]: Array<(message: WakuMessage) => void>;
|
||||
};
|
||||
|
||||
/**
|
||||
*
|
||||
* @param {Libp2p} libp2p
|
||||
* @param {Partial<GossipOptions>} [options]
|
||||
*/
|
||||
constructor(libp2p: Libp2p, options?: Partial<GossipOptions>) {
|
||||
constructor(
|
||||
libp2p: Libp2p,
|
||||
options?: Partial<CreateOptions & GossipOptions>
|
||||
) {
|
||||
super(
|
||||
libp2p,
|
||||
Object.assign(options, {
|
||||
|
@ -85,6 +86,12 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
const multicodecs = [constants.RelayCodec];
|
||||
|
||||
Object.assign(this, { multicodecs });
|
||||
|
||||
if (options?.pubsubTopic) {
|
||||
this.pubsubTopic = options.pubsubTopic;
|
||||
} else {
|
||||
this.pubsubTopic = constants.DefaultPubsubTopic;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -95,24 +102,8 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
* @returns {void}
|
||||
*/
|
||||
public start(): void {
|
||||
this.on(constants.DefaultPubsubTopic, (event) => {
|
||||
const wakuMsg = WakuMessage.decode(event.data);
|
||||
if (this.observers['']) {
|
||||
this.observers[''].forEach((callbackFn) => {
|
||||
callbackFn(wakuMsg);
|
||||
});
|
||||
}
|
||||
if (wakuMsg.contentTopic) {
|
||||
if (this.observers[wakuMsg.contentTopic]) {
|
||||
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
|
||||
callbackFn(wakuMsg);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
super.start();
|
||||
super.subscribe(constants.DefaultPubsubTopic);
|
||||
this.subscribe(this.pubsubTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -123,7 +114,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
*/
|
||||
public async send(message: WakuMessage): Promise<void> {
|
||||
const msg = message.encode();
|
||||
await super.publish(constants.DefaultPubsubTopic, Buffer.from(msg));
|
||||
await super.publish(this.pubsubTopic, Buffer.from(msg));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -157,7 +148,7 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
* Return the relay peers we are connected to and we would publish a message to
|
||||
*/
|
||||
getPeers(): Set<string> {
|
||||
return getRelayPeers(this, DefaultPubsubTopic, this._options.D, (id) => {
|
||||
return getRelayPeers(this, this.pubsubTopic, this._options.D, (id) => {
|
||||
// Filter peers we would not publish to
|
||||
return (
|
||||
this.score.score(id) >= this._options.scoreThresholds.publishThreshold
|
||||
|
@ -165,12 +156,37 @@ export class WakuRelay extends Gossipsub implements Pubsub {
|
|||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
|
||||
*
|
||||
* @override
|
||||
*/
|
||||
subscribe(pubsubTopic: string): void {
|
||||
this.on(pubsubTopic, (event) => {
|
||||
const wakuMsg = WakuMessage.decode(event.data);
|
||||
if (this.observers['']) {
|
||||
this.observers[''].forEach((callbackFn) => {
|
||||
callbackFn(wakuMsg);
|
||||
});
|
||||
}
|
||||
if (wakuMsg.contentTopic) {
|
||||
if (this.observers[wakuMsg.contentTopic]) {
|
||||
this.observers[wakuMsg.contentTopic].forEach((callbackFn) => {
|
||||
callbackFn(wakuMsg);
|
||||
});
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
super.subscribe(pubsubTopic);
|
||||
}
|
||||
|
||||
/**
|
||||
* Join pubsub topic.
|
||||
* This is present to override the behavior of Gossipsub and should not
|
||||
* be used by API Consumers
|
||||
*
|
||||
* @ignore
|
||||
* @internal
|
||||
* @param {string} topic
|
||||
* @returns {void}
|
||||
* @override
|
||||
|
|
|
@ -30,7 +30,7 @@ describe('Waku Store', () => {
|
|||
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
|
@ -67,7 +67,7 @@ describe('Waku Store', () => {
|
|||
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
|
@ -93,4 +93,46 @@ describe('Waku Store', () => {
|
|||
).to.eq(index);
|
||||
}
|
||||
});
|
||||
|
||||
it('Retrieves history using custom pubsub topic', async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
const customPubSubTopic = '/waku/2/custom-dapp/proto';
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ persistMessages: true, topics: customPubSubTopic });
|
||||
|
||||
for (let i = 0; i < 2; i++) {
|
||||
expect(
|
||||
await nimWaku.sendMessage(
|
||||
WakuMessage.fromUtf8String(`Message ${i}`),
|
||||
customPubSubTopic
|
||||
)
|
||||
).to.be.true;
|
||||
}
|
||||
|
||||
waku = await Waku.create({
|
||||
pubsubTopic: customPubSubTopic,
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
libp2p: { modules: { transport: [TCP] } },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messages = await waku.store.queryHistory({
|
||||
peerId: nimPeerId,
|
||||
contentTopics: [],
|
||||
});
|
||||
|
||||
expect(messages?.length).eq(2);
|
||||
const result = messages?.findIndex((msg) => {
|
||||
return msg.payloadAsUtf8 === 'Message 0';
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
});
|
||||
|
|
|
@ -13,7 +13,19 @@ export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
|
|||
|
||||
export { Direction };
|
||||
|
||||
export interface Options {
|
||||
export interface CreateOptions {
|
||||
/**
|
||||
* The PubSub Topic to use. Defaults to {@link DefaultPubsubTopic}.
|
||||
*
|
||||
* The usage of the default pubsub topic is recommended.
|
||||
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
|
||||
*
|
||||
* @default {@link DefaultPubsubTopic}
|
||||
*/
|
||||
pubsubTopic?: string;
|
||||
}
|
||||
|
||||
export interface QueryOptions {
|
||||
peerId: PeerId;
|
||||
contentTopics: string[];
|
||||
pubsubTopic?: string;
|
||||
|
@ -26,24 +38,32 @@ export interface Options {
|
|||
* Implements the [Waku v2 Store protocol](https://rfc.vac.dev/spec/13/).
|
||||
*/
|
||||
export class WakuStore {
|
||||
constructor(public libp2p: Libp2p) {}
|
||||
pubsubTopic: string;
|
||||
|
||||
constructor(public libp2p: Libp2p, options?: CreateOptions) {
|
||||
if (options?.pubsubTopic) {
|
||||
this.pubsubTopic = options.pubsubTopic;
|
||||
} else {
|
||||
this.pubsubTopic = DefaultPubsubTopic;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Query given peer using Waku Store.
|
||||
*
|
||||
* @param options
|
||||
* @param options.peerId The peer to query.
|
||||
* @param options.contentTopics The content topics to retrieve, leave empty to
|
||||
* @param options.peerId The peer to query.Options
|
||||
* @param options.contentTopics The content topics to pass to the query, leave empty to
|
||||
* retrieve all messages.
|
||||
* @param options.pubsubTopic The pubsub topic to retrieve. Currently, all waku nodes
|
||||
* use the same pubsub topic. This is reserved for future applications.
|
||||
* @param options.pubsubTopic The pubsub topic to pass to the query. Defaults
|
||||
* to the value set at creation. See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/).
|
||||
* @param options.callback Callback called on page of stored messages as they are retrieved
|
||||
* @throws If not able to reach the peer to query.
|
||||
*/
|
||||
async queryHistory(options: Options): Promise<WakuMessage[] | null> {
|
||||
async queryHistory(options: QueryOptions): Promise<WakuMessage[] | null> {
|
||||
const opts = Object.assign(
|
||||
{
|
||||
pubsubTopic: DefaultPubsubTopic,
|
||||
pubsubTopic: this.pubsubTopic,
|
||||
direction: Direction.BACKWARD,
|
||||
pageSize: 10,
|
||||
},
|
||||
|
|
|
@ -40,6 +40,7 @@ export interface Args {
|
|||
logLevel?: LogLevel;
|
||||
persistMessages?: boolean;
|
||||
lightpush?: boolean;
|
||||
topics?: string;
|
||||
}
|
||||
|
||||
export enum LogLevel {
|
||||
|
@ -155,7 +156,10 @@ export class NimWaku {
|
|||
return this.rpcCall<RpcInfoResponse>('get_waku_v2_debug_v1_info', []);
|
||||
}
|
||||
|
||||
async sendMessage(message: WakuMessage): Promise<boolean> {
|
||||
async sendMessage(
|
||||
message: WakuMessage,
|
||||
pubsubTopic?: string
|
||||
): Promise<boolean> {
|
||||
this.checkProcess();
|
||||
|
||||
if (!message.payload) {
|
||||
|
@ -168,7 +172,7 @@ export class NimWaku {
|
|||
};
|
||||
|
||||
return this.rpcCall<boolean>('post_waku_v2_relay_v1_message', [
|
||||
DefaultPubsubTopic,
|
||||
pubsubTopic ? pubsubTopic : DefaultPubsubTopic,
|
||||
rpcMessage,
|
||||
]);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue