Use waku relay interface to subscribe and publish messages

This commit is contained in:
Franck Royer 2021-03-10 15:15:36 +11:00
parent 12c2a93c7f
commit 070847d2c0
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
3 changed files with 36 additions and 12 deletions

View File

@ -1,34 +1,36 @@
import { TextDecoder, TextEncoder } from 'util'; import { TextDecoder } from 'util';
import test from 'ava'; import test from 'ava';
import Pubsub from 'libp2p-interfaces/src/pubsub'; import Pubsub from 'libp2p-interfaces/src/pubsub';
import { createNode } from './node'; import { createNode } from './node';
import { CODEC } from './waku_relay'; import { CODEC, TOPIC, WakuRelay, WakuRelayPubsub } from './waku_relay';
function delay(ms: number) { function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms)); return new Promise((resolve) => setTimeout(resolve, ms));
} }
test('Can publish message', async (t) => { test('Can publish message', async (t) => {
const topic = 'news';
const message = 'Bird bird bird, bird is the word!'; const message = 'Bird bird bird, bird is the word!';
const [node1, node2] = await Promise.all([createNode(), createNode()]); const [node1, node2] = await Promise.all([createNode(), createNode()]);
const wakuRelayNode1 = new WakuRelay(node1.pubsub as WakuRelayPubsub);
const wakuRelayNode2 = new WakuRelay(node2.pubsub as WakuRelayPubsub);
// Add node's 2 data to the PeerStore // Add node's 2 data to the PeerStore
node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs); node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs);
await node1.dial(node2.peerId); await node1.dial(node2.peerId);
await node1.pubsub.subscribe(topic);
await node2.pubsub.subscribe(topic); await wakuRelayNode1.subscribe();
await wakuRelayNode2.subscribe();
// Setup the promise before publishing to ensure the event is not missed // Setup the promise before publishing to ensure the event is not missed
// TODO: Is it possible to import `Message` type? // TODO: Is it possible to import `Message` type?
const promise = waitForNextData(node1.pubsub, topic); const promise = waitForNextData(node1.pubsub);
await delay(500); await delay(500);
await node2.pubsub.publish(topic, new TextEncoder().encode(message)); await wakuRelayNode2.publish(message);
const node1Received = await promise; const node1Received = await promise;
@ -43,9 +45,9 @@ test('Register waku relay protocol', async (t) => {
t.truthy(protocols.findIndex((value) => value == CODEC)); t.truthy(protocols.findIndex((value) => value == CODEC));
}); });
function waitForNextData(pubsub: Pubsub, topic: string) { function waitForNextData(pubsub: Pubsub) {
return new Promise((resolve) => { return new Promise((resolve) => {
pubsub.once(topic, resolve); pubsub.once(TOPIC, resolve);
}).then((msg: any) => { }).then((msg: any) => {
return new TextDecoder().decode(msg.data); return new TextDecoder().decode(msg.data);
}); });

View File

@ -3,7 +3,7 @@ import Mplex from 'libp2p-mplex';
import { NOISE } from 'libp2p-noise'; import { NOISE } from 'libp2p-noise';
import TCP from 'libp2p-tcp'; import TCP from 'libp2p-tcp';
import { WakuRelay } from './waku_relay'; import { WakuRelayPubsub } from './waku_relay';
export async function createNode() { export async function createNode() {
const node = await Libp2p.create({ const node = await Libp2p.create({
@ -16,7 +16,7 @@ export async function createNode() {
connEncryption: [NOISE], connEncryption: [NOISE],
// eslint-disable-next-line @typescript-eslint/ban-ts-comment // eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: Type needs update // @ts-ignore: Type needs update
pubsub: WakuRelay, pubsub: WakuRelayPubsub,
}, },
config: { config: {
pubsub: { pubsub: {

View File

@ -1,9 +1,16 @@
import { TextEncoder } from 'util';
import Libp2p from 'libp2p'; import Libp2p from 'libp2p';
import Gossipsub from 'libp2p-gossipsub'; import Gossipsub from 'libp2p-gossipsub';
export const CODEC = '/vac/waku/relay/2.0.0-beta2'; export const CODEC = '/vac/waku/relay/2.0.0-beta2';
export class WakuRelay extends Gossipsub { // // As per waku specs, the topic is fixed
// // TODO: Double check the topic is correct (taken from nim-waku logs)
export const TOPIC = '/waku/2/default-waku/proto';
// This is the class to pass to libp2p as pubsub protocol
export class WakuRelayPubsub extends Gossipsub {
constructor(libp2p: Libp2p) { constructor(libp2p: Libp2p) {
super(libp2p); super(libp2p);
@ -13,3 +20,18 @@ export class WakuRelay extends Gossipsub {
Object.assign(this, { multicodecs }); Object.assign(this, { multicodecs });
} }
} }
// This class provides an interface to execute the waku relay protocol
export class WakuRelay {
constructor(private pubsub: WakuRelayPubsub) {}
// At this stage we are always using the same topic so we do not pass it as a parameter
async subscribe() {
await this.pubsub.subscribe(TOPIC);
}
async publish(message: string) {
const msg = new TextEncoder().encode(message);
await this.pubsub.publish(TOPIC, msg);
}
}