9: chat example r=D4nte a=D4nte

Resolves #15

Co-authored-by: Franck Royer <franck@royer.one>
This commit is contained in:
bors[bot] 2021-04-01 02:57:41 +00:00 committed by GitHub
commit 62b27fddd6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 458 additions and 57 deletions

View File

@ -2,7 +2,24 @@
A JavaScript implementation of the [Waku v2 protocol](https://specs.vac.dev/specs/waku/v2/waku-v2).
**This repo is a Work In Progress**
## This is a Work In Progress
You can track progress on the [project board](https://github.com/status-im/js-waku/projects/1).
## Examples
## Chat app
A node chat app is provided as a working example of the library.
It is interoperable with the [nim-waku chat app example](https://github.com/status-im/nim-waku/blob/master/examples/v2/chat2.nim).
To run the chat app:
```shell
npm install
npm run chat:app -- --staticNode /ip4/134.209.139.210/tcp/30303/p2p/16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ --listenAddr /ip4/0.0.0.0/tcp/55123
```
The `--listenAddr` parameter is optional, however [NAT passthrough](https://github.com/status-im/js-waku/issues/12) is not yet supported, so you'll need the listening port to be open to receive messages from the fleet.
## Contributing

View File

@ -3,4 +3,4 @@ version: v1beta1
plugins:
- name: ts_proto
out: ./src/proto
opt: grpc_js
opt: grpc_js,esModuleInterop=true

View File

@ -1,5 +1,5 @@
{
"name": "waku-js-chat",
"name": "js-waku",
"version": "1.0.0",
"description": "A chat application running on node and waku",
"main": "build/main/index.js",
@ -19,6 +19,7 @@
"pretest": "run-s pretest:*",
"pretest:1-init-git-submodules": "[ -f './nim-waku/build/wakunode2' ] || git submodule update --init --recursive",
"pretest:2-build-nim-waku": "cd nim-waku; [ -f './build/wakunode2' ] || make -j$(nproc --all 2>/dev/null || echo 2) wakunode2",
"chat:start": "ts-node src/chat/index.ts",
"test": "run-s build test:*",
"test:lint": "eslint src --ext .ts",
"test:prettier": "prettier \"src/**/*.ts\" --list-different",

View File

@ -0,0 +1,9 @@
syntax = "proto3";
package chat.v2;
message ChatMessageProto {
uint64 timestamp = 1;
string nick = 2;
bytes payload = 3;
}

View File

@ -2,7 +2,7 @@ syntax = "proto3";
package waku.v2;
message WakuMessage {
message WakuMessageProto {
optional bytes payload = 1;
optional uint32 content_topic = 2;
optional uint32 version = 3;

View File

@ -0,0 +1,26 @@
import { expect } from 'chai';
import fc from 'fast-check';
import { ChatMessage } from './chat_message';
describe('Chat Message', function () {
it('Chat message round trip binary serialization', function () {
fc.assert(
fc.property(
fc.date({ min: new Date(0) }),
fc.string(),
fc.string(),
(timestamp, nick, message) => {
const msg = new ChatMessage(timestamp, nick, message);
const buf = msg.encode();
const actual = ChatMessage.decode(buf);
// Date.toString does not include ms, as we loose this precision by design
expect(actual.timestamp.toString()).to.eq(timestamp.toString());
expect(actual.nick).to.eq(nick);
expect(actual.message).to.eq(message);
}
)
);
});
});

35
src/chat/chat_message.ts Normal file
View File

@ -0,0 +1,35 @@
import { Reader } from 'protobufjs/minimal';
import { ChatMessageProto } from '../proto/chat/v2/chat_message';
export class ChatMessage {
public constructor(
public timestamp: Date,
public nick: string,
public message: string
) {}
static decode(bytes: Uint8Array): ChatMessage {
const protoMsg = ChatMessageProto.decode(Reader.create(bytes));
const timestamp = new Date(protoMsg.timestamp * 1000);
const message = protoMsg.payload
? Array.from(protoMsg.payload)
.map((char) => {
return String.fromCharCode(char);
})
.join('')
: '';
return new ChatMessage(timestamp, protoMsg.nick, message);
}
encode(): Uint8Array {
const timestamp = Math.floor(this.timestamp.valueOf() / 1000);
const payload = Buffer.from(this.message, 'utf-8');
return ChatMessageProto.encode({
timestamp,
nick: this.nick,
payload,
}).finish();
}
}

108
src/chat/index.ts Normal file
View File

@ -0,0 +1,108 @@
import readline from 'readline';
import util from 'util';
import Waku from '../lib/waku';
import { WakuMessage } from '../lib/waku_message';
import { TOPIC } from '../lib/waku_relay';
import { delay } from '../test_utils/delay';
import { ChatMessage } from './chat_message';
(async function () {
const opts = processArguments();
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout,
});
const question = util.promisify(rl.question).bind(rl);
// Looks like wrong type definition of promisify is picked.
// May be related to https://github.com/DefinitelyTyped/DefinitelyTyped/issues/20497
const nick = ((await question(
'Please choose a nickname: '
)) as unknown) as string;
console.log(`Hi ${nick}!`);
const waku = await Waku.create({ listenAddresses: [opts.listenAddr] });
// TODO: Bubble event to waku, infer topic, decode msg
// Tracked with https://github.com/status-im/js-waku/issues/19
waku.libp2p.pubsub.on(TOPIC, (event) => {
const wakuMsg = WakuMessage.decode(event.data);
if (wakuMsg.payload) {
const chatMsg = ChatMessage.decode(wakuMsg.payload);
const timestamp = chatMsg.timestamp.toLocaleString([], {
month: 'short',
day: 'numeric',
hour: 'numeric',
minute: '2-digit',
hour12: false,
});
console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`);
}
});
console.log('Waku started');
if (opts.staticNode) {
console.log(`dialing ${opts.staticNode}`);
await waku.dial(opts.staticNode);
}
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
// TODO: identify if it is possible to listen to an event to confirm dial
// finished instead of an arbitrary delay. Tracked with
// https://github.com/status-im/js-waku/issues/18
await delay(2000);
// TODO: Automatically subscribe, tracked with
// https://github.com/status-im/js-waku/issues/17
await waku.relay.subscribe();
console.log('Subscribed to waku relay');
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
console.log('Ready to chat!');
rl.prompt();
for await (const line of rl) {
rl.prompt();
const chatMessage = new ChatMessage(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode());
await waku.relay.publish(msg);
}
})();
interface Options {
staticNode?: string;
listenAddr: string;
}
function processArguments(): Options {
const passedArgs = process.argv.slice(2);
let opts: Options = { listenAddr: '/ip4/0.0.0.0/tcp/0' };
while (passedArgs.length) {
const arg = passedArgs.shift();
switch (arg) {
case '--staticNode':
opts = Object.assign(opts, { staticNode: passedArgs.shift() });
break;
case '--listenAddr':
opts = Object.assign(opts, { listenAddr: passedArgs.shift() });
break;
default:
console.log(`Unsupported argument: ${arg}`);
process.exit(1);
}
}
return opts;
}

View File

@ -11,7 +11,7 @@ describe('Waku', function () {
describe('Interop: Nim', function () {
it('nim connects to js', async function () {
this.timeout(10_000);
const waku = await Waku.create(NOISE_KEY_1);
const waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
const peerId = waku.libp2p.peerId.toB58String();

View File

@ -8,24 +8,38 @@ import PeerId from 'peer-id';
import { CODEC, WakuRelay, WakuRelayPubsub } from './waku_relay';
export interface CreateOptions {
listenAddresses: string[];
staticNoiseKey: bytes | undefined;
}
export default class Waku {
private constructor(public libp2p: Libp2p, public relay: WakuRelay) {}
/**
* Create new waku node
* @param listenAddresses: Array of Multiaddrs on which the node should listen. If not present, defaults to ['/ip4/0.0.0.0/tcp/0'].
* @param staticNoiseKey: A static key to use for noise,
* mainly used for test to reduce entropy usage.
* @returns {Promise<Waku>}
*/
static async create(staticNoiseKey?: bytes): Promise<Waku> {
static async create(options: Partial<CreateOptions>): Promise<Waku> {
const opts = Object.assign(
{
listenAddresses: ['/ip4/0.0.0.0/tcp/0'],
staticNoiseKey: undefined,
},
options
);
const libp2p = await Libp2p.create({
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0'],
listen: opts.listenAddresses,
},
modules: {
transport: [TCP],
streamMuxer: [Mplex],
connEncryption: [new Noise(staticNoiseKey)],
connEncryption: [new Noise(opts.staticNoiseKey)],
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: Type needs update
pubsub: WakuRelayPubsub,
@ -37,6 +51,14 @@ export default class Waku {
return new Waku(libp2p, new WakuRelay(libp2p.pubsub));
}
/**
* Dials to the provided peer. If successful, the known metadata of the peer will be added to the nodes peerStore, and the Connection will be returned
* @param peer The peer to dial
*/
async dial(peer: PeerId | Multiaddr | string) {
return this.libp2p.dialProtocol(peer, CODEC);
}
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
this.libp2p.peerStore.addressBook.set(peerId, multiaddr);
await this.libp2p.dialProtocol(peerId, CODEC);

View File

@ -1,17 +1,28 @@
import fc from 'fast-check';
import { Message } from './waku_message';
import { WakuMessage } from './waku_message';
describe('Waku Message', function () {
it('Waku message round trip binary serialization', function () {
fc.assert(
fc.property(fc.string(), (s) => {
const msg = Message.fromUtf8String(s);
const msg = WakuMessage.fromUtf8String(s);
const binary = msg.toBinary();
const actual = Message.fromBinary(binary);
const actual = WakuMessage.decode(binary);
return actual.isEqualTo(msg);
})
);
});
it('Payload to utf-8', function () {
fc.assert(
fc.property(fc.string(), (s) => {
const msg = WakuMessage.fromUtf8String(s);
const utf8 = msg.utf8Payload();
return utf8 === s;
})
);
});
});

View File

@ -2,12 +2,12 @@
import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities
import { WakuMessage } from '../proto/waku/v2/waku';
import { WakuMessageProto } from '../proto/waku/v2/waku';
const DEFAULT_CONTENT_TOPIC = 1;
const DEFAULT_VERSION = 0;
export class Message {
export class WakuMessage {
private constructor(
public payload?: Uint8Array,
public contentTopic?: number,
@ -15,32 +15,57 @@ export class Message {
) {}
/**
* Create Message from utf-8 string
* @param message
* @returns {Message}
* Create Message with a utf-8 string as payload
* @param payload
* @returns {WakuMessage}
*/
static fromUtf8String(message: string): Message {
const payload = Buffer.from(message, 'utf-8');
return new Message(payload, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION);
static fromUtf8String(payload: string): WakuMessage {
const buf = Buffer.from(payload, 'utf-8');
return new WakuMessage(buf, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION);
}
static fromBinary(bytes: Uint8Array): Message {
const wakuMsg = WakuMessage.decode(Reader.create(bytes));
return new Message(wakuMsg.payload, wakuMsg.contentTopic, wakuMsg.version);
/**
* Create Message with a byte array as payload
* @param payload
* @returns {WakuMessage}
*/
static fromBytes(payload: Uint8Array): WakuMessage {
return new WakuMessage(payload, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION);
}
static decode(bytes: Uint8Array): WakuMessage {
const wakuMsg = WakuMessageProto.decode(Reader.create(bytes));
return new WakuMessage(
wakuMsg.payload,
wakuMsg.contentTopic,
wakuMsg.version
);
}
toBinary(): Uint8Array {
return WakuMessage.encode({
return WakuMessageProto.encode({
payload: this.payload,
version: this.version,
contentTopic: this.contentTopic,
}).finish();
}
utf8Payload(): string {
if (!this.payload) {
return '';
}
return Array.from(this.payload)
.map((char) => {
return String.fromCharCode(char);
})
.join('');
}
// Purely for tests purposes.
// We do consider protobuf field when checking equality
// As the content is held by the other fields.
isEqualTo(other: Message) {
isEqualTo(other: WakuMessage) {
const payloadsAreEqual =
this.payload && other.payload
? Buffer.compare(this.payload, other.payload) === 0

View File

@ -2,11 +2,12 @@ import { expect } from 'chai';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import { NOISE_KEY_1, NOISE_KEY_2 } from '../test_utils/constants';
import { delay } from '../test_utils/delay';
import { makeLogFileName } from '../test_utils/log_file';
import { NimWaku } from '../test_utils/nim_waku';
import Waku from './waku';
import { Message } from './waku_message';
import { WakuMessage } from './waku_message';
import { CODEC, TOPIC } from './waku_relay';
describe('Waku Relay', () => {
@ -20,8 +21,8 @@ describe('Waku Relay', () => {
let waku2: Waku;
beforeEach(async function () {
[waku1, waku2] = await Promise.all([
Waku.create(NOISE_KEY_1),
Waku.create(NOISE_KEY_2),
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
Waku.create({ staticNoiseKey: NOISE_KEY_2 }),
]);
await waku1.dialWithMultiAddr(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
@ -76,7 +77,7 @@ describe('Waku Relay', () => {
it.skip('Publish', async function () {
this.timeout(10000);
const message = Message.fromUtf8String('JS to JS communication works');
const message = WakuMessage.fromUtf8String('JS to JS communication works');
// waku.libp2p.pubsub.globalSignaturePolicy = 'StrictSign';
const receivedPromise = waitForNextData(waku2.libp2p.pubsub);
@ -108,7 +109,7 @@ describe('Waku Relay', () => {
beforeEach(async function () {
this.timeout(10_000);
waku = await Waku.create(NOISE_KEY_1);
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
const peerId = waku.libp2p.peerId.toB58String();
const localMultiaddr = waku.libp2p.multiaddrs.find((addr) =>
@ -140,7 +141,7 @@ describe('Waku Relay', () => {
it('Js publishes to nim', async function () {
this.timeout(5000);
const message = Message.fromUtf8String('This is a message');
const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message);
@ -157,13 +158,7 @@ describe('Waku Relay', () => {
it('Nim publishes to js', async function () {
this.timeout(5000);
const message = Message.fromUtf8String('Here is another message.');
await waku.relay.subscribe();
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub);
@ -185,14 +180,82 @@ describe('Waku Relay', () => {
beforeEach(async function () {
this.timeout(10_000);
waku = await Waku.create(NOISE_KEY_1);
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
nimWaku = new NimWaku(this.test!.ctx!.currentTest!.title);
await nimWaku.start();
await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(100);
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku.relay.subscribe();
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
});
it('nim subscribes to js', async function () {
const subscribers = waku.libp2p.pubsub.getSubscribers(TOPIC);
const nimPeerId = await nimWaku.getPeerId();
expect(subscribers).to.contain(nimPeerId.toB58String());
});
it('Js publishes to nim', async function () {
const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message);
await nimWaku.waitForLog('WakuMessage received');
const msgs = await nimWaku.messages();
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
const payload = Buffer.from(msgs[0].payload);
expect(Buffer.compare(payload, message.payload!)).to.equal(0);
});
it('Nim publishes to js', async function () {
const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub);
await nimWaku.sendMessage(message);
const receivedMsg = await receivedPromise;
expect(receivedMsg.contentTopic).to.eq(message.contentTopic);
expect(receivedMsg.version).to.eq(message.version);
const payload = Buffer.from(receivedMsg.payload!);
expect(Buffer.compare(payload, message.payload!)).to.eq(0);
});
});
describe('Js connects to nim', function () {
let waku: Waku;
let nimWaku: NimWaku;
beforeEach(async function () {
this.timeout(10_000);
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start();
const nimPeerId = await nimWaku.getPeerId();
await waku.dialWithMultiAddr(nimPeerId, [nimWaku.multiaddr]);
await waku.dial(await nimWaku.getMultiaddrWithId());
await new Promise((resolve) =>
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
@ -218,7 +281,7 @@ describe('Waku Relay', () => {
});
it('Js publishes to nim', async function () {
const message = Message.fromUtf8String('This is a message');
const message = WakuMessage.fromUtf8String('This is a message');
await waku.relay.publish(message);
@ -234,7 +297,7 @@ describe('Waku Relay', () => {
});
it('Nim publishes to js', async function () {
const message = Message.fromUtf8String('Here is another message.');
const message = WakuMessage.fromUtf8String('Here is another message.');
const receivedPromise = waitForNextData(waku.libp2p.pubsub);
@ -249,13 +312,86 @@ describe('Waku Relay', () => {
expect(Buffer.compare(payload, message.payload!)).to.eq(0);
});
});
describe('js to nim to js', function () {
let waku1: Waku;
let waku2: Waku;
let nimWaku: NimWaku;
beforeEach(async function () {
this.timeout(10_000);
[waku1, waku2] = await Promise.all([
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
Waku.create({ staticNoiseKey: NOISE_KEY_2 }),
]);
nimWaku = new NimWaku(this.test!.ctx!.currentTest!.title);
await nimWaku.start();
const nimWakuMultiaddr = await nimWaku.getMultiaddrWithId();
await Promise.all([
waku1.dial(nimWakuMultiaddr),
waku2.dial(nimWakuMultiaddr),
]);
await delay(100);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);
await Promise.all([waku1.relay.subscribe(), waku2.relay.subscribe()]);
await Promise.all([
new Promise((resolve) =>
waku1.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
new Promise((resolve) =>
waku2.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
),
]);
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
await Promise.all([
waku1 ? await waku1.stop() : null,
waku2 ? await waku2.stop() : null,
]);
});
it('Js publishes, other Js receives', async function () {
// Check that the two JS peers are NOT directly connected
expect(
waku1.libp2p.peerStore.peers.has(waku2.libp2p.peerId.toB58String())
).to.be.false;
expect(
waku2.libp2p.peerStore.peers.has(waku1.libp2p.peerId.toB58String())
).to.be.false;
const msgStr = 'Hello there!';
const message = WakuMessage.fromUtf8String(msgStr);
const waku2ReceivedPromise = waitForNextData(waku2.libp2p.pubsub);
await waku1.relay.publish(message);
const waku2ReceivedMsg = await waku2ReceivedPromise;
expect(waku2ReceivedMsg.utf8Payload()).to.eq(msgStr);
});
});
});
});
function waitForNextData(pubsub: Pubsub): Promise<Message> {
function waitForNextData(pubsub: Pubsub): Promise<WakuMessage> {
return new Promise((resolve) => {
pubsub.once(TOPIC, resolve);
}).then((msg: any) => {
return Message.fromBinary(msg.data);
return WakuMessage.decode(msg.data);
});
}

View File

@ -4,7 +4,7 @@ import Pubsub from 'libp2p-interfaces/src/pubsub';
import { SignaturePolicy } from 'libp2p-interfaces/src/pubsub/signature-policy';
import { getWakuPeers } from './get_waku_peers';
import { Message } from './waku_message';
import { WakuMessage } from './waku_message';
export const CODEC = '/vac/waku/relay/2.0.0-beta2';
@ -16,11 +16,10 @@ export class WakuRelayPubsub extends Gossipsub {
/**
*
* @param libp2p: Libp2p
* @param options: Partial<GossipInputOptions>
*/
constructor(libp2p: Libp2p) {
super(libp2p, {
emitSelf: true,
emitSelf: false,
// Ensure that no signature is expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
});
@ -100,7 +99,7 @@ export class WakuRelay {
await this.pubsub.subscribe(TOPIC);
}
async publish(message: Message) {
async publish(message: WakuMessage) {
const msg = message.toBinary();
await this.pubsub.publish(TOPIC, msg);
}

View File

@ -7,7 +7,7 @@ import Multiaddr from 'multiaddr';
import multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import { Message } from '../lib/waku_message';
import { WakuMessage } from '../lib/waku_message';
import { TOPIC } from '../lib/waku_relay';
import { existsAsync, mkdirAsync, openAsync } from './async_fs';
@ -36,6 +36,7 @@ export class NimWaku {
private pid?: number;
private portsShift: number;
private peerId?: PeerId;
private multiaddrWithId?: Multiaddr;
private logPath: string;
constructor(logName: string) {
@ -131,7 +132,7 @@ export class NimWaku {
return res.result;
}
async sendMessage(message: Message) {
async sendMessage(message: WakuMessage) {
this.checkProcess();
if (!message.payload) {
@ -160,14 +161,25 @@ export class NimWaku {
}
async getPeerId(): Promise<PeerId> {
if (this.peerId) {
return this.peerId;
return await this.setPeerId().then((res) => res.peerId);
}
async getMultiaddrWithId(): Promise<Multiaddr> {
return await this.setPeerId().then((res) => res.multiaddrWithId);
}
private async setPeerId(): Promise<{
peerId: PeerId;
multiaddrWithId: Multiaddr;
}> {
if (this.peerId && this.multiaddrWithId) {
return { peerId: this.peerId, multiaddrWithId: this.multiaddrWithId };
}
const res = await this.info();
const strPeerId = multiaddr(res.listenStr).getPeerId();
return PeerId.createFromB58String(strPeerId);
this.multiaddrWithId = multiaddr(res.listenStr);
const peerIdStr = this.multiaddrWithId.getPeerId();
this.peerId = PeerId.createFromB58String(peerIdStr);
return { peerId: this.peerId, multiaddrWithId: this.multiaddrWithId };
}
get multiaddr(): Multiaddr {