Merge pull request #170 from status-im/151-light-push

This commit is contained in:
Franck Royer 2021-05-19 12:49:37 +10:00 committed by GitHub
commit 8e1f82563e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
14 changed files with 549 additions and 6 deletions

View File

@ -34,6 +34,7 @@
"lastpub",
"libauth",
"libp",
"lightpush",
"livechat",
"mkdir",
"multiaddr",

View File

@ -7,6 +7,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- Implement [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
- Expose `Direction` enum from js-waku root (it was only accessible via the proto module).
- Examples (cli chat): Use light push to send messages if `--lightPush` is passed.
- Examples (cli chat): Print usage if `--help` is passed.
## [0.4.0] - 2021-05-18
### Added

View File

@ -131,6 +131,7 @@ You can track progress on the [project board](https://github.com/status-im/js-wa
|[16/WAKU2-RPC](https://rfc.vac.dev/spec/16)|⛔|
|[17/WAKU2-RLNRELAY](https://rfc.vac.dev/spec/17)||
|[18/WAKU2-SWAP](https://rfc.vac.dev/spec/18)||
|[19/WAKU2-LIGHTPUSH](https://rfc.vac.dev/spec/19/)|✔|
## Bugs, Questions & Features

View File

@ -3,20 +3,25 @@ import util from 'util';
import {
ChatMessage,
Direction,
Environment,
getStatusFleetNodes,
LightPushCodec,
StoreCodec,
Waku,
WakuMessage,
} from 'js-waku';
import TCP from 'libp2p-tcp';
import { multiaddr, Multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
const ChatContentTopic = 'dingpu';
export default async function startChat(): Promise<void> {
let opts = processArguments();
if (!opts) return;
if (opts.autoDial) {
opts = await addFleetNodes(opts);
}
@ -80,6 +85,7 @@ export default async function startChat(): Promise<void> {
const messages = await waku.store.queryHistory({
peerId,
contentTopics: [ChatContentTopic],
direction: Direction.FORWARD,
});
messages?.map((msg) => {
if (msg.payload) {
@ -91,6 +97,20 @@ export default async function startChat(): Promise<void> {
}
);
let lightPushNode: PeerId | undefined = undefined;
// Select a node for light pushing (any node).
if (opts.lightPush) {
waku.libp2p.peerStore.on(
'change:protocols',
async ({ peerId, protocols }) => {
if (!lightPushNode && protocols.includes(LightPushCodec)) {
console.log(`Using ${peerId.toB58String()} to light push messages`);
lightPushNode = peerId;
}
}
);
}
console.log('Ready to chat!');
rl.prompt();
for await (const line of rl) {
@ -98,7 +118,11 @@ export default async function startChat(): Promise<void> {
const chatMessage = ChatMessage.fromUtf8String(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
await waku.relay.send(msg);
if (lightPushNode && opts.lightPush) {
await waku.lightPush.push(lightPushNode, msg);
} else {
await waku.relay.send(msg);
}
}
}
@ -107,9 +131,10 @@ interface Options {
listenAddr: string;
autoDial: boolean;
prod: boolean;
lightPush: boolean;
}
function processArguments(): Options {
function processArguments(): Options | null {
const passedArgs = process.argv.slice(2);
let opts: Options = {
@ -117,11 +142,27 @@ function processArguments(): Options {
staticNodes: [],
autoDial: false,
prod: false,
lightPush: false,
};
while (passedArgs.length) {
const arg = passedArgs.shift();
switch (arg) {
case `--help`:
console.log('Usage:');
console.log(' --help This help message');
console.log(
' --staticNode {multiaddr} Connect to this static node, can be set multiple time'
);
console.log(' --listenAddr {addr} Listen on this address');
console.log(' --autoDial Automatically dial Status fleet nodes');
console.log(
' --prod With `autoDial`, connect ot Status prod fleet (test fleet is dialed if not set)'
);
console.log(
' --lightPush Use Waku v2 Light Push protocol to send messages, instead of Waku v2 relay'
);
return null;
case '--staticNode':
opts.staticNodes.push(multiaddr(passedArgs.shift()!));
break;
@ -134,6 +175,9 @@ function processArguments(): Options {
case '--prod':
opts.prod = true;
break;
case '--lightPush':
opts.lightPush = true;
break;
default:
console.log(`Unsupported argument: ${arg}`);
process.exit(1);

View File

@ -85,7 +85,6 @@ export default function App() {
}
};
// TODO: Split this
const handleProtocolChange = async (
waku: Waku,
{ peerId, protocols }: { peerId: PeerId; protocols: string[] }

View File

@ -0,0 +1,21 @@
syntax = "proto3";
package waku.v2;
import "waku/v2/message.proto";
message PushRequest {
string pubsub_topic = 1;
WakuMessage message = 2;
}
message PushResponse {
bool is_success = 1;
string info = 2;
}
message PushRPC {
string request_id = 1;
PushRequest request = 2;
PushResponse response = 3;
}

View File

@ -5,8 +5,14 @@ export { WakuMessage } from './lib/waku_message';
export { ChatMessage } from './lib/chat_message';
export {
WakuLightPush,
LightPushCodec,
PushResponse,
} from './lib/waku_light_push';
export { WakuRelay, RelayCodec } from './lib/waku_relay';
export { WakuStore, StoreCodec } from './lib/waku_store';
export { Direction, WakuStore, StoreCodec } from './lib/waku_store';
export * as proto from './proto';

View File

@ -7,6 +7,7 @@ import filters from 'libp2p-websockets/src/filters';
import { Multiaddr, multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
import { WakuLightPush } from './waku_light_push';
import { RelayCodec, WakuRelay } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store';
@ -25,11 +26,17 @@ export class Waku {
public libp2p: Libp2p;
public relay: WakuRelay;
public store: WakuStore;
public lightPush: WakuLightPush;
private constructor(libp2p: Libp2p, store: WakuStore) {
private constructor(
libp2p: Libp2p,
store: WakuStore,
lightPush: WakuLightPush
) {
this.libp2p = libp2p;
this.relay = (libp2p.pubsub as unknown) as WakuRelay;
this.store = store;
this.lightPush = lightPush;
}
/**
@ -84,10 +91,11 @@ export class Waku {
});
const wakuStore = new WakuStore(libp2p);
const wakuLightPush = new WakuLightPush(libp2p);
await libp2p.start();
return new Waku(libp2p, wakuStore);
return new Waku(libp2p, wakuStore, wakuLightPush);
}
/**

View File

@ -0,0 +1,54 @@
import { expect } from 'chai';
import TCP from 'libp2p-tcp';
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
import { delay } from '../delay';
import { Waku } from '../waku';
import { WakuMessage } from '../waku_message';
describe('Waku Light Push', () => {
let waku: Waku;
let nimWaku: NimWaku;
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
});
it('Push successfully', async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ lightpush: true });
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
modules: { transport: [TCP] },
});
await waku.dial(await nimWaku.getMultiaddrWithId());
// Wait for identify protocol to finish
await new Promise((resolve) => {
waku.libp2p.peerStore.once('change:protocols', resolve);
});
const nimPeerId = await nimWaku.getPeerId();
const messageText = 'Light Push works!';
const message = WakuMessage.fromUtf8String(messageText);
const pushResponse = await waku.lightPush.push(nimPeerId, message);
expect(pushResponse?.isSuccess).to.be.true;
let msgs: WakuMessage[] = [];
while (msgs.length === 0) {
await delay(200);
msgs = await nimWaku.messages();
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
expect(msgs[0].version).to.equal(message.version);
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
});
});

View File

@ -0,0 +1,62 @@
import concat from 'it-concat';
import lp from 'it-length-prefixed';
import pipe from 'it-pipe';
import Libp2p from 'libp2p';
import PeerId from 'peer-id';
import { PushResponse } from '../../proto/waku/v2/light_push';
import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay';
import { PushRPC } from './push_rpc';
export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1';
export { PushResponse };
/**
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
*/
export class WakuLightPush {
constructor(public libp2p: Libp2p) {}
async push(
peerId: PeerId,
message: WakuMessage,
pubsubTopic: string = DefaultPubsubTopic
): Promise<PushResponse | null> {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown';
if (!peer.protocols.includes(LightPushCodec))
throw 'Peer does not register waku light push protocol';
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw 'Failed to get a connection to the peer';
const { stream } = await connection.newStream(LightPushCodec);
try {
const query = PushRPC.createRequest(message, pubsubTopic);
const res = await pipe(
[query.encode()],
lp.encode(),
stream,
lp.decode(),
concat
);
try {
const response = PushRPC.decode(res.slice()).response;
if (!response) {
console.log('No response in PushRPC');
return null;
}
return response;
} catch (err) {
console.log('Failed to decode push reply', err);
}
} catch (err) {
console.log('Failed to send waku light push request', err);
}
return null;
}
}

View File

@ -0,0 +1,41 @@
import { Reader } from 'protobufjs/minimal';
import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/light_push';
import { WakuMessage } from '../waku_message';
import { DefaultPubsubTopic } from '../waku_relay';
export class PushRPC {
public constructor(public proto: proto.PushRPC) {}
static createRequest(
message: WakuMessage,
pubsubTopic: string = DefaultPubsubTopic
): PushRPC {
return new PushRPC({
requestId: uuid(),
request: {
message,
pubsubTopic,
},
response: undefined,
});
}
static decode(bytes: Uint8Array): PushRPC {
const res = proto.PushRPC.decode(Reader.create(bytes));
return new PushRPC(res);
}
encode(): Uint8Array {
return proto.PushRPC.encode(this.proto).finish();
}
get query(): proto.PushRequest | undefined {
return this.proto.request;
}
get response(): proto.PushResponse | undefined {
return this.proto.response;
}
}

View File

@ -11,6 +11,8 @@ import { Direction, HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta3';
export { Direction };
export interface Options {
peerId: PeerId;
contentTopics: string[];

View File

@ -0,0 +1,297 @@
/* eslint-disable */
import Long from 'long';
import _m0 from 'protobufjs/minimal';
import { WakuMessage } from '../../waku/v2/message';
export const protobufPackage = 'waku.v2';
export interface PushRequest {
pubsubTopic: string;
message: WakuMessage | undefined;
}
export interface PushResponse {
isSuccess: boolean;
info: string;
}
export interface PushRPC {
requestId: string;
request: PushRequest | undefined;
response: PushResponse | undefined;
}
const basePushRequest: object = { pubsubTopic: '' };
export const PushRequest = {
encode(
message: PushRequest,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.pubsubTopic !== '') {
writer.uint32(10).string(message.pubsubTopic);
}
if (message.message !== undefined) {
WakuMessage.encode(message.message, writer.uint32(18).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): PushRequest {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = { ...basePushRequest } as PushRequest;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.pubsubTopic = reader.string();
break;
case 2:
message.message = WakuMessage.decode(reader, reader.uint32());
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): PushRequest {
const message = { ...basePushRequest } as PushRequest;
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
message.pubsubTopic = String(object.pubsubTopic);
} else {
message.pubsubTopic = '';
}
if (object.message !== undefined && object.message !== null) {
message.message = WakuMessage.fromJSON(object.message);
} else {
message.message = undefined;
}
return message;
},
toJSON(message: PushRequest): unknown {
const obj: any = {};
message.pubsubTopic !== undefined &&
(obj.pubsubTopic = message.pubsubTopic);
message.message !== undefined &&
(obj.message = message.message
? WakuMessage.toJSON(message.message)
: undefined);
return obj;
},
fromPartial(object: DeepPartial<PushRequest>): PushRequest {
const message = { ...basePushRequest } as PushRequest;
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
message.pubsubTopic = object.pubsubTopic;
} else {
message.pubsubTopic = '';
}
if (object.message !== undefined && object.message !== null) {
message.message = WakuMessage.fromPartial(object.message);
} else {
message.message = undefined;
}
return message;
},
};
const basePushResponse: object = { isSuccess: false, info: '' };
export const PushResponse = {
encode(
message: PushResponse,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.isSuccess === true) {
writer.uint32(8).bool(message.isSuccess);
}
if (message.info !== '') {
writer.uint32(18).string(message.info);
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): PushResponse {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = { ...basePushResponse } as PushResponse;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.isSuccess = reader.bool();
break;
case 2:
message.info = reader.string();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): PushResponse {
const message = { ...basePushResponse } as PushResponse;
if (object.isSuccess !== undefined && object.isSuccess !== null) {
message.isSuccess = Boolean(object.isSuccess);
} else {
message.isSuccess = false;
}
if (object.info !== undefined && object.info !== null) {
message.info = String(object.info);
} else {
message.info = '';
}
return message;
},
toJSON(message: PushResponse): unknown {
const obj: any = {};
message.isSuccess !== undefined && (obj.isSuccess = message.isSuccess);
message.info !== undefined && (obj.info = message.info);
return obj;
},
fromPartial(object: DeepPartial<PushResponse>): PushResponse {
const message = { ...basePushResponse } as PushResponse;
if (object.isSuccess !== undefined && object.isSuccess !== null) {
message.isSuccess = object.isSuccess;
} else {
message.isSuccess = false;
}
if (object.info !== undefined && object.info !== null) {
message.info = object.info;
} else {
message.info = '';
}
return message;
},
};
const basePushRPC: object = { requestId: '' };
export const PushRPC = {
encode(
message: PushRPC,
writer: _m0.Writer = _m0.Writer.create()
): _m0.Writer {
if (message.requestId !== '') {
writer.uint32(10).string(message.requestId);
}
if (message.request !== undefined) {
PushRequest.encode(message.request, writer.uint32(18).fork()).ldelim();
}
if (message.response !== undefined) {
PushResponse.encode(message.response, writer.uint32(26).fork()).ldelim();
}
return writer;
},
decode(input: _m0.Reader | Uint8Array, length?: number): PushRPC {
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
let end = length === undefined ? reader.len : reader.pos + length;
const message = { ...basePushRPC } as PushRPC;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
message.requestId = reader.string();
break;
case 2:
message.request = PushRequest.decode(reader, reader.uint32());
break;
case 3:
message.response = PushResponse.decode(reader, reader.uint32());
break;
default:
reader.skipType(tag & 7);
break;
}
}
return message;
},
fromJSON(object: any): PushRPC {
const message = { ...basePushRPC } as PushRPC;
if (object.requestId !== undefined && object.requestId !== null) {
message.requestId = String(object.requestId);
} else {
message.requestId = '';
}
if (object.request !== undefined && object.request !== null) {
message.request = PushRequest.fromJSON(object.request);
} else {
message.request = undefined;
}
if (object.response !== undefined && object.response !== null) {
message.response = PushResponse.fromJSON(object.response);
} else {
message.response = undefined;
}
return message;
},
toJSON(message: PushRPC): unknown {
const obj: any = {};
message.requestId !== undefined && (obj.requestId = message.requestId);
message.request !== undefined &&
(obj.request = message.request
? PushRequest.toJSON(message.request)
: undefined);
message.response !== undefined &&
(obj.response = message.response
? PushResponse.toJSON(message.response)
: undefined);
return obj;
},
fromPartial(object: DeepPartial<PushRPC>): PushRPC {
const message = { ...basePushRPC } as PushRPC;
if (object.requestId !== undefined && object.requestId !== null) {
message.requestId = object.requestId;
} else {
message.requestId = '';
}
if (object.request !== undefined && object.request !== null) {
message.request = PushRequest.fromPartial(object.request);
} else {
message.request = undefined;
}
if (object.response !== undefined && object.response !== null) {
message.response = PushResponse.fromPartial(object.response);
} else {
message.response = undefined;
}
return message;
},
};
type Builtin =
| Date
| Function
| Uint8Array
| string
| number
| boolean
| undefined;
export type DeepPartial<T> = T extends Builtin
? T
: T extends Array<infer U>
? Array<DeepPartial<U>>
: T extends ReadonlyArray<infer U>
? ReadonlyArray<DeepPartial<U>>
: T extends {}
? { [K in keyof T]?: DeepPartial<T[K]> }
: Partial<T>;
if (_m0.util.Long !== Long) {
_m0.util.Long = Long as any;
_m0.configure();
}

View File

@ -39,6 +39,7 @@ export interface Args {
portsShift?: number;
logLevel?: LogLevel;
persistMessages?: boolean;
lightpush?: boolean;
}
export enum LogLevel {