From e9d51a6c5705de857183be6877c9cecec2198050 Mon Sep 17 00:00:00 2001 From: Franck Royer Date: Wed, 7 Apr 2021 11:04:30 +1000 Subject: [PATCH] Implement Waku store protocol --- .cspell.json | 5 +- package-lock.json | 35 +++++++++- package.json | 4 ++ proto/waku/v2/{waku.proto => message.proto} | 0 proto/waku/v2/store.proto | 45 +++++++++++++ src/chat/index.ts | 49 ++++++++++---- src/lib/waku.ts | 15 +++-- src/lib/waku_message.ts | 17 +++-- src/lib/waku_store/history_rpc.ts | 35 ++++++++++ src/lib/waku_store/index.spec.ts | 73 +++++++++++++++++++++ src/lib/waku_store/index.ts | 58 ++++++++++++++++ src/test_utils/nim_waku.ts | 1 + 12 files changed, 315 insertions(+), 22 deletions(-) rename proto/waku/v2/{waku.proto => message.proto} (100%) create mode 100644 proto/waku/v2/store.proto create mode 100644 src/lib/waku_store/history_rpc.ts create mode 100644 src/lib/waku_store/index.spec.ts create mode 100644 src/lib/waku_store/index.ts diff --git a/.cspell.json b/.cspell.json index 39207f7fb3..966aaf6880 100644 --- a/.cspell.json +++ b/.cspell.json @@ -13,6 +13,7 @@ "codecov", "commitlint", "dependabot", + "dingpu", "Dlazy", "Dout", "Dscore", @@ -36,6 +37,7 @@ "multiaddrs", "multicodecs", "mplex", + "muxed", "muxer", "nodekey", "peerhave", @@ -64,6 +66,7 @@ "tsconfig.json", "node_modules/**", "build", - "gen" + "gen", + "proto" ] } diff --git a/package-lock.json b/package-lock.json index f8a884f53e..a2099c2e6c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,16 +1,17 @@ { - "name": "waku-js-chat", + "name": "js-waku", "version": "1.0.0", "lockfileVersion": 2, "requires": true, "packages": { "": { - "name": "waku-js-chat", "version": "1.0.0", "license": "MIT", "dependencies": { "@bitauth/libauth": "^1.17.1", "debug": "^4.3.1", + "it-concat": "^1.0.3", + "it-length-prefixed": "^3.1.0", "libp2p": "^0.30.0", "libp2p-gossipsub": "^0.7.0", "libp2p-mplex": "^0.10.2", @@ -20,6 +21,7 @@ "multiaddr": "^8.1.2", "prompt-sync": "^4.2.0", "ts-proto": "^1.74.0", + "uuid": "^3.4.0", "yarg": "^1.0.8" }, "devDependencies": { @@ -31,6 +33,7 @@ "@types/mocha": "^8.2.2", "@types/node": "^14.14.31", "@types/tail": "^2.0.0", + "@types/uuid": "^8.3.0", "@typescript-eslint/eslint-plugin": "^4.0.1", "@typescript-eslint/parser": "^4.0.1", "app-root-path": "^3.0.0", @@ -1054,6 +1057,12 @@ "integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==", "dev": true }, + "node_modules/@types/uuid": { + "version": "8.3.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz", + "integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==", + "dev": true + }, "node_modules/@typescript-eslint/eslint-plugin": { "version": "4.16.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz", @@ -7516,6 +7525,14 @@ "buffer": "^5.5.0" } }, + "node_modules/it-concat": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz", + "integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==", + "dependencies": { + "bl": "^4.0.0" + } + }, "node_modules/it-drain": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz", @@ -14367,6 +14384,12 @@ "integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==", "dev": true }, + "@types/uuid": { + "version": "8.3.0", + "resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz", + "integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==", + "dev": true + }, "@typescript-eslint/eslint-plugin": { "version": "4.16.1", "resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz", @@ -19583,6 +19606,14 @@ "buffer": "^5.5.0" } }, + "it-concat": { + "version": "1.0.3", + "resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz", + "integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==", + "requires": { + "bl": "^4.0.0" + } + }, "it-drain": { "version": "1.0.4", "resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz", diff --git a/package.json b/package.json index 4970d432d1..20f2bf2a6c 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,8 @@ "dependencies": { "@bitauth/libauth": "^1.17.1", "debug": "^4.3.1", + "it-concat": "^1.0.3", + "it-length-prefixed": "^3.1.0", "libp2p": "^0.30.0", "libp2p-gossipsub": "^0.7.0", "libp2p-mplex": "^0.10.2", @@ -59,6 +61,7 @@ "multiaddr": "^8.1.2", "prompt-sync": "^4.2.0", "ts-proto": "^1.74.0", + "uuid": "^3.4.0", "yarg": "^1.0.8" }, "devDependencies": { @@ -70,6 +73,7 @@ "@types/mocha": "^8.2.2", "@types/node": "^14.14.31", "@types/tail": "^2.0.0", + "@types/uuid": "^8.3.0", "@typescript-eslint/eslint-plugin": "^4.0.1", "@typescript-eslint/parser": "^4.0.1", "app-root-path": "^3.0.0", diff --git a/proto/waku/v2/waku.proto b/proto/waku/v2/message.proto similarity index 100% rename from proto/waku/v2/waku.proto rename to proto/waku/v2/message.proto diff --git a/proto/waku/v2/store.proto b/proto/waku/v2/store.proto new file mode 100644 index 0000000000..b2d078b7b4 --- /dev/null +++ b/proto/waku/v2/store.proto @@ -0,0 +1,45 @@ +syntax = "proto3"; + +package waku.v2; + +import "waku/v2/message.proto"; + +message Index { + bytes digest = 1; + double received_time = 2; +} + +message PagingInfoQuery { + uint32 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326 + Index cursor = 2; + Direction direction = 3; +} + +enum Direction { + DIRECTION_BACKWARD_UNSPECIFIED = 0; + DIRECTION_FORWARD = 1; +} + +message PagingInfoResponse { + uint64 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326 + Index cursor = 2; + Direction direction = 3; +} + +message HistoryQuery { + repeated string topics = 1; + optional PagingInfoQuery paging_info = 2; + optional double start_time = 3; + optional double end_time = 4; +} + +message HistoryResponse { + repeated WakuMessageProto messages = 1; + PagingInfoResponse paging_info = 2; +} + +message HistoryRPC { + string request_id = 1; + HistoryQuery query = 2; + HistoryResponse response = 3; +} diff --git a/src/chat/index.ts b/src/chat/index.ts index 2b079380a2..d50dfa2f40 100644 --- a/src/chat/index.ts +++ b/src/chat/index.ts @@ -1,6 +1,9 @@ import readline from 'readline'; import util from 'util'; +import Multiaddr from 'multiaddr'; +import PeerId from 'peer-id'; + import Waku from '../lib/waku'; import { WakuMessage } from '../lib/waku_message'; import { RelayDefaultTopic } from '../lib/waku_relay'; @@ -8,6 +11,8 @@ import { delay } from '../test_utils/'; import { ChatMessage } from './chat_message'; +const ChatContentTopic = 'dingpu'; + (async function () { const opts = processArguments(); @@ -33,14 +38,7 @@ import { ChatMessage } from './chat_message'; const wakuMsg = WakuMessage.decode(event.data); if (wakuMsg.payload) { const chatMsg = ChatMessage.decode(wakuMsg.payload); - const timestamp = chatMsg.timestamp.toLocaleString([], { - month: 'short', - day: 'numeric', - hour: 'numeric', - minute: '2-digit', - hour12: false, - }); - console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`); + printMessage(chatMsg); } }); @@ -68,19 +66,35 @@ import { ChatMessage } from './chat_message'; waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve) ); + const staticNodeId = opts.staticNode?.getPeerId(); + if (staticNodeId) { + const storePeerId = PeerId.createFromB58String(staticNodeId); + console.log( + `Retrieving archived messages from ${storePeerId.toB58String()}` + ); + const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]); + msg?.messages.map((msg) => { + const wakuMsg = WakuMessage.fromProto(msg); + if (wakuMsg.payload) { + const chatMsg = ChatMessage.decode(wakuMsg.payload); + printMessage(chatMsg); + } + }); + } + console.log('Ready to chat!'); rl.prompt(); for await (const line of rl) { rl.prompt(); const chatMessage = new ChatMessage(new Date(), nick, line); - const msg = WakuMessage.fromBytes(chatMessage.encode()); + const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic); await waku.relay.publish(msg); } })(); interface Options { - staticNode?: string; + staticNode?: Multiaddr; listenAddr: string; } @@ -93,7 +107,9 @@ function processArguments(): Options { const arg = passedArgs.shift(); switch (arg) { case '--staticNode': - opts = Object.assign(opts, { staticNode: passedArgs.shift() }); + opts = Object.assign(opts, { + staticNode: new Multiaddr(passedArgs.shift()), + }); break; case '--listenAddr': opts = Object.assign(opts, { listenAddr: passedArgs.shift() }); @@ -106,3 +122,14 @@ function processArguments(): Options { return opts; } + +function printMessage(chatMsg: ChatMessage) { + const timestamp = chatMsg.timestamp.toLocaleString([], { + month: 'short', + day: 'numeric', + hour: 'numeric', + minute: '2-digit', + hour12: false, + }); + console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`); +} diff --git a/src/lib/waku.ts b/src/lib/waku.ts index 22584aafcd..357d84537e 100644 --- a/src/lib/waku.ts +++ b/src/lib/waku.ts @@ -7,6 +7,7 @@ import Multiaddr from 'multiaddr'; import PeerId from 'peer-id'; import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay'; +import { StoreCodec, WakuStore } from './waku_store'; export interface CreateOptions { listenAddresses: string[]; @@ -14,7 +15,11 @@ export interface CreateOptions { } export default class Waku { - private constructor(public libp2p: Libp2p, public relay: WakuRelay) {} + private constructor( + public libp2p: Libp2p, + public relay: WakuRelay, + public store: WakuStore + ) {} /** * Create new waku node @@ -46,17 +51,19 @@ export default class Waku { }, }); + const wakuStore = new WakuStore(libp2p); + await libp2p.start(); - return new Waku(libp2p, new WakuRelay(libp2p.pubsub)); + return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore); } /** - * Dials to the provided peer. If successful, the known metadata of the peer will be added to the nodes peerStore, and the Connection will be returned + * Dials to the provided peer. * @param peer The peer to dial */ async dial(peer: PeerId | Multiaddr | string) { - return this.libp2p.dialProtocol(peer, RelayCodec); + return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]); } async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) { diff --git a/src/lib/waku_message.ts b/src/lib/waku_message.ts index b55420cfd2..81d372242d 100644 --- a/src/lib/waku_message.ts +++ b/src/lib/waku_message.ts @@ -2,18 +2,23 @@ import { Reader } from 'protobufjs/minimal'; // Protecting the user from protobuf oddities -import { WakuMessageProto } from '../proto/waku/v2/waku'; +import { WakuMessageProto } from '../proto/waku/v2/message'; -const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto'; +export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto'; const DEFAULT_VERSION = 0; export class WakuMessage { + // TODO: Adopt similar design to HistoryRPC private constructor( public payload?: Uint8Array, public contentTopic?: string, public version?: number ) {} + static fromProto(proto: WakuMessageProto) { + return new WakuMessage(proto.payload, proto.contentTopic, proto.version); + } + /** * Create Message with a utf-8 string as payload * @param payload @@ -27,10 +32,14 @@ export class WakuMessage { /** * Create Message with a byte array as payload * @param payload + * @param contentTopic * @returns {WakuMessage} */ - static fromBytes(payload: Uint8Array): WakuMessage { - return new WakuMessage(payload, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION); + static fromBytes( + payload: Uint8Array, + contentTopic: string = DEFAULT_CONTENT_TOPIC + ): WakuMessage { + return new WakuMessage(payload, contentTopic, DEFAULT_VERSION); } static decode(bytes: Uint8Array): WakuMessage { diff --git a/src/lib/waku_store/history_rpc.ts b/src/lib/waku_store/history_rpc.ts new file mode 100644 index 0000000000..bee9c84343 --- /dev/null +++ b/src/lib/waku_store/history_rpc.ts @@ -0,0 +1,35 @@ +import { Reader } from 'protobufjs/minimal'; +import { v4 as uuid } from 'uuid'; + +import * as proto from '../../proto/waku/v2/store'; +import { DEFAULT_CONTENT_TOPIC } from '../waku_message'; + +export class HistoryRPC { + public constructor(public proto: proto.HistoryRPC) {} + + static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC { + const pagingInfo = { + pageSize: 10, + cursor: undefined, + direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED, + }; + return new HistoryRPC({ + requestId: uuid(), + query: { topics, pagingInfo, startTime: undefined, endTime: undefined }, + response: undefined, + }); + } + + static decode(bytes: Uint8Array): HistoryRPC { + const res = proto.HistoryRPC.decode(Reader.create(bytes)); + return new HistoryRPC(res); + } + + encode(): Uint8Array { + return proto.HistoryRPC.encode(this.proto).finish(); + } + + get response(): proto.HistoryResponse | undefined { + return this.proto.response; + } +} diff --git a/src/lib/waku_store/index.spec.ts b/src/lib/waku_store/index.spec.ts new file mode 100644 index 0000000000..6ceffb691a --- /dev/null +++ b/src/lib/waku_store/index.spec.ts @@ -0,0 +1,73 @@ +import { expect } from 'chai'; + +import { + delay, + makeLogFileName, + NimWaku, + NOISE_KEY_1, + NOISE_KEY_2, +} from '../../test_utils'; +import Waku from '../waku'; +import { WakuMessage } from '../waku_message'; + +describe('Waku Store', () => { + let waku: Waku; + let nimWaku: NimWaku; + + beforeEach(async function () { + this.timeout(5_000); + + nimWaku = new NimWaku(makeLogFileName(this)); + await nimWaku.start({ store: true }); + + const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 }); + await waku0.dial(await nimWaku.getMultiaddrWithId()); + + await delay(100); + await new Promise((resolve) => + waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) + ); + + await waku0.relay.subscribe(); + + await new Promise((resolve) => + waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve) + ); + + await waku0.relay.publish( + WakuMessage.fromUtf8String('A message from relay.') + ); + + await nimWaku.sendMessage( + WakuMessage.fromUtf8String('Another message from json rpc.') + ); + + waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 }); + await waku.dial(await nimWaku.getMultiaddrWithId()); + + await delay(500); + }); + + afterEach(async function () { + nimWaku ? nimWaku.stop() : null; + waku ? await waku.stop() : null; + }); + + it('Retrieves history', async function () { + const nimPeerId = await nimWaku.getPeerId(); + + const response = await waku.store.queryHistory(nimPeerId); + const messages = response?.messages; + + // TODO: Should be fixed with https://github.com/status-im/nim-waku/issues/471 + // expect(messages?.length).eq(2); + const result = messages + ?.map((protoMsg) => { + return WakuMessage.fromProto(protoMsg); + }) + .findIndex((msg) => { + return msg.utf8Payload() === 'A message from relay.'; + }); + expect(result).to.not.eq(-1); + }); +}); diff --git a/src/lib/waku_store/index.ts b/src/lib/waku_store/index.ts new file mode 100644 index 0000000000..ce4341a758 --- /dev/null +++ b/src/lib/waku_store/index.ts @@ -0,0 +1,58 @@ +import concat from 'it-concat'; +import lp from 'it-length-prefixed'; +import pipe from 'it-pipe'; +import Libp2p from 'libp2p'; +import PeerId from 'peer-id'; + +import { HistoryRPC } from './history_rpc'; + +export const StoreCodec = '/vac/waku/store/2.0.0-beta1'; + +export class WakuStore { + constructor(public libp2p: Libp2p) {} + + /** + * Retrieve history from given peer + * @param peerId + * @param topics + * @throws if not able to reach peer + */ + async queryHistory(peerId: PeerId, topics?: string[]) { + const peer = this.libp2p.peerStore.get(peerId); + if (!peer) throw 'Peer is unknown'; + if (!peer.protocols.includes(StoreCodec)) + throw 'Peer does not register waku store protocol'; + const connection = this.libp2p.connectionManager.get(peer.id); + if (!connection) throw 'Failed to get a connection to the peer'; + + try { + const { stream } = await connection.newStream(StoreCodec); + + const historyRpc = HistoryRPC.query(topics).encode(); + try { + const res = await pipe( + [historyRpc], + lp.encode(), + stream, + lp.decode(), + concat + ); + const buf = res.slice(); + try { + const reply = HistoryRPC.decode(buf); + return reply.response; + } catch (err) { + console.log('Failed to decode store reply', err); + } + } catch (err) { + console.log('Failed to send waku store query', err); + } + } catch (err) { + console.log( + 'Failed to negotiate waku store protocol stream with peer', + err + ); + } + return null; + } +} diff --git a/src/test_utils/nim_waku.ts b/src/test_utils/nim_waku.ts index ba556cf547..9b70b11201 100644 --- a/src/test_utils/nim_waku.ts +++ b/src/test_utils/nim_waku.ts @@ -33,6 +33,7 @@ export interface Args { nodekey?: string; portsShift?: number; logLevel?: LogLevel; + store?: boolean; } export enum LogLevel {