diff --git a/src/chat/index.ts b/src/chat/index.ts index 4d9ddaf9bb..dc1e74f1ab 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -85,7 +85,7 @@ const ChatContentTopic = 'dingpu'; const chatMessage = new ChatMessage(new Date(), nick, line); const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); - await waku.relay.publish(msg); + await waku.relay.send(msg); } })(); diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 5040743932..0645e60ae2 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -6,7 +6,7 @@ import Websockets from 'libp2p-websockets'; import Multiaddr from 'multiaddr'; import PeerId from 'peer-id'; -import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; +import { RelayCodec, WakuRelay } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; export interface CreateOptions { @@ -21,11 +21,15 @@ export interface CreateOptions { } export default class Waku { - private constructor( - public libp2p: Libp2p, - public relay: WakuRelay, - public store: WakuStore - ) {} + public libp2p: Libp2p; + public relay: WakuRelay; + public store: WakuStore; + + private constructor(libp2p: Libp2p, store: WakuStore) { + this.libp2p = libp2p; + this.relay = (libp2p.pubsub as unknown) as WakuRelay; + this.store = store; + } /** * Create new waku node @@ -64,7 +68,7 @@ export default class Waku { connEncryption: [new Noise(opts.staticNoiseKey)], // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore: Type needs update - pubsub: WakuRelayPubsub, + pubsub: WakuRelay, }, }); @@ -72,7 +76,7 @@ export default class Waku { await libp2p.start(); - return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore); + return new Waku(libp2p, wakuStore); } /** diff --git a/src/lib/waku_relay/get_relay_peers.ts b/src/lib/waku_relay/get_relay_peers.ts index e31a5280f3..2331da24f5 100644 --- a/src/lib/waku_relay/get_relay_peers.ts +++ b/src/lib/waku_relay/get_relay_peers.ts @@ -1,6 +1,7 @@ +import Gossipsub from 'libp2p-gossipsub'; import { shuffle } from 'libp2p-gossipsub/src/utils'; -import { RelayCodec, WakuRelayPubsub } from './index'; +import { RelayCodec } from './index'; /** * Given a topic, returns up to count peers subscribed to that topic @@ -14,7 +15,7 @@ import { RelayCodec, WakuRelayPubsub } from './index'; * */ export function getRelayPeers( - router: WakuRelayPubsub, + router: Gossipsub, topic: string, count: number, filter: (id: string) => boolean = () => true diff --git a/src/lib/waku_relay/index.spec.ts b/src/lib/waku_relay/index.spec.ts index d7f7065de7..06d7c52e5d 100644 --- a/src/lib/waku_relay/index.spec.ts +++ b/src/lib/waku_relay/index.spec.ts @@ -76,7 +76,7 @@ describe('Waku Relay', () => { const receivedPromise = waitForNextData(waku2.libp2p.pubsub); - await waku1.relay.publish(message); + await waku1.relay.send(message); const receivedMsg = await receivedPromise; @@ -128,7 +128,7 @@ describe('Waku Relay', () => { const message = WakuMessage.fromUtf8String('This is a message'); - await waku.relay.publish(message); + await waku.relay.send(message); let msgs = []; @@ -208,7 +208,7 @@ describe('Waku Relay', () => { const message = WakuMessage.fromUtf8String('This is a message'); - await waku.relay.publish(message); + await waku.relay.send(message); let msgs = []; @@ -304,7 +304,7 @@ describe('Waku Relay', () => { const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub); - await waku1.relay.publish(message); + await waku1.relay.send(message); const waku2ReceivedMsg = await waku2ReceivedPromise; diff --git a/src/lib/waku_relay/index.ts b/src/lib/waku_relay/index.ts index 58362b18bc..487ca819bd 100644 --- a/src/lib/waku_relay/index.ts +++ b/src/lib/waku_relay/index.ts @@ -6,7 +6,7 @@ import { messageIdToString, shuffle, } from 'libp2p-gossipsub/src/utils'; -import Pubsub, { InMessage } from 'libp2p-interfaces/src/pubsub'; +import { InMessage } from 'libp2p-interfaces/src/pubsub'; import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy'; import PeerId from 'peer-id'; @@ -19,8 +19,7 @@ import { RelayHeartbeat } from './relay_heartbeat'; export * from './constants'; export * from './relay_heartbeat'; -// This is the class to pass to libp2p as pubsub protocol -export class WakuRelayPubsub extends Gossipsub { +export class WakuRelay extends Gossipsub { heartbeat: RelayHeartbeat; /** @@ -43,6 +42,28 @@ export class WakuRelayPubsub extends Gossipsub { Object.assign(this, { multicodecs }); } + /** + * Mounts the gossipsub protocol onto the libp2p node + * and subscribes to the default topic + * @override + * @returns {void} + */ + start() { + super.start(); + super.subscribe(constants.RelayDefaultTopic); + } + + /** + * Send Waku messages under default topic + * @override + * @param {WakuMessage} message + * @returns {Promise} + */ + async send(message: WakuMessage) { + const msg = message.toBinary(); + await super.publish(constants.RelayDefaultTopic, Buffer.from(msg)); + } + /** * Join topic * @param {string} topic @@ -291,15 +312,3 @@ export class WakuRelayPubsub extends Gossipsub { }; } } - -// This class provides an interface to execute the waku relay protocol -export class WakuRelay { - constructor(private pubsub: Pubsub) { - this.pubsub.subscribe(constants.RelayDefaultTopic); - } - - async publish(message: WakuMessage) { - const msg = message.toBinary(); - await this.pubsub.publish(constants.RelayDefaultTopic, msg); - } -}