Implement Waku store protocol

This commit is contained in:
Franck Royer 2021-04-07 11:04:30 +10:00
parent 3333a9c6c0
commit e9d51a6c57
No known key found for this signature in database
GPG Key ID: A82ED75A8DFC50A4
12 changed files with 315 additions and 22 deletions

View File

@ -13,6 +13,7 @@
"codecov",
"commitlint",
"dependabot",
"dingpu",
"Dlazy",
"Dout",
"Dscore",
@ -36,6 +37,7 @@
"multiaddrs",
"multicodecs",
"mplex",
"muxed",
"muxer",
"nodekey",
"peerhave",
@ -64,6 +66,7 @@
"tsconfig.json",
"node_modules/**",
"build",
"gen"
"gen",
"proto"
]
}

35
package-lock.json generated
View File

@ -1,16 +1,17 @@
{
"name": "waku-js-chat",
"name": "js-waku",
"version": "1.0.0",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"name": "waku-js-chat",
"version": "1.0.0",
"license": "MIT",
"dependencies": {
"@bitauth/libauth": "^1.17.1",
"debug": "^4.3.1",
"it-concat": "^1.0.3",
"it-length-prefixed": "^3.1.0",
"libp2p": "^0.30.0",
"libp2p-gossipsub": "^0.7.0",
"libp2p-mplex": "^0.10.2",
@ -20,6 +21,7 @@
"multiaddr": "^8.1.2",
"prompt-sync": "^4.2.0",
"ts-proto": "^1.74.0",
"uuid": "^3.4.0",
"yarg": "^1.0.8"
},
"devDependencies": {
@ -31,6 +33,7 @@
"@types/mocha": "^8.2.2",
"@types/node": "^14.14.31",
"@types/tail": "^2.0.0",
"@types/uuid": "^8.3.0",
"@typescript-eslint/eslint-plugin": "^4.0.1",
"@typescript-eslint/parser": "^4.0.1",
"app-root-path": "^3.0.0",
@ -1054,6 +1057,12 @@
"integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==",
"dev": true
},
"node_modules/@types/uuid": {
"version": "8.3.0",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz",
"integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==",
"dev": true
},
"node_modules/@typescript-eslint/eslint-plugin": {
"version": "4.16.1",
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz",
@ -7516,6 +7525,14 @@
"buffer": "^5.5.0"
}
},
"node_modules/it-concat": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz",
"integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==",
"dependencies": {
"bl": "^4.0.0"
}
},
"node_modules/it-drain": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz",
@ -14367,6 +14384,12 @@
"integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==",
"dev": true
},
"@types/uuid": {
"version": "8.3.0",
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz",
"integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==",
"dev": true
},
"@typescript-eslint/eslint-plugin": {
"version": "4.16.1",
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz",
@ -19583,6 +19606,14 @@
"buffer": "^5.5.0"
}
},
"it-concat": {
"version": "1.0.3",
"resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz",
"integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==",
"requires": {
"bl": "^4.0.0"
}
},
"it-drain": {
"version": "1.0.4",
"resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz",

View File

@ -50,6 +50,8 @@
"dependencies": {
"@bitauth/libauth": "^1.17.1",
"debug": "^4.3.1",
"it-concat": "^1.0.3",
"it-length-prefixed": "^3.1.0",
"libp2p": "^0.30.0",
"libp2p-gossipsub": "^0.7.0",
"libp2p-mplex": "^0.10.2",
@ -59,6 +61,7 @@
"multiaddr": "^8.1.2",
"prompt-sync": "^4.2.0",
"ts-proto": "^1.74.0",
"uuid": "^3.4.0",
"yarg": "^1.0.8"
},
"devDependencies": {
@ -70,6 +73,7 @@
"@types/mocha": "^8.2.2",
"@types/node": "^14.14.31",
"@types/tail": "^2.0.0",
"@types/uuid": "^8.3.0",
"@typescript-eslint/eslint-plugin": "^4.0.1",
"@typescript-eslint/parser": "^4.0.1",
"app-root-path": "^3.0.0",

45
proto/waku/v2/store.proto Normal file
View File

@ -0,0 +1,45 @@
syntax = "proto3";
package waku.v2;
import "waku/v2/message.proto";
message Index {
bytes digest = 1;
double received_time = 2;
}
message PagingInfoQuery {
uint32 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326
Index cursor = 2;
Direction direction = 3;
}
enum Direction {
DIRECTION_BACKWARD_UNSPECIFIED = 0;
DIRECTION_FORWARD = 1;
}
message PagingInfoResponse {
uint64 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326
Index cursor = 2;
Direction direction = 3;
}
message HistoryQuery {
repeated string topics = 1;
optional PagingInfoQuery paging_info = 2;
optional double start_time = 3;
optional double end_time = 4;
}
message HistoryResponse {
repeated WakuMessageProto messages = 1;
PagingInfoResponse paging_info = 2;
}
message HistoryRPC {
string request_id = 1;
HistoryQuery query = 2;
HistoryResponse response = 3;
}

View File

@ -1,6 +1,9 @@
import readline from 'readline';
import util from 'util';
import Multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import Waku from '../lib/waku';
import { WakuMessage } from '../lib/waku_message';
import { RelayDefaultTopic } from '../lib/waku_relay';
@ -8,6 +11,8 @@ import { delay } from '../test_utils/';
import { ChatMessage } from './chat_message';
const ChatContentTopic = 'dingpu';
(async function () {
const opts = processArguments();
@ -33,14 +38,7 @@ import { ChatMessage } from './chat_message';
const wakuMsg = WakuMessage.decode(event.data);
if (wakuMsg.payload) {
const chatMsg = ChatMessage.decode(wakuMsg.payload);
const timestamp = chatMsg.timestamp.toLocaleString([], {
month: 'short',
day: 'numeric',
hour: 'numeric',
minute: '2-digit',
hour12: false,
});
console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`);
printMessage(chatMsg);
}
});
@ -68,19 +66,35 @@ import { ChatMessage } from './chat_message';
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
const staticNodeId = opts.staticNode?.getPeerId();
if (staticNodeId) {
const storePeerId = PeerId.createFromB58String(staticNodeId);
console.log(
`Retrieving archived messages from ${storePeerId.toB58String()}`
);
const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]);
msg?.messages.map((msg) => {
const wakuMsg = WakuMessage.fromProto(msg);
if (wakuMsg.payload) {
const chatMsg = ChatMessage.decode(wakuMsg.payload);
printMessage(chatMsg);
}
});
}
console.log('Ready to chat!');
rl.prompt();
for await (const line of rl) {
rl.prompt();
const chatMessage = new ChatMessage(new Date(), nick, line);
const msg = WakuMessage.fromBytes(chatMessage.encode());
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
await waku.relay.publish(msg);
}
})();
interface Options {
staticNode?: string;
staticNode?: Multiaddr;
listenAddr: string;
}
@ -93,7 +107,9 @@ function processArguments(): Options {
const arg = passedArgs.shift();
switch (arg) {
case '--staticNode':
opts = Object.assign(opts, { staticNode: passedArgs.shift() });
opts = Object.assign(opts, {
staticNode: new Multiaddr(passedArgs.shift()),
});
break;
case '--listenAddr':
opts = Object.assign(opts, { listenAddr: passedArgs.shift() });
@ -106,3 +122,14 @@ function processArguments(): Options {
return opts;
}
function printMessage(chatMsg: ChatMessage) {
const timestamp = chatMsg.timestamp.toLocaleString([], {
month: 'short',
day: 'numeric',
hour: 'numeric',
minute: '2-digit',
hour12: false,
});
console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`);
}

View File

@ -7,6 +7,7 @@ import Multiaddr from 'multiaddr';
import PeerId from 'peer-id';
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
import { StoreCodec, WakuStore } from './waku_store';
export interface CreateOptions {
listenAddresses: string[];
@ -14,7 +15,11 @@ export interface CreateOptions {
}
export default class Waku {
private constructor(public libp2p: Libp2p, public relay: WakuRelay) {}
private constructor(
public libp2p: Libp2p,
public relay: WakuRelay,
public store: WakuStore
) {}
/**
* Create new waku node
@ -46,17 +51,19 @@ export default class Waku {
},
});
const wakuStore = new WakuStore(libp2p);
await libp2p.start();
return new Waku(libp2p, new WakuRelay(libp2p.pubsub));
return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore);
}
/**
* Dials to the provided peer. If successful, the known metadata of the peer will be added to the nodes peerStore, and the Connection will be returned
* Dials to the provided peer.
* @param peer The peer to dial
*/
async dial(peer: PeerId | Multiaddr | string) {
return this.libp2p.dialProtocol(peer, RelayCodec);
return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
}
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {

View File

@ -2,18 +2,23 @@
import { Reader } from 'protobufjs/minimal';
// Protecting the user from protobuf oddities
import { WakuMessageProto } from '../proto/waku/v2/waku';
import { WakuMessageProto } from '../proto/waku/v2/message';
const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
const DEFAULT_VERSION = 0;
export class WakuMessage {
// TODO: Adopt similar design to HistoryRPC
private constructor(
public payload?: Uint8Array,
public contentTopic?: string,
public version?: number
) {}
static fromProto(proto: WakuMessageProto) {
return new WakuMessage(proto.payload, proto.contentTopic, proto.version);
}
/**
* Create Message with a utf-8 string as payload
* @param payload
@ -27,10 +32,14 @@ export class WakuMessage {
/**
* Create Message with a byte array as payload
* @param payload
* @param contentTopic
* @returns {WakuMessage}
*/
static fromBytes(payload: Uint8Array): WakuMessage {
return new WakuMessage(payload, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION);
static fromBytes(
payload: Uint8Array,
contentTopic: string = DEFAULT_CONTENT_TOPIC
): WakuMessage {
return new WakuMessage(payload, contentTopic, DEFAULT_VERSION);
}
static decode(bytes: Uint8Array): WakuMessage {

View File

@ -0,0 +1,35 @@
import { Reader } from 'protobufjs/minimal';
import { v4 as uuid } from 'uuid';
import * as proto from '../../proto/waku/v2/store';
import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
export class HistoryRPC {
public constructor(public proto: proto.HistoryRPC) {}
static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC {
const pagingInfo = {
pageSize: 10,
cursor: undefined,
direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED,
};
return new HistoryRPC({
requestId: uuid(),
query: { topics, pagingInfo, startTime: undefined, endTime: undefined },
response: undefined,
});
}
static decode(bytes: Uint8Array): HistoryRPC {
const res = proto.HistoryRPC.decode(Reader.create(bytes));
return new HistoryRPC(res);
}
encode(): Uint8Array {
return proto.HistoryRPC.encode(this.proto).finish();
}
get response(): proto.HistoryResponse | undefined {
return this.proto.response;
}
}

View File

@ -0,0 +1,73 @@
import { expect } from 'chai';
import {
delay,
makeLogFileName,
NimWaku,
NOISE_KEY_1,
NOISE_KEY_2,
} from '../../test_utils';
import Waku from '../waku';
import { WakuMessage } from '../waku_message';
describe('Waku Store', () => {
let waku: Waku;
let nimWaku: NimWaku;
beforeEach(async function () {
this.timeout(5_000);
nimWaku = new NimWaku(makeLogFileName(this));
await nimWaku.start({ store: true });
const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 });
await waku0.dial(await nimWaku.getMultiaddrWithId());
await delay(100);
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku0.relay.subscribe();
await new Promise((resolve) =>
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
);
await waku0.relay.publish(
WakuMessage.fromUtf8String('A message from relay.')
);
await nimWaku.sendMessage(
WakuMessage.fromUtf8String('Another message from json rpc.')
);
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
await waku.dial(await nimWaku.getMultiaddrWithId());
await delay(500);
});
afterEach(async function () {
nimWaku ? nimWaku.stop() : null;
waku ? await waku.stop() : null;
});
it('Retrieves history', async function () {
const nimPeerId = await nimWaku.getPeerId();
const response = await waku.store.queryHistory(nimPeerId);
const messages = response?.messages;
// TODO: Should be fixed with https://github.com/status-im/nim-waku/issues/471
// expect(messages?.length).eq(2);
const result = messages
?.map((protoMsg) => {
return WakuMessage.fromProto(protoMsg);
})
.findIndex((msg) => {
return msg.utf8Payload() === 'A message from relay.';
});
expect(result).to.not.eq(-1);
});
});

View File

@ -0,0 +1,58 @@
import concat from 'it-concat';
import lp from 'it-length-prefixed';
import pipe from 'it-pipe';
import Libp2p from 'libp2p';
import PeerId from 'peer-id';
import { HistoryRPC } from './history_rpc';
export const StoreCodec = '/vac/waku/store/2.0.0-beta1';
export class WakuStore {
constructor(public libp2p: Libp2p) {}
/**
* Retrieve history from given peer
* @param peerId
* @param topics
* @throws if not able to reach peer
*/
async queryHistory(peerId: PeerId, topics?: string[]) {
const peer = this.libp2p.peerStore.get(peerId);
if (!peer) throw 'Peer is unknown';
if (!peer.protocols.includes(StoreCodec))
throw 'Peer does not register waku store protocol';
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw 'Failed to get a connection to the peer';
try {
const { stream } = await connection.newStream(StoreCodec);
const historyRpc = HistoryRPC.query(topics).encode();
try {
const res = await pipe(
[historyRpc],
lp.encode(),
stream,
lp.decode(),
concat
);
const buf = res.slice();
try {
const reply = HistoryRPC.decode(buf);
return reply.response;
} catch (err) {
console.log('Failed to decode store reply', err);
}
} catch (err) {
console.log('Failed to send waku store query', err);
}
} catch (err) {
console.log(
'Failed to negotiate waku store protocol stream with peer',
err
);
}
return null;
}
}

View File

@ -33,6 +33,7 @@ export interface Args {
nodekey?: string;
portsShift?: number;
logLevel?: LogLevel;
store?: boolean;
}
export enum LogLevel {