Use waku messages over waku relay

This commit is contained in:
Franck Royer 2021-03-10 16:22:49 +11:00
parent 070847d2c0
commit 4329b8006e
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
13 changed files with 1380 additions and 114 deletions

1
.gitignore vendored
View File

@ -4,6 +4,7 @@ build
node_modules
test
src/**.js
src/gen
coverage
*.log
yarn.lock

View File

@ -1,2 +1,3 @@
# package.json is formatted by package managers, so we ignore it here
package.json
package.json
gen

11
buf.gen.yaml Normal file
View File

@ -0,0 +1,11 @@
version: v1beta1
plugins:
- name: ts
out: src/gen/proto
opt: grpc_js
# protoc 3.13 our above is needed as the schema is v3 with optional fields
- name: js
out: build/main/gen/proto
opt: import_style=commonjs,binary

5
buf.yaml Normal file
View File

@ -0,0 +1,5 @@
version: v1beta1
build:
roots:
- ./proto

1277
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,7 @@
"build": "run-p build:*",
"build:main": "tsc -p tsconfig.json",
"build:module": "tsc -p tsconfig.module.json",
"build:proto": "buf generate",
"fix": "run-s fix:*",
"fix:prettier": "prettier \"src/**/*.ts\" --write",
"fix:lint": "eslint src --ext .ts --fix",
@ -20,6 +21,7 @@
"test:prettier": "prettier \"src/**/*.ts\" --list-different",
"test:spelling": "cspell \"{README.md,.github/*.md,src/**/*.ts}\"",
"test:unit": "nyc --silent ava",
"test:lint-proto": "buf lint",
"check-cli": "run-s test diff-integration-tests check-integration-tests",
"check-integration-tests": "run-s check-integration-test:*",
"diff-integration-tests": "mkdir -p diff && rm -rf diff/test && cp -r test diff/test && rm -rf diff/test/test-*/.git && cd diff && git init --quiet && git add -A && git commit --quiet --no-verify --allow-empty -m 'WIP' && echo '\\n\\nCommitted most recent integration test output in the \"diff\" directory. Review the changes with \"cd diff && git diff HEAD\" or your preferred git diff viewer.'",
@ -43,13 +45,16 @@
},
"dependencies": {
"@bitauth/libauth": "^1.17.1",
"libp2p": "^0.30.9",
"libp2p-gossipsub": "^0.8.0",
"@types/prompt-sync": "^4.1.0",
"debug": "^4.3.1",
"libp2p": "^0.30.0",
"libp2p-gossipsub": "^0.7.0",
"libp2p-mplex": "^0.10.2",
"libp2p-noise": "^2.0.5",
"libp2p-secio": "^0.13.1",
"libp2p-tcp": "^0.15.3",
"multiaddr": "^8.1.2",
"prompt-sync": "^4.2.0",
"yarg": "^1.0.8"
},
"devDependencies": {
@ -58,7 +63,8 @@
"@types/node": "^14.14.31",
"@typescript-eslint/eslint-plugin": "^4.0.1",
"@typescript-eslint/parser": "^4.0.1",
"ava": "^3.12.1",
"ava": "^3.15.0",
"ava-fast-check": "^4.0.2",
"codecov": "^3.5.0",
"cspell": "^4.1.0",
"cz-conventional-changelog": "^3.3.0",
@ -67,7 +73,9 @@
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^3.0.2",
"eslint-plugin-import": "^2.22.0",
"fast-check": "^2.13.0",
"gh-pages": "^3.1.0",
"grpc_tools_node_protoc_ts": "^5.1.3",
"npm-run-all": "^4.1.5",
"nyc": "^15.1.0",
"open-cli": "^6.0.1",

9
proto/waku/v2/waku.proto Normal file
View File

@ -0,0 +1,9 @@
syntax = "proto3";
package waku.v2;
message WakuMessage {
optional bytes payload = 1;
optional fixed32 content_topic = 2;
optional uint32 version = 3;
}

View File

@ -1,74 +0,0 @@
import Libp2p from 'libp2p';
import Mplex from 'libp2p-mplex';
import { NOISE } from 'libp2p-noise';
import Secio from 'libp2p-secio';
import TCP from 'libp2p-tcp';
import Multiaddr from 'multiaddr';
import multiaddr from 'multiaddr';
import { CODEC, WakuRelay } from './lib/waku_relay';
(async () => {
// Handle arguments
const { peer } = args();
const libp2p = await Libp2p.create({
addresses: {
listen: ['/ip4/0.0.0.0/tcp/0', '/ip4/0.0.0.0/tcp/0/ws'],
},
modules: {
transport: [TCP],
streamMuxer: [Mplex],
connEncryption: [NOISE, Secio],
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: Key missing, see https://github.com/libp2p/js-libp2p/issues/830#issuecomment-791040021
pubsub: WakuRelay,
},
config: {
pubsub: {
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore
enabled: true,
emitSelf: true,
signMessages: false,
strictSigning: false,
},
},
});
libp2p.connectionManager.on('peer:connect', (connection) => {
console.info(`Connected to ${connection.remotePeer.toB58String()}!`);
});
// Start libp2p
await libp2p.start();
console.log('listening on addresses:');
libp2p.multiaddrs.forEach((addr) => {
console.log(`${addr.toString()}/p2p/${libp2p.peerId.toB58String()}`);
});
console.log('\nNode supports protocols:');
libp2p.upgrader.protocols.forEach((_, p) => console.log(p));
// Dial nim-waku using waku relay protocol
if (process.argv.length >= 3) {
console.log(`dialing remote peer at ${peer}`);
await libp2p.dialProtocol(peer, CODEC);
console.log(`dialed ${peer}`);
}
})();
function args(): { peer: Multiaddr } {
const args = process.argv.slice(2);
if (args.length != 1) {
console.log(`Usage:
${process.argv[0]} ${process.argv[1]} <peer multiaddress>`);
process.exit(1);
}
const peer = multiaddr(args[0]);
return { peer };
}

0
src/lib/cli.ts Normal file
View File

View File

@ -1,21 +1,20 @@
import { TextDecoder } from 'util';
import test from 'ava';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import { createNode } from './node';
import { CODEC, TOPIC, WakuRelay, WakuRelayPubsub } from './waku_relay';
import { Message } from './waku_message';
import { CODEC, TOPIC, WakuRelay } from './waku_relay';
function delay(ms: number) {
return new Promise((resolve) => setTimeout(resolve, ms));
}
test('Can publish message', async (t) => {
const message = 'Bird bird bird, bird is the word!';
const message = Message.fromString('Bird bird bird, bird is the word!');
const [node1, node2] = await Promise.all([createNode(), createNode()]);
const wakuRelayNode1 = new WakuRelay(node1.pubsub as WakuRelayPubsub);
const wakuRelayNode2 = new WakuRelay(node2.pubsub as WakuRelayPubsub);
const wakuRelayNode1 = new WakuRelay(node1.pubsub);
const wakuRelayNode2 = new WakuRelay(node2.pubsub);
// Add node's 2 data to the PeerStore
node1.peerStore.addressBook.set(node2.peerId, node2.multiaddrs);
@ -34,7 +33,7 @@ test('Can publish message', async (t) => {
const node1Received = await promise;
t.deepEqual(node1Received, message);
t.true(node1Received.isEqualTo(message));
});
test('Register waku relay protocol', async (t) => {
@ -45,10 +44,10 @@ test('Register waku relay protocol', async (t) => {
t.truthy(protocols.findIndex((value) => value == CODEC));
});
function waitForNextData(pubsub: Pubsub) {
function waitForNextData(pubsub: Pubsub): Promise<Message> {
return new Promise((resolve) => {
pubsub.once(TOPIC, resolve);
}).then((msg: any) => {
return new TextDecoder().decode(msg.data);
return Message.fromBinary(msg.data);
});
}

View File

@ -0,0 +1,17 @@
import { fc, testProp } from 'ava-fast-check';
import { Message } from './waku_message';
// for all a, b, c strings
// b is a substring of a + b + c
testProp(
'Waku message round trip binary serialisation',
[fc.string()],
(t, s) => {
const msg = Message.fromString(s);
const binary = msg.toBinary();
const actual = Message.fromBinary(binary);
t.true(actual.isEqualTo(msg));
}
);

53
src/lib/waku_message.ts Normal file
View File

@ -0,0 +1,53 @@
import { WakuMessage } from '../gen/proto/waku/v2/waku_pb';
// Ensure that this class matches the proto interface while
// Protecting the user from protobuf oddities
export class Message {
public payload: Uint8Array | string;
public contentTopic: number;
public version: number;
private constructor(private protobuf: WakuMessage) {
this.protobuf = protobuf;
const msg = protobuf.toObject();
this.payload = msg.payload;
this.contentTopic = msg.contentTopic;
this.version = msg.version;
}
static fromString(message: string): Message {
const wakuMsg = new WakuMessage();
// Only Version 0 is implemented in Waku 2.
// 0: payload SHOULD be either unencrypted or that encryption is done at a separate layer outside of Waku.
wakuMsg.setVersion(0);
// This is the content topic commonly used at this time
wakuMsg.setContentTopic(1);
wakuMsg.setPayload(Buffer.from(message));
return new Message(wakuMsg);
}
static fromBinary(message: Uint8Array): Message {
const wakuMsg = WakuMessage.deserializeBinary(message);
return new Message(wakuMsg);
}
toBinary(): Uint8Array {
return this.protobuf.serializeBinary();
}
// Purely for tests purposes.
// We do not care about protobuf field when checking equality
isEqualTo(other: Message) {
return (
this.payload === other.payload &&
this.contentTopic === other.contentTopic &&
this.version === other.version
);
}
}

View File

@ -1,7 +1,8 @@
import { TextEncoder } from 'util';
import Libp2p from 'libp2p';
import Gossipsub from 'libp2p-gossipsub';
import { Libp2p } from 'libp2p-gossipsub/src/interfaces';
import Pubsub from 'libp2p-interfaces/src/pubsub';
import { Message } from './waku_message';
export const CODEC = '/vac/waku/relay/2.0.0-beta2';
@ -23,15 +24,15 @@ export class WakuRelayPubsub extends Gossipsub {
// This class provides an interface to execute the waku relay protocol
export class WakuRelay {
constructor(private pubsub: WakuRelayPubsub) {}
constructor(private pubsub: Pubsub) {}
// At this stage we are always using the same topic so we do not pass it as a parameter
async subscribe() {
await this.pubsub.subscribe(TOPIC);
}
async publish(message: string) {
const msg = new TextEncoder().encode(message);
async publish(message: Message) {
const msg = message.toBinary();
await this.pubsub.publish(TOPIC, msg);
}
}