Subscribe to default topic when starting, Remove dual Waku relay classes

This commit is contained in:
Franck Royer 2021-04-16 11:25:08 +10:00
parent bc1ba3f4e4
commit e0debac165
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
5 changed files with 44 additions and 30 deletions

View File

@ -85,7 +85,7 @@ const ChatContentTopic = 'dingpu';
const chatMessage = new ChatMessage(new Date(), nick, line); const chatMessage = new ChatMessage(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
await waku.relay.publish(msg); await waku.relay.send(msg);
} }
})(); })();

View File

@ -6,7 +6,7 @@ import Websockets from 'libp2p-websockets';
import Multiaddr from 'multiaddr'; import Multiaddr from 'multiaddr';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; import { RelayCodec, WakuRelay } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store'; import { StoreCodec, WakuStore } from './waku_store';
export interface CreateOptions { export interface CreateOptions {
@ -21,11 +21,15 @@ export interface CreateOptions {
} }
export default class Waku { export default class Waku {
private constructor( public libp2p: Libp2p;
public libp2p: Libp2p, public relay: WakuRelay;
public relay: WakuRelay, public store: WakuStore;
public store: WakuStore
) {} private constructor(libp2p: Libp2p, store: WakuStore) {
this.libp2p = libp2p;
this.relay = (libp2p.pubsub as unknown) as WakuRelay;
this.store = store;
}
/** /**
* Create new waku node * Create new waku node
@ -64,7 +68,7 @@ export default class Waku {
connEncryption: [new Noise(opts.staticNoiseKey)], connEncryption: [new Noise(opts.staticNoiseKey)],
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: Type needs update // @ts-ignore: Type needs update
pubsub: WakuRelayPubsub, pubsub: WakuRelay,
}, },
}); });
@ -72,7 +76,7 @@ export default class Waku {
await libp2p.start(); await libp2p.start();
return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore); return new Waku(libp2p, wakuStore);
} }
/** /**

View File

@ -1,6 +1,7 @@
import Gossipsub from 'libp2p-gossipsub';
import { shuffle } from 'libp2p-gossipsub/src/utils'; import { shuffle } from 'libp2p-gossipsub/src/utils';
import { RelayCodec, WakuRelayPubsub } from './index'; import { RelayCodec } from './index';
/** /**
* Given a topic, returns up to count peers subscribed to that topic * Given a topic, returns up to count peers subscribed to that topic
@ -14,7 +15,7 @@ import { RelayCodec, WakuRelayPubsub } from './index';
* *
*/ */
export function getRelayPeers( export function getRelayPeers(
router: WakuRelayPubsub, router: Gossipsub,
topic: string, topic: string,
count: number, count: number,
filter: (id: string) => boolean = () => true filter: (id: string) => boolean = () => true

View File

@ -76,7 +76,7 @@ describe('Waku Relay', () => {
const receivedPromise = waitForNextData(waku2.libp2p.pubsub); const receivedPromise = waitForNextData(waku2.libp2p.pubsub);
await waku1.relay.publish(message); await waku1.relay.send(message);
const receivedMsg = await receivedPromise; const receivedMsg = await receivedPromise;
@ -128,7 +128,7 @@ describe('Waku Relay', () => {
const message = WakuMessage.fromUtf8String('This is a message'); const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message); await waku.relay.send(message);
let msgs = []; let msgs = [];
@ -208,7 +208,7 @@ describe('Waku Relay', () => {
const message = WakuMessage.fromUtf8String('This is a message'); const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message); await waku.relay.send(message);
let msgs = []; let msgs = [];
@ -304,7 +304,7 @@ describe('Waku Relay', () => {
const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub);
await waku1.relay.publish(message); await waku1.relay.send(message);
const waku2ReceivedMsg = await waku2ReceivedPromise; const waku2ReceivedMsg = await waku2ReceivedPromise;

View File

@ -6,7 +6,7 @@ import {
messageIdToString, messageIdToString,
shuffle, shuffle,
} from 'libp2p-gossipsub/src/utils'; } from 'libp2p-gossipsub/src/utils';
import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; import { InMessage } from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import PeerId from 'peer-id'; import PeerId from 'peer-id';
@ -19,8 +19,7 @@ import { RelayHeartbeat } from './relay_heartbeat';
export * from './constants'; export * from './constants';
export * from './relay_heartbeat'; export * from './relay_heartbeat';
// This is the class to pass to libp2p as pubsub protocol export class WakuRelay extends Gossipsub {
export class WakuRelayPubsub extends Gossipsub {
heartbeat: RelayHeartbeat; heartbeat: RelayHeartbeat;
/** /**
@ -43,6 +42,28 @@ export class WakuRelayPubsub extends Gossipsub {
Object.assign(this, { multicodecs }); Object.assign(this, { multicodecs });
} }
/**
* Mounts the gossipsub protocol onto the libp2p node
* and subscribes to the default topic
* @override
* @returns {void}
*/
start() {
super.start();
super.subscribe(constants.RelayDefaultTopic);
}
/**
* Send Waku messages under default topic
* @override
* @param {WakuMessage} message
* @returns {Promise<void>}
*/
async send(message: WakuMessage) {
const msg = message.toBinary();
await super.publish(constants.RelayDefaultTopic, Buffer.from(msg));
}
/** /**
* Join topic * Join topic
* @param {string} topic * @param {string} topic
@ -291,15 +312,3 @@ export class WakuRelayPubsub extends Gossipsub {
}; };
} }
} }
// This class provides an interface to execute the waku relay protocol
export class WakuRelay {
constructor(private pubsub: Pubsub) {
this.pubsub.subscribe(constants.RelayDefaultTopic);
}
async publish(message: WakuMessage) {
const msg = message.toBinary();
await this.pubsub.publish(constants.RelayDefaultTopic, msg);
}
}