From de3aea626a2fe0dbbcaac1e743296ddf3922011d Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 19 May 2021 11:00:43 +1000 Subject: [PATCH] Implement Light Push protocol --- .cspell.json | 1 + CHANGELOG.md | 3 + README.md | 1 + proto/waku/v2/light_push.proto | 21 ++ src/index.ts | 6 + src/lib/waku.ts | 12 +- src/lib/waku_light_push/index.spec.ts | 54 +++++ src/lib/waku_light_push/index.ts | 62 ++++++ src/lib/waku_light_push/push_rpc.ts | 41 ++++ src/proto/waku/v2/light_push.ts | 297 ++++++++++++++++++++++++++ src/test_utils/nim_waku.ts | 1 + 11 files changed, 497 insertions(+), 2 deletions(-) create mode 100644 proto/waku/v2/light_push.proto create mode 100644 src/lib/waku_light_push/index.spec.ts create mode 100644 src/lib/waku_light_push/index.ts create mode 100644 src/lib/waku_light_push/push_rpc.ts create mode 100644 src/proto/waku/v2/light_push.ts diff --git a/.cspell.json b/.cspell.json index 382509d833..cf05fa86da 100644 --- a/.cspell.json +++ b/.cspell.json @@ -34,6 +34,7 @@ "lastpub", "libauth", "libp", + "lightpush", "livechat", "mkdir", "multiaddr", diff --git a/CHANGELOG.md b/CHANGELOG.md index a28b4af64a..e740caf87e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,9 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] +### Added +- Implement [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). + ## [0.4.0] - 2021-05-18 ### Added diff --git a/README.md b/README.md index a54bc2bf88..895f5e3f8e 100644 --- a/README.md +++ b/README.md @@ -131,6 +131,7 @@ You can track progress on the [project board](https://github.com/status-im/js-wa |[16/WAKU2-RPC](https://rfc.vac.dev/spec/16)|⛔| |[17/WAKU2-RLNRELAY](https://rfc.vac.dev/spec/17)|| |[18/WAKU2-SWAP](https://rfc.vac.dev/spec/18)|| +|[19/WAKU2-LIGHTPUSH](https://rfc.vac.dev/spec/19/)|✔| ## Bugs, Questions & Features diff --git a/proto/waku/v2/light_push.proto b/proto/waku/v2/light_push.proto new file mode 100644 index 0000000000..0200c9d2e9 --- /dev/null +++ b/proto/waku/v2/light_push.proto @@ -0,0 +1,21 @@ +syntax = "proto3"; + +package waku.v2; + +import "waku/v2/message.proto"; + +message PushRequest { + string pubsub_topic = 1; + WakuMessage message = 2; +} + +message PushResponse { + bool is_success = 1; + string info = 2; +} + +message PushRPC { + string request_id = 1; + PushRequest request = 2; + PushResponse response = 3; +} diff --git a/src/index.ts b/src/index.ts index a9194a64ec..ed957085b4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -5,6 +5,12 @@ export { WakuMessage } from './lib/waku_message'; export { ChatMessage } from './lib/chat_message'; +export { + WakuLightPush, + LightPushCodec, + PushResponse, +} from './lib/waku_light_push'; + export { WakuRelay, RelayCodec } from './lib/waku_relay'; export { WakuStore, StoreCodec } from './lib/waku_store'; diff --git a/src/lib/waku.ts b/src/lib/waku.ts index e5b944bb18..4184d092d5 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -7,6 +7,7 @@ import filters from 'libp2p-websockets/src/filters'; import { Multiaddr, multiaddr } from 'multiaddr'; import PeerId from 'peer-id'; +import { WakuLightPush } from './waku_light_push'; import { RelayCodec, WakuRelay } from './waku_relay'; import { StoreCodec, WakuStore } from './waku_store'; @@ -25,11 +26,17 @@ export class Waku { public libp2p: Libp2p; public relay: WakuRelay; public store: WakuStore; + public lightPush: WakuLightPush; - private constructor(libp2p: Libp2p, store: WakuStore) { + private constructor( + libp2p: Libp2p, + store: WakuStore, + lightPush: WakuLightPush + ) { this.libp2p = libp2p; this.relay = (libp2p.pubsub as unknown) as WakuRelay; this.store = store; + this.lightPush = lightPush; } /** @@ -84,10 +91,11 @@ export class Waku { }); const wakuStore = new WakuStore(libp2p); + const wakuLightPush = new WakuLightPush(libp2p); await libp2p.start(); - return new Waku(libp2p, wakuStore); + return new Waku(libp2p, wakuStore, wakuLightPush); } /** diff --git a/src/lib/waku_light_push/index.spec.ts b/src/lib/waku_light_push/index.spec.ts new file mode 100644 index 0000000000..a6f0450498 --- /dev/null +++ b/src/lib/waku_light_push/index.spec.ts @@ -0,0 +1,54 @@ +import { expect } from 'chai'; +import TCP from 'libp2p-tcp'; + +import { makeLogFileName, NimWaku, NOISE_KEY_1 } from '../../test_utils'; +import { delay } from '../delay'; +import { Waku } from '../waku'; +import { WakuMessage } from '../waku_message'; + +describe('Waku Light Push', () => { + let waku: Waku; + let nimWaku: NimWaku; + + afterEach(async function () { + nimWaku ? nimWaku.stop() : null; + waku ? await waku.stop() : null; + }); + + it('Push successfully', async function () { + this.timeout(5_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ lightpush: true }); + + waku = await Waku.create({ + staticNoiseKey: NOISE_KEY_1, + modules: { transport: [TCP] }, + }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + // Wait for identify protocol to finish + await new Promise((resolve) => { + waku.libp2p.peerStore.once('change:protocols', resolve); + }); + + const nimPeerId = await nimWaku.getPeerId(); + + const messageText = 'Light Push works!'; + const message = WakuMessage.fromUtf8String(messageText); + + const pushResponse = await waku.lightPush.push(nimPeerId, message); + expect(pushResponse?.isSuccess).to.be.true; + + let msgs: WakuMessage[] = []; + + while (msgs.length === 0) { + await delay(200); + msgs = await nimWaku.messages(); + } + + expect(msgs[0].contentTopic).to.equal(message.contentTopic); + expect(msgs[0].version).to.equal(message.version); + expect(msgs[0].payloadAsUtf8).to.equal(messageText); + }); +}); diff --git a/src/lib/waku_light_push/index.ts b/src/lib/waku_light_push/index.ts new file mode 100644 index 0000000000..f2189ec17e --- /dev/null +++ b/src/lib/waku_light_push/index.ts @@ -0,0 +1,62 @@ +import concat from 'it-concat'; +import lp from 'it-length-prefixed'; +import pipe from 'it-pipe'; +import Libp2p from 'libp2p'; +import PeerId from 'peer-id'; + +import { PushResponse } from '../../proto/waku/v2/light_push'; +import { WakuMessage } from '../waku_message'; +import { DefaultPubsubTopic } from '../waku_relay'; + +import { PushRPC } from './push_rpc'; + +export const LightPushCodec = '/vac/waku/lightpush/2.0.0-alpha1'; +export { PushResponse }; + +/** + * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). + */ +export class WakuLightPush { + constructor(public libp2p: Libp2p) {} + + async push( + peerId: PeerId, + message: WakuMessage, + pubsubTopic: string = DefaultPubsubTopic + ): Promise { + const peer = this.libp2p.peerStore.get(peerId); + if (!peer) throw 'Peer is unknown'; + if (!peer.protocols.includes(LightPushCodec)) + throw 'Peer does not register waku light push protocol'; + + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) throw 'Failed to get a connection to the peer'; + + const { stream } = await connection.newStream(LightPushCodec); + try { + const query = PushRPC.createRequest(message, pubsubTopic); + const res = await pipe( + [query.encode()], + lp.encode(), + stream, + lp.decode(), + concat + ); + try { + const response = PushRPC.decode(res.slice()).response; + + if (!response) { + console.log('No response in PushRPC'); + return null; + } + + return response; + } catch (err) { + console.log('Failed to decode push reply', err); + } + } catch (err) { + console.log('Failed to send waku light push request', err); + } + return null; + } +} diff --git a/src/lib/waku_light_push/push_rpc.ts b/src/lib/waku_light_push/push_rpc.ts new file mode 100644 index 0000000000..118fc3b0e0 --- /dev/null +++ b/src/lib/waku_light_push/push_rpc.ts @@ -0,0 +1,41 @@ +import { Reader } from 'protobufjs/minimal'; +import { v4 as uuid } from 'uuid'; + +import * as proto from '../../proto/waku/v2/light_push'; +import { WakuMessage } from '../waku_message'; +import { DefaultPubsubTopic } from '../waku_relay'; + +export class PushRPC { + public constructor(public proto: proto.PushRPC) {} + + static createRequest( + message: WakuMessage, + pubsubTopic: string = DefaultPubsubTopic + ): PushRPC { + return new PushRPC({ + requestId: uuid(), + request: { + message, + pubsubTopic, + }, + response: undefined, + }); + } + + static decode(bytes: Uint8Array): PushRPC { + const res = proto.PushRPC.decode(Reader.create(bytes)); + return new PushRPC(res); + } + + encode(): Uint8Array { + return proto.PushRPC.encode(this.proto).finish(); + } + + get query(): proto.PushRequest | undefined { + return this.proto.request; + } + + get response(): proto.PushResponse | undefined { + return this.proto.response; + } +} diff --git a/src/proto/waku/v2/light_push.ts b/src/proto/waku/v2/light_push.ts new file mode 100644 index 0000000000..74dc68b013 --- /dev/null +++ b/src/proto/waku/v2/light_push.ts @@ -0,0 +1,297 @@ +/* eslint-disable */ +import Long from 'long'; +import _m0 from 'protobufjs/minimal'; +import { WakuMessage } from '../../waku/v2/message'; + +export const protobufPackage = 'waku.v2'; + +export interface PushRequest { + pubsubTopic: string; + message: WakuMessage | undefined; +} + +export interface PushResponse { + isSuccess: boolean; + info: string; +} + +export interface PushRPC { + requestId: string; + request: PushRequest | undefined; + response: PushResponse | undefined; +} + +const basePushRequest: object = { pubsubTopic: '' }; + +export const PushRequest = { + encode( + message: PushRequest, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.pubsubTopic !== '') { + writer.uint32(10).string(message.pubsubTopic); + } + if (message.message !== undefined) { + WakuMessage.encode(message.message, writer.uint32(18).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PushRequest { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = { ...basePushRequest } as PushRequest; + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.pubsubTopic = reader.string(); + break; + case 2: + message.message = WakuMessage.decode(reader, reader.uint32()); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): PushRequest { + const message = { ...basePushRequest } as PushRequest; + if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) { + message.pubsubTopic = String(object.pubsubTopic); + } else { + message.pubsubTopic = ''; + } + if (object.message !== undefined && object.message !== null) { + message.message = WakuMessage.fromJSON(object.message); + } else { + message.message = undefined; + } + return message; + }, + + toJSON(message: PushRequest): unknown { + const obj: any = {}; + message.pubsubTopic !== undefined && + (obj.pubsubTopic = message.pubsubTopic); + message.message !== undefined && + (obj.message = message.message + ? WakuMessage.toJSON(message.message) + : undefined); + return obj; + }, + + fromPartial(object: DeepPartial): PushRequest { + const message = { ...basePushRequest } as PushRequest; + if (object.pubsubTopic !== undefined && object.pubsubTopic !== null) { + message.pubsubTopic = object.pubsubTopic; + } else { + message.pubsubTopic = ''; + } + if (object.message !== undefined && object.message !== null) { + message.message = WakuMessage.fromPartial(object.message); + } else { + message.message = undefined; + } + return message; + }, +}; + +const basePushResponse: object = { isSuccess: false, info: '' }; + +export const PushResponse = { + encode( + message: PushResponse, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.isSuccess === true) { + writer.uint32(8).bool(message.isSuccess); + } + if (message.info !== '') { + writer.uint32(18).string(message.info); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PushResponse { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = { ...basePushResponse } as PushResponse; + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.isSuccess = reader.bool(); + break; + case 2: + message.info = reader.string(); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): PushResponse { + const message = { ...basePushResponse } as PushResponse; + if (object.isSuccess !== undefined && object.isSuccess !== null) { + message.isSuccess = Boolean(object.isSuccess); + } else { + message.isSuccess = false; + } + if (object.info !== undefined && object.info !== null) { + message.info = String(object.info); + } else { + message.info = ''; + } + return message; + }, + + toJSON(message: PushResponse): unknown { + const obj: any = {}; + message.isSuccess !== undefined && (obj.isSuccess = message.isSuccess); + message.info !== undefined && (obj.info = message.info); + return obj; + }, + + fromPartial(object: DeepPartial): PushResponse { + const message = { ...basePushResponse } as PushResponse; + if (object.isSuccess !== undefined && object.isSuccess !== null) { + message.isSuccess = object.isSuccess; + } else { + message.isSuccess = false; + } + if (object.info !== undefined && object.info !== null) { + message.info = object.info; + } else { + message.info = ''; + } + return message; + }, +}; + +const basePushRPC: object = { requestId: '' }; + +export const PushRPC = { + encode( + message: PushRPC, + writer: _m0.Writer = _m0.Writer.create() + ): _m0.Writer { + if (message.requestId !== '') { + writer.uint32(10).string(message.requestId); + } + if (message.request !== undefined) { + PushRequest.encode(message.request, writer.uint32(18).fork()).ldelim(); + } + if (message.response !== undefined) { + PushResponse.encode(message.response, writer.uint32(26).fork()).ldelim(); + } + return writer; + }, + + decode(input: _m0.Reader | Uint8Array, length?: number): PushRPC { + const reader = input instanceof _m0.Reader ? input : new _m0.Reader(input); + let end = length === undefined ? reader.len : reader.pos + length; + const message = { ...basePushRPC } as PushRPC; + while (reader.pos < end) { + const tag = reader.uint32(); + switch (tag >>> 3) { + case 1: + message.requestId = reader.string(); + break; + case 2: + message.request = PushRequest.decode(reader, reader.uint32()); + break; + case 3: + message.response = PushResponse.decode(reader, reader.uint32()); + break; + default: + reader.skipType(tag & 7); + break; + } + } + return message; + }, + + fromJSON(object: any): PushRPC { + const message = { ...basePushRPC } as PushRPC; + if (object.requestId !== undefined && object.requestId !== null) { + message.requestId = String(object.requestId); + } else { + message.requestId = ''; + } + if (object.request !== undefined && object.request !== null) { + message.request = PushRequest.fromJSON(object.request); + } else { + message.request = undefined; + } + if (object.response !== undefined && object.response !== null) { + message.response = PushResponse.fromJSON(object.response); + } else { + message.response = undefined; + } + return message; + }, + + toJSON(message: PushRPC): unknown { + const obj: any = {}; + message.requestId !== undefined && (obj.requestId = message.requestId); + message.request !== undefined && + (obj.request = message.request + ? PushRequest.toJSON(message.request) + : undefined); + message.response !== undefined && + (obj.response = message.response + ? PushResponse.toJSON(message.response) + : undefined); + return obj; + }, + + fromPartial(object: DeepPartial): PushRPC { + const message = { ...basePushRPC } as PushRPC; + if (object.requestId !== undefined && object.requestId !== null) { + message.requestId = object.requestId; + } else { + message.requestId = ''; + } + if (object.request !== undefined && object.request !== null) { + message.request = PushRequest.fromPartial(object.request); + } else { + message.request = undefined; + } + if (object.response !== undefined && object.response !== null) { + message.response = PushResponse.fromPartial(object.response); + } else { + message.response = undefined; + } + return message; + }, +}; + +type Builtin = + | Date + | Function + | Uint8Array + | string + | number + | boolean + | undefined; +export type DeepPartial = T extends Builtin + ? T + : T extends Array + ? Array> + : T extends ReadonlyArray + ? ReadonlyArray> + : T extends {} + ? { [K in keyof T]?: DeepPartial } + : Partial; + +if (_m0.util.Long !== Long) { + _m0.util.Long = Long as any; + _m0.configure(); +} diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index b20d3d5cb6..0f43c5066a 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -39,6 +39,7 @@ export interface Args { portsShift?: number; logLevel?: LogLevel; persistMessages?: boolean; + lightpush?: boolean; } export enum LogLevel {