mirror of https://github.com/waku-org/js-waku.git
Implement Light Push protocol
This commit is contained in:
parent
a0699c423b
commit
de3aea626a
|
@ -34,6 +34,7 @@
|
|||
"lastpub",
|
||||
"libauth",
|
||||
"libp",
|
||||
"lightpush",
|
||||
"livechat",
|
||||
"mkdir",
|
||||
"multiaddr",
|
||||
|
|
|
@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
|||
|
||||
## [Unreleased]
|
||||
|
||||
### Added
|
||||
- Implement [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
|
||||
|
||||
## [0.4.0] - 2021-05-18
|
||||
|
||||
### Added
|
||||
|
|
|
@ -131,6 +131,7 @@ You can track progress on the [project board](https://github.com/status-im/js-wa
|
|||
|[16/WAKU2-RPC](https://rfc.vac.dev/spec/16)|⛔|
|
||||
|[17/WAKU2-RLNRELAY](https://rfc.vac.dev/spec/17)||
|
||||
|[18/WAKU2-SWAP](https://rfc.vac.dev/spec/18)||
|
||||
|[19/WAKU2-LIGHTPUSH](https://rfc.vac.dev/spec/19/)|✔|
|
||||
|
||||
## Bugs, Questions & Features
|
||||
|
||||
|
|
|
@ -0,0 +1,21 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package waku.v2;
|
||||
|
||||
import "waku/v2/message.proto";
|
||||
|
||||
message PushRequest {
|
||||
string pubsub_topic = 1;
|
||||
WakuMessage message = 2;
|
||||
}
|
||||
|
||||
message PushResponse {
|
||||
bool is_success = 1;
|
||||
string info = 2;
|
||||
}
|
||||
|
||||
message PushRPC {
|
||||
string request_id = 1;
|
||||
PushRequest request = 2;
|
||||
PushResponse response = 3;
|
||||
}
|
|
@ -5,6 +5,12 @@ export { WakuMessage } from './lib/waku_message';
|
|||
|
||||
export { ChatMessage } from './lib/chat_message';
|
||||
|
||||
export {
|
||||
WakuLightPush,
|
||||
LightPushCodec,
|
||||
PushResponse,
|
||||
} from './lib/waku_light_push';
|
||||
|
||||
export { WakuRelay, RelayCodec } from './lib/waku_relay';
|
||||
|
||||
export { WakuStore, StoreCodec } from './lib/waku_store';
|
||||
|
|
|
@ -7,6 +7,7 @@ import filters from 'libp2p-websockets/src/filters';
|
|||
import { Multiaddr, multiaddr } from 'multiaddr';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { WakuLightPush } from './waku_light_push';
|
||||
import { RelayCodec, WakuRelay } from './waku_relay';
|
||||
import { StoreCodec, WakuStore } from './waku_store';
|
||||
|
||||
|
@ -25,11 +26,17 @@ export class Waku {
|
|||
public libp2p: Libp2p;
|
||||
public relay: WakuRelay;
|
||||
public store: WakuStore;
|
||||
public lightPush: WakuLightPush;
|
||||
|
||||
private constructor(libp2p: Libp2p, store: WakuStore) {
|
||||
private constructor(
|
||||
libp2p: Libp2p,
|
||||
store: WakuStore,
|
||||
lightPush: WakuLightPush
|
||||
) {
|
||||
this.libp2p = libp2p;
|
||||
this.relay = (libp2p.pubsub as unknown) as WakuRelay;
|
||||
this.store = store;
|
||||
this.lightPush = lightPush;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -84,10 +91,11 @@ export class Waku {
|
|||
});
|
||||
|
||||
const wakuStore = new WakuStore(libp2p);
|
||||
const wakuLightPush = new WakuLightPush(libp2p);
|
||||
|
||||
await libp2p.start();
|
||||
|
||||
return new Waku(libp2p, wakuStore);
|
||||
return new Waku(libp2p, wakuStore, wakuLightPush);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -0,0 +1,54 @@
|
|||
import { expect } from 'chai';
|
||||
import TCP from 'libp2p-tcp';
|
||||
|
||||
import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils';
|
||||
import { delay } from '../delay';
|
||||
import { Waku } from '../waku';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
|
||||
describe('Waku Light Push', () => {
|
||||
let waku: Waku;
|
||||
let nimWaku: NimWaku;
|
||||
|
||||
afterEach(async function () {
|
||||
nimWaku ? nimWaku.stop() : null;
|
||||
waku ? await waku.stop() : null;
|
||||
});
|
||||
|
||||
it('Push successfully', async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ lightpush: true });
|
||||
|
||||
waku = await Waku.create({
|
||||
staticNoiseKey: NOISE_KEY_1,
|
||||
modules: { transport: [TCP] },
|
||||
});
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
// Wait for identify protocol to finish
|
||||
await new Promise((resolve) => {
|
||||
waku.libp2p.peerStore.once('change:protocols', resolve);
|
||||
});
|
||||
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const messageText = 'Light Push works!';
|
||||
const message = WakuMessage.fromUtf8String(messageText);
|
||||
|
||||
const pushResponse = await waku.lightPush.push(nimPeerId, message);
|
||||
expect(pushResponse?.isSuccess).to.be.true;
|
||||
|
||||
let msgs: WakuMessage[] = [];
|
||||
|
||||
while (msgs.length === 0) {
|
||||
await delay(200);
|
||||
msgs = await nimWaku.messages();
|
||||
}
|
||||
|
||||
expect(msgs[0].contentTopic).to.equal(message.contentTopic);
|
||||
expect(msgs[0].version).to.equal(message.version);
|
||||
expect(msgs[0].payloadAsUtf8).to.equal(messageText);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,62 @@
|
|||
import concat from 'it-concat';
|
||||
import lp from 'it-length-prefixed';
|
||||
import pipe from 'it-pipe';
|
||||
import Libp2p from 'libp2p';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { PushResponse } from '../../proto/waku/v2/light_push';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
import { DefaultPubsubTopic } from '../waku_relay';
|
||||
|
||||
import { PushRPC } from './push_rpc';
|
||||
|
||||
export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1';
|
||||
export { PushResponse };
|
||||
|
||||
/**
|
||||
* Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/).
|
||||
*/
|
||||
export class WakuLightPush {
|
||||
constructor(public libp2p: Libp2p) {}
|
||||
|
||||
async push(
|
||||
peerId: PeerId,
|
||||
message: WakuMessage,
|
||||
pubsubTopic: string = DefaultPubsubTopic
|
||||
): Promise<PushResponse | null> {
|
||||
const peer = this.libp2p.peerStore.get(peerId);
|
||||
if (!peer) throw 'Peer is unknown';
|
||||
if (!peer.protocols.includes(LightPushCodec))
|
||||
throw 'Peer does not register waku light push protocol';
|
||||
|
||||
const connection = this.libp2p.connectionManager.get(peer.id);
|
||||
if (!connection) throw 'Failed to get a connection to the peer';
|
||||
|
||||
const { stream } = await connection.newStream(LightPushCodec);
|
||||
try {
|
||||
const query = PushRPC.createRequest(message, pubsubTopic);
|
||||
const res = await pipe(
|
||||
[query.encode()],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
concat
|
||||
);
|
||||
try {
|
||||
const response = PushRPC.decode(res.slice()).response;
|
||||
|
||||
if (!response) {
|
||||
console.log('No response in PushRPC');
|
||||
return null;
|
||||
}
|
||||
|
||||
return response;
|
||||
} catch (err) {
|
||||
console.log('Failed to decode push reply', err);
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('Failed to send waku light push request', err);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,41 @@
|
|||
import { Reader } from 'protobufjs/minimal';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import * as proto from '../../proto/waku/v2/light_push';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
import { DefaultPubsubTopic } from '../waku_relay';
|
||||
|
||||
export class PushRPC {
|
||||
public constructor(public proto: proto.PushRPC) {}
|
||||
|
||||
static createRequest(
|
||||
message: WakuMessage,
|
||||
pubsubTopic: string = DefaultPubsubTopic
|
||||
): PushRPC {
|
||||
return new PushRPC({
|
||||
requestId: uuid(),
|
||||
request: {
|
||||
message,
|
||||
pubsubTopic,
|
||||
},
|
||||
response: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
static decode(bytes: Uint8Array): PushRPC {
|
||||
const res = proto.PushRPC.decode(Reader.create(bytes));
|
||||
return new PushRPC(res);
|
||||
}
|
||||
|
||||
encode(): Uint8Array {
|
||||
return proto.PushRPC.encode(this.proto).finish();
|
||||
}
|
||||
|
||||
get query(): proto.PushRequest | undefined {
|
||||
return this.proto.request;
|
||||
}
|
||||
|
||||
get response(): proto.PushResponse | undefined {
|
||||
return this.proto.response;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,297 @@
|
|||
/* eslint-disable */
|
||||
import Long from 'long';
|
||||
import _m0 from 'protobufjs/minimal';
|
||||
import { WakuMessage } from '../../waku/v2/message';
|
||||
|
||||
export const protobufPackage = 'waku.v2';
|
||||
|
||||
export interface PushRequest {
|
||||
pubsubTopic: string;
|
||||
message: WakuMessage | undefined;
|
||||
}
|
||||
|
||||
export interface PushResponse {
|
||||
isSuccess: boolean;
|
||||
info: string;
|
||||
}
|
||||
|
||||
export interface PushRPC {
|
||||
requestId: string;
|
||||
request: PushRequest | undefined;
|
||||
response: PushResponse | undefined;
|
||||
}
|
||||
|
||||
const basePushRequest: object = { pubsubTopic: '' };
|
||||
|
||||
export const PushRequest = {
|
||||
encode(
|
||||
message: PushRequest,
|
||||
writer: _m0.Writer = _m0.Writer.create()
|
||||
): _m0.Writer {
|
||||
if (message.pubsubTopic !== '') {
|
||||
writer.uint32(10).string(message.pubsubTopic);
|
||||
}
|
||||
if (message.message !== undefined) {
|
||||
WakuMessage.encode(message.message, writer.uint32(18).fork()).ldelim();
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): PushRequest {
|
||||
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = { ...basePushRequest } as PushRequest;
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
message.pubsubTopic = reader.string();
|
||||
break;
|
||||
case 2:
|
||||
message.message = WakuMessage.decode(reader, reader.uint32());
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): PushRequest {
|
||||
const message = { ...basePushRequest } as PushRequest;
|
||||
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
|
||||
message.pubsubTopic = String(object.pubsubTopic);
|
||||
} else {
|
||||
message.pubsubTopic = '';
|
||||
}
|
||||
if (object.message !== undefined && object.message !== null) {
|
||||
message.message = WakuMessage.fromJSON(object.message);
|
||||
} else {
|
||||
message.message = undefined;
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
toJSON(message: PushRequest): unknown {
|
||||
const obj: any = {};
|
||||
message.pubsubTopic !== undefined &&
|
||||
(obj.pubsubTopic = message.pubsubTopic);
|
||||
message.message !== undefined &&
|
||||
(obj.message = message.message
|
||||
? WakuMessage.toJSON(message.message)
|
||||
: undefined);
|
||||
return obj;
|
||||
},
|
||||
|
||||
fromPartial(object: DeepPartial<PushRequest>): PushRequest {
|
||||
const message = { ...basePushRequest } as PushRequest;
|
||||
if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) {
|
||||
message.pubsubTopic = object.pubsubTopic;
|
||||
} else {
|
||||
message.pubsubTopic = '';
|
||||
}
|
||||
if (object.message !== undefined && object.message !== null) {
|
||||
message.message = WakuMessage.fromPartial(object.message);
|
||||
} else {
|
||||
message.message = undefined;
|
||||
}
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
const basePushResponse: object = { isSuccess: false, info: '' };
|
||||
|
||||
export const PushResponse = {
|
||||
encode(
|
||||
message: PushResponse,
|
||||
writer: _m0.Writer = _m0.Writer.create()
|
||||
): _m0.Writer {
|
||||
if (message.isSuccess === true) {
|
||||
writer.uint32(8).bool(message.isSuccess);
|
||||
}
|
||||
if (message.info !== '') {
|
||||
writer.uint32(18).string(message.info);
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): PushResponse {
|
||||
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = { ...basePushResponse } as PushResponse;
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
message.isSuccess = reader.bool();
|
||||
break;
|
||||
case 2:
|
||||
message.info = reader.string();
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): PushResponse {
|
||||
const message = { ...basePushResponse } as PushResponse;
|
||||
if (object.isSuccess !== undefined && object.isSuccess !== null) {
|
||||
message.isSuccess = Boolean(object.isSuccess);
|
||||
} else {
|
||||
message.isSuccess = false;
|
||||
}
|
||||
if (object.info !== undefined && object.info !== null) {
|
||||
message.info = String(object.info);
|
||||
} else {
|
||||
message.info = '';
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
toJSON(message: PushResponse): unknown {
|
||||
const obj: any = {};
|
||||
message.isSuccess !== undefined && (obj.isSuccess = message.isSuccess);
|
||||
message.info !== undefined && (obj.info = message.info);
|
||||
return obj;
|
||||
},
|
||||
|
||||
fromPartial(object: DeepPartial<PushResponse>): PushResponse {
|
||||
const message = { ...basePushResponse } as PushResponse;
|
||||
if (object.isSuccess !== undefined && object.isSuccess !== null) {
|
||||
message.isSuccess = object.isSuccess;
|
||||
} else {
|
||||
message.isSuccess = false;
|
||||
}
|
||||
if (object.info !== undefined && object.info !== null) {
|
||||
message.info = object.info;
|
||||
} else {
|
||||
message.info = '';
|
||||
}
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
const basePushRPC: object = { requestId: '' };
|
||||
|
||||
export const PushRPC = {
|
||||
encode(
|
||||
message: PushRPC,
|
||||
writer: _m0.Writer = _m0.Writer.create()
|
||||
): _m0.Writer {
|
||||
if (message.requestId !== '') {
|
||||
writer.uint32(10).string(message.requestId);
|
||||
}
|
||||
if (message.request !== undefined) {
|
||||
PushRequest.encode(message.request, writer.uint32(18).fork()).ldelim();
|
||||
}
|
||||
if (message.response !== undefined) {
|
||||
PushResponse.encode(message.response, writer.uint32(26).fork()).ldelim();
|
||||
}
|
||||
return writer;
|
||||
},
|
||||
|
||||
decode(input: _m0.Reader | Uint8Array, length?: number): PushRPC {
|
||||
const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input);
|
||||
let end = length === undefined ? reader.len : reader.pos + length;
|
||||
const message = { ...basePushRPC } as PushRPC;
|
||||
while (reader.pos < end) {
|
||||
const tag = reader.uint32();
|
||||
switch (tag >>> 3) {
|
||||
case 1:
|
||||
message.requestId = reader.string();
|
||||
break;
|
||||
case 2:
|
||||
message.request = PushRequest.decode(reader, reader.uint32());
|
||||
break;
|
||||
case 3:
|
||||
message.response = PushResponse.decode(reader, reader.uint32());
|
||||
break;
|
||||
default:
|
||||
reader.skipType(tag & 7);
|
||||
break;
|
||||
}
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
fromJSON(object: any): PushRPC {
|
||||
const message = { ...basePushRPC } as PushRPC;
|
||||
if (object.requestId !== undefined && object.requestId !== null) {
|
||||
message.requestId = String(object.requestId);
|
||||
} else {
|
||||
message.requestId = '';
|
||||
}
|
||||
if (object.request !== undefined && object.request !== null) {
|
||||
message.request = PushRequest.fromJSON(object.request);
|
||||
} else {
|
||||
message.request = undefined;
|
||||
}
|
||||
if (object.response !== undefined && object.response !== null) {
|
||||
message.response = PushResponse.fromJSON(object.response);
|
||||
} else {
|
||||
message.response = undefined;
|
||||
}
|
||||
return message;
|
||||
},
|
||||
|
||||
toJSON(message: PushRPC): unknown {
|
||||
const obj: any = {};
|
||||
message.requestId !== undefined && (obj.requestId = message.requestId);
|
||||
message.request !== undefined &&
|
||||
(obj.request = message.request
|
||||
? PushRequest.toJSON(message.request)
|
||||
: undefined);
|
||||
message.response !== undefined &&
|
||||
(obj.response = message.response
|
||||
? PushResponse.toJSON(message.response)
|
||||
: undefined);
|
||||
return obj;
|
||||
},
|
||||
|
||||
fromPartial(object: DeepPartial<PushRPC>): PushRPC {
|
||||
const message = { ...basePushRPC } as PushRPC;
|
||||
if (object.requestId !== undefined && object.requestId !== null) {
|
||||
message.requestId = object.requestId;
|
||||
} else {
|
||||
message.requestId = '';
|
||||
}
|
||||
if (object.request !== undefined && object.request !== null) {
|
||||
message.request = PushRequest.fromPartial(object.request);
|
||||
} else {
|
||||
message.request = undefined;
|
||||
}
|
||||
if (object.response !== undefined && object.response !== null) {
|
||||
message.response = PushResponse.fromPartial(object.response);
|
||||
} else {
|
||||
message.response = undefined;
|
||||
}
|
||||
return message;
|
||||
},
|
||||
};
|
||||
|
||||
type Builtin =
|
||||
| Date
|
||||
| Function
|
||||
| Uint8Array
|
||||
| string
|
||||
| number
|
||||
| boolean
|
||||
| undefined;
|
||||
export type DeepPartial<T> = T extends Builtin
|
||||
? T
|
||||
: T extends Array<infer U>
|
||||
? Array<DeepPartial<U>>
|
||||
: T extends ReadonlyArray<infer U>
|
||||
? ReadonlyArray<DeepPartial<U>>
|
||||
: T extends {}
|
||||
? { [K in keyof T]?: DeepPartial<T[K]> }
|
||||
: Partial<T>;
|
||||
|
||||
if (_m0.util.Long !== Long) {
|
||||
_m0.util.Long = Long as any;
|
||||
_m0.configure();
|
||||
}
|
|
@ -39,6 +39,7 @@ export interface Args {
|
|||
portsShift?: number;
|
||||
logLevel?: LogLevel;
|
||||
persistMessages?: boolean;
|
||||
lightpush?: boolean;
|
||||
}
|
||||
|
||||
export enum LogLevel {
|
||||
|
|
Loading…
Reference in New Issue