From 070847d2c09fcede8f2698ce4b17fa3525dd9013 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 10 Mar 2021 15:15:36 +1100 Subject: [PATCH] Use waku relay interface to subscribe and publish messages --- src/lib/node.spec.ts | 20 +++++++++++--------- src/lib/node.ts | 4 ++-- src/lib/waku_relay.ts | 24 +++++++++++++++++++++++- 3 files changed, 36 insertions(+), 12 deletions(-) diff --git a/src/lib/node.spec.ts b/src/lib/node.spec.ts index e789a18081..1fabbf0652 100644 --- a/src/lib/node.spec.ts +++ b/src/lib/node.spec.ts @@ -1,34 +1,36 @@ -import { TextDecoder, TextEncoder } from 'util'; +import { TextDecoder } from 'util'; import test from 'ava'; import Pubsub from 'libp2p-interfaces/src/pubsub'; import { createNode } from './node'; -import { CODEC } from './waku_relay'; +import { CODEC, TOPIC, WakuRelay, WakuRelayPubsub } from './waku_relay'; function delay(ms: number) { return new Promise((resolve) => setTimeout(resolve, ms)); } test('Can publish message', async (t) => { - const topic = 'news'; const message = 'Bird bird bird, bird is the word!'; const [node1, node2] = await Promise.all([createNode(), createNode()]); + const wakuRelayNode1 = new WakuRelay(node1.pubsub as WakuRelayPubsub); + const wakuRelayNode2 = new WakuRelay(node2.pubsub as WakuRelayPubsub); // Add node's 2 data to the PeerStore node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs); await node1.dial(node2.peerId); - await node1.pubsub.subscribe(topic); - await node2.pubsub.subscribe(topic); + + await wakuRelayNode1.subscribe(); + await wakuRelayNode2.subscribe(); // Setup the promise before publishing to ensure the event is not missed // TODO: Is it possible to import `Message` type? - const promise = waitForNextData(node1.pubsub, topic); + const promise = waitForNextData(node1.pubsub); await delay(500); - await node2.pubsub.publish(topic, new TextEncoder().encode(message)); + await wakuRelayNode2.publish(message); const node1Received = await promise; @@ -43,9 +45,9 @@ test('Register waku relay protocol', async (t) => { t.truthy(protocols.findIndex((value) => value == CODEC)); }); -function waitForNextData(pubsub: Pubsub, topic: string) { +function waitForNextData(pubsub: Pubsub) { return new Promise((resolve) => { - pubsub.once(topic, resolve); + pubsub.once(TOPIC, resolve); }).then((msg: any) => { return new TextDecoder().decode(msg.data); }); diff --git a/src/lib/node.ts b/src/lib/node.ts index 5b26020f95..b7bfdc0daa 100644 --- a/src/lib/node.ts +++ b/src/lib/node.ts @@ -3,7 +3,7 @@ import Mplex from 'libp2p-mplex'; import { NOISE } from 'libp2p-noise'; import TCP from 'libp2p-tcp'; -import { WakuRelay } from './waku_relay'; +import { WakuRelayPubsub } from './waku_relay'; export async function createNode() { const node = await Libp2p.create({ @@ -16,7 +16,7 @@ export async function createNode() { connEncryption: [NOISE], // eslint-disable-next-line @typescript-eslint/ban-ts-comment // @ts-ignore: Type needs update - pubsub: WakuRelay, + pubsub: WakuRelayPubsub, }, config: { pubsub: { diff --git a/src/lib/waku_relay.ts b/src/lib/waku_relay.ts index 885c67ba63..7c1714a69c 100644 --- a/src/lib/waku_relay.ts +++ b/src/lib/waku_relay.ts @@ -1,9 +1,16 @@ +import { TextEncoder } from 'util'; + import Libp2p from 'libp2p'; import Gossipsub from 'libp2p-gossipsub'; export const CODEC = '/vac/waku/relay/2.0.0-beta2'; -export class WakuRelay extends Gossipsub { +// // As per waku specs, the topic is fixed +// // TODO: Double check the topic is correct (taken from nim-waku logs) +export const TOPIC = '/waku/2/default-waku/proto'; + +// This is the class to pass to libp2p as pubsub protocol +export class WakuRelayPubsub extends Gossipsub { constructor(libp2p: Libp2p) { super(libp2p); @@ -13,3 +20,18 @@ export class WakuRelay extends Gossipsub { Object.assign(this, { multicodecs }); } } + +// This class provides an interface to execute the waku relay protocol +export class WakuRelay { + constructor(private pubsub: WakuRelayPubsub) {} + + // At this stage we are always using the same topic so we do not pass it as a parameter + async subscribe() { + await this.pubsub.subscribe(TOPIC); + } + + async publish(message: string) { + const msg = new TextEncoder().encode(message); + await this.pubsub.publish(TOPIC, msg); + } +}