mirror of https://github.com/waku-org/js-waku.git
Merge #27
27: Implement Waku Store protocol r=D4nte a=D4nte Resolves #21 Co-authored-by: Franck Royer <franck@royer.one>
This commit is contained in:
commit
29a3ec403c
|
@ -13,6 +13,7 @@
|
|||
"codecov",
|
||||
"commitlint",
|
||||
"dependabot",
|
||||
"dingpu",
|
||||
"Dlazy",
|
||||
"Dout",
|
||||
"Dscore",
|
||||
|
@ -36,6 +37,7 @@
|
|||
"multiaddrs",
|
||||
"multicodecs",
|
||||
"mplex",
|
||||
"muxed",
|
||||
"muxer",
|
||||
"nodekey",
|
||||
"peerhave",
|
||||
|
@ -64,6 +66,7 @@
|
|||
"tsconfig.json",
|
||||
"node_modules/**",
|
||||
"build",
|
||||
"gen"
|
||||
"gen",
|
||||
"proto"
|
||||
]
|
||||
}
|
||||
|
|
|
@ -1,16 +1,17 @@
|
|||
{
|
||||
"name": "waku-js-chat",
|
||||
"name": "js-waku",
|
||||
"version": "1.0.0",
|
||||
"lockfileVersion": 2,
|
||||
"requires": true,
|
||||
"packages": {
|
||||
"": {
|
||||
"name": "waku-js-chat",
|
||||
"version": "1.0.0",
|
||||
"license": "MIT",
|
||||
"dependencies": {
|
||||
"@bitauth/libauth": "^1.17.1",
|
||||
"debug": "^4.3.1",
|
||||
"it-concat": "^1.0.3",
|
||||
"it-length-prefixed": "^3.1.0",
|
||||
"libp2p": "^0.30.0",
|
||||
"libp2p-gossipsub": "^0.7.0",
|
||||
"libp2p-mplex": "^0.10.2",
|
||||
|
@ -20,6 +21,7 @@
|
|||
"multiaddr": "^8.1.2",
|
||||
"prompt-sync": "^4.2.0",
|
||||
"ts-proto": "^1.74.0",
|
||||
"uuid": "^3.4.0",
|
||||
"yarg": "^1.0.8"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@ -31,6 +33,7 @@
|
|||
"@types/mocha": "^8.2.2",
|
||||
"@types/node": "^14.14.31",
|
||||
"@types/tail": "^2.0.0",
|
||||
"@types/uuid": "^8.3.0",
|
||||
"@typescript-eslint/eslint-plugin": "^4.0.1",
|
||||
"@typescript-eslint/parser": "^4.0.1",
|
||||
"app-root-path": "^3.0.0",
|
||||
|
@ -1054,6 +1057,12 @@
|
|||
"integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@types/uuid": {
|
||||
"version": "8.3.0",
|
||||
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz",
|
||||
"integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==",
|
||||
"dev": true
|
||||
},
|
||||
"node_modules/@typescript-eslint/eslint-plugin": {
|
||||
"version": "4.16.1",
|
||||
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz",
|
||||
|
@ -7516,6 +7525,14 @@
|
|||
"buffer": "^5.5.0"
|
||||
}
|
||||
},
|
||||
"node_modules/it-concat": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz",
|
||||
"integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==",
|
||||
"dependencies": {
|
||||
"bl": "^4.0.0"
|
||||
}
|
||||
},
|
||||
"node_modules/it-drain": {
|
||||
"version": "1.0.4",
|
||||
"resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz",
|
||||
|
@ -14367,6 +14384,12 @@
|
|||
"integrity": "sha512-TYTfnILhrZUAZKGNgot5+sBDap7oPIzV3818p7g4VhKGc81+/eoEZ93wKBTGnSg/tpDjzWSb8Wx5E737FCH/Sw==",
|
||||
"dev": true
|
||||
},
|
||||
"@types/uuid": {
|
||||
"version": "8.3.0",
|
||||
"resolved": "https://registry.npmjs.org/@types/uuid/-/uuid-8.3.0.tgz",
|
||||
"integrity": "sha512-eQ9qFW/fhfGJF8WKHGEHZEyVWfZxrT+6CLIJGBcZPfxUh/+BnEj+UCGYMlr9qZuX/2AltsvwrGqp0LhEW8D0zQ==",
|
||||
"dev": true
|
||||
},
|
||||
"@typescript-eslint/eslint-plugin": {
|
||||
"version": "4.16.1",
|
||||
"resolved": "https://registry.npmjs.org/@typescript-eslint/eslint-plugin/-/eslint-plugin-4.16.1.tgz",
|
||||
|
@ -19583,6 +19606,14 @@
|
|||
"buffer": "^5.5.0"
|
||||
}
|
||||
},
|
||||
"it-concat": {
|
||||
"version": "1.0.3",
|
||||
"resolved": "https://registry.npmjs.org/it-concat/-/it-concat-1.0.3.tgz",
|
||||
"integrity": "sha512-sjeZQ1BWQ9U/W2oI09kZgUyvSWzQahTkOkLIsnEPgyqZFaF9ME5gV6An4nMjlyhXKWQMKEakQU8oRHs2SdmeyA==",
|
||||
"requires": {
|
||||
"bl": "^4.0.0"
|
||||
}
|
||||
},
|
||||
"it-drain": {
|
||||
"version": "1.0.4",
|
||||
"resolved": "https://registry.npmjs.org/it-drain/-/it-drain-1.0.4.tgz",
|
||||
|
|
|
@ -50,6 +50,8 @@
|
|||
"dependencies": {
|
||||
"@bitauth/libauth": "^1.17.1",
|
||||
"debug": "^4.3.1",
|
||||
"it-concat": "^1.0.3",
|
||||
"it-length-prefixed": "^3.1.0",
|
||||
"libp2p": "^0.30.0",
|
||||
"libp2p-gossipsub": "^0.7.0",
|
||||
"libp2p-mplex": "^0.10.2",
|
||||
|
@ -59,6 +61,7 @@
|
|||
"multiaddr": "^8.1.2",
|
||||
"prompt-sync": "^4.2.0",
|
||||
"ts-proto": "^1.74.0",
|
||||
"uuid": "^3.4.0",
|
||||
"yarg": "^1.0.8"
|
||||
},
|
||||
"devDependencies": {
|
||||
|
@ -70,6 +73,7 @@
|
|||
"@types/mocha": "^8.2.2",
|
||||
"@types/node": "^14.14.31",
|
||||
"@types/tail": "^2.0.0",
|
||||
"@types/uuid": "^8.3.0",
|
||||
"@typescript-eslint/eslint-plugin": "^4.0.1",
|
||||
"@typescript-eslint/parser": "^4.0.1",
|
||||
"app-root-path": "^3.0.0",
|
||||
|
|
|
@ -0,0 +1,45 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package waku.v2;
|
||||
|
||||
import "waku/v2/message.proto";
|
||||
|
||||
message Index {
|
||||
bytes digest = 1;
|
||||
double received_time = 2;
|
||||
}
|
||||
|
||||
message PagingInfoQuery {
|
||||
uint32 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326
|
||||
Index cursor = 2;
|
||||
Direction direction = 3;
|
||||
}
|
||||
|
||||
enum Direction {
|
||||
DIRECTION_BACKWARD_UNSPECIFIED = 0;
|
||||
DIRECTION_FORWARD = 1;
|
||||
}
|
||||
|
||||
message PagingInfoResponse {
|
||||
uint64 page_size = 1; // TODO: See https://github.com/vacp2p/rfc/issues/326
|
||||
Index cursor = 2;
|
||||
Direction direction = 3;
|
||||
}
|
||||
|
||||
message HistoryQuery {
|
||||
repeated string topics = 1;
|
||||
optional PagingInfoQuery paging_info = 2;
|
||||
optional double start_time = 3;
|
||||
optional double end_time = 4;
|
||||
}
|
||||
|
||||
message HistoryResponse {
|
||||
repeated WakuMessageProto messages = 1;
|
||||
PagingInfoResponse paging_info = 2;
|
||||
}
|
||||
|
||||
message HistoryRPC {
|
||||
string request_id = 1;
|
||||
HistoryQuery query = 2;
|
||||
HistoryResponse response = 3;
|
||||
}
|
|
@ -1,6 +1,9 @@
|
|||
import readline from 'readline';
|
||||
import util from 'util';
|
||||
|
||||
import Multiaddr from 'multiaddr';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import Waku from '../lib/waku';
|
||||
import { WakuMessage } from '../lib/waku_message';
|
||||
import { RelayDefaultTopic } from '../lib/waku_relay';
|
||||
|
@ -8,6 +11,8 @@ import { delay } from '../test_utils/';
|
|||
|
||||
import { ChatMessage } from './chat_message';
|
||||
|
||||
const ChatContentTopic = 'dingpu';
|
||||
|
||||
(async function () {
|
||||
const opts = processArguments();
|
||||
|
||||
|
@ -33,14 +38,7 @@ import { ChatMessage } from './chat_message';
|
|||
const wakuMsg = WakuMessage.decode(event.data);
|
||||
if (wakuMsg.payload) {
|
||||
const chatMsg = ChatMessage.decode(wakuMsg.payload);
|
||||
const timestamp = chatMsg.timestamp.toLocaleString([], {
|
||||
month: 'short',
|
||||
day: 'numeric',
|
||||
hour: 'numeric',
|
||||
minute: '2-digit',
|
||||
hour12: false,
|
||||
});
|
||||
console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`);
|
||||
printMessage(chatMsg);
|
||||
}
|
||||
});
|
||||
|
||||
|
@ -68,19 +66,35 @@ import { ChatMessage } from './chat_message';
|
|||
waku.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
|
||||
);
|
||||
|
||||
const staticNodeId = opts.staticNode?.getPeerId();
|
||||
if (staticNodeId) {
|
||||
const storePeerId = PeerId.createFromB58String(staticNodeId);
|
||||
console.log(
|
||||
`Retrieving archived messages from ${storePeerId.toB58String()}`
|
||||
);
|
||||
const msg = await waku.store.queryHistory(storePeerId, [ChatContentTopic]);
|
||||
msg?.messages.map((msg) => {
|
||||
const wakuMsg = WakuMessage.fromProto(msg);
|
||||
if (wakuMsg.payload) {
|
||||
const chatMsg = ChatMessage.decode(wakuMsg.payload);
|
||||
printMessage(chatMsg);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
console.log('Ready to chat!');
|
||||
rl.prompt();
|
||||
for await (const line of rl) {
|
||||
rl.prompt();
|
||||
const chatMessage = new ChatMessage(new Date(), nick, line);
|
||||
|
||||
const msg = WakuMessage.fromBytes(chatMessage.encode());
|
||||
const msg = WakuMessage.fromBytes(chatMessage.encode(), ChatContentTopic);
|
||||
await waku.relay.publish(msg);
|
||||
}
|
||||
})();
|
||||
|
||||
interface Options {
|
||||
staticNode?: string;
|
||||
staticNode?: Multiaddr;
|
||||
listenAddr: string;
|
||||
}
|
||||
|
||||
|
@ -93,7 +107,9 @@ function processArguments(): Options {
|
|||
const arg = passedArgs.shift();
|
||||
switch (arg) {
|
||||
case '--staticNode':
|
||||
opts = Object.assign(opts, { staticNode: passedArgs.shift() });
|
||||
opts = Object.assign(opts, {
|
||||
staticNode: new Multiaddr(passedArgs.shift()),
|
||||
});
|
||||
break;
|
||||
case '--listenAddr':
|
||||
opts = Object.assign(opts, { listenAddr: passedArgs.shift() });
|
||||
|
@ -106,3 +122,14 @@ function processArguments(): Options {
|
|||
|
||||
return opts;
|
||||
}
|
||||
|
||||
function printMessage(chatMsg: ChatMessage) {
|
||||
const timestamp = chatMsg.timestamp.toLocaleString([], {
|
||||
month: 'short',
|
||||
day: 'numeric',
|
||||
hour: 'numeric',
|
||||
minute: '2-digit',
|
||||
hour12: false,
|
||||
});
|
||||
console.log(`<${timestamp}> ${chatMsg.nick}: ${chatMsg.message}`);
|
||||
}
|
||||
|
|
|
@ -7,6 +7,7 @@ import Multiaddr from 'multiaddr';
|
|||
import PeerId from 'peer-id';
|
||||
|
||||
import { RelayCodec, WakuRelay, WakuRelayPubsub } from './waku_relay';
|
||||
import { StoreCodec, WakuStore } from './waku_store';
|
||||
|
||||
export interface CreateOptions {
|
||||
listenAddresses: string[];
|
||||
|
@ -14,7 +15,11 @@ export interface CreateOptions {
|
|||
}
|
||||
|
||||
export default class Waku {
|
||||
private constructor(public libp2p: Libp2p, public relay: WakuRelay) {}
|
||||
private constructor(
|
||||
public libp2p: Libp2p,
|
||||
public relay: WakuRelay,
|
||||
public store: WakuStore
|
||||
) {}
|
||||
|
||||
/**
|
||||
* Create new waku node
|
||||
|
@ -46,17 +51,19 @@ export default class Waku {
|
|||
},
|
||||
});
|
||||
|
||||
const wakuStore = new WakuStore(libp2p);
|
||||
|
||||
await libp2p.start();
|
||||
|
||||
return new Waku(libp2p, new WakuRelay(libp2p.pubsub));
|
||||
return new Waku(libp2p, new WakuRelay(libp2p.pubsub), wakuStore);
|
||||
}
|
||||
|
||||
/**
|
||||
* Dials to the provided peer. If successful, the known metadata of the peer will be added to the nodes peerStore, and the Connection will be returned
|
||||
* Dials to the provided peer.
|
||||
* @param peer The peer to dial
|
||||
*/
|
||||
async dial(peer: PeerId | Multiaddr | string) {
|
||||
return this.libp2p.dialProtocol(peer, RelayCodec);
|
||||
return this.libp2p.dialProtocol(peer, [RelayCodec, StoreCodec]);
|
||||
}
|
||||
|
||||
async dialWithMultiAddr(peerId: PeerId, multiaddr: Multiaddr[]) {
|
||||
|
|
|
@ -2,18 +2,23 @@
|
|||
import { Reader } from 'protobufjs/minimal';
|
||||
|
||||
// Protecting the user from protobuf oddities
|
||||
import { WakuMessageProto } from '../proto/waku/v2/waku';
|
||||
import { WakuMessageProto } from '../proto/waku/v2/message';
|
||||
|
||||
const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
|
||||
export const DEFAULT_CONTENT_TOPIC = '/waku/2/default-content/proto';
|
||||
const DEFAULT_VERSION = 0;
|
||||
|
||||
export class WakuMessage {
|
||||
// TODO: Adopt similar design to HistoryRPC
|
||||
private constructor(
|
||||
public payload?: Uint8Array,
|
||||
public contentTopic?: string,
|
||||
public version?: number
|
||||
) {}
|
||||
|
||||
static fromProto(proto: WakuMessageProto) {
|
||||
return new WakuMessage(proto.payload, proto.contentTopic, proto.version);
|
||||
}
|
||||
|
||||
/**
|
||||
* Create Message with a utf-8 string as payload
|
||||
* @param payload
|
||||
|
@ -27,10 +32,14 @@ export class WakuMessage {
|
|||
/**
|
||||
* Create Message with a byte array as payload
|
||||
* @param payload
|
||||
* @param contentTopic
|
||||
* @returns {WakuMessage}
|
||||
*/
|
||||
static fromBytes(payload: Uint8Array): WakuMessage {
|
||||
return new WakuMessage(payload, DEFAULT_CONTENT_TOPIC, DEFAULT_VERSION);
|
||||
static fromBytes(
|
||||
payload: Uint8Array,
|
||||
contentTopic: string = DEFAULT_CONTENT_TOPIC
|
||||
): WakuMessage {
|
||||
return new WakuMessage(payload, contentTopic, DEFAULT_VERSION);
|
||||
}
|
||||
|
||||
static decode(bytes: Uint8Array): WakuMessage {
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
import { Reader } from 'protobufjs/minimal';
|
||||
import { v4 as uuid } from 'uuid';
|
||||
|
||||
import * as proto from '../../proto/waku/v2/store';
|
||||
import { DEFAULT_CONTENT_TOPIC } from '../waku_message';
|
||||
|
||||
export class HistoryRPC {
|
||||
public constructor(public proto: proto.HistoryRPC) {}
|
||||
|
||||
static query(topics: string[] = [DEFAULT_CONTENT_TOPIC]): HistoryRPC {
|
||||
const pagingInfo = {
|
||||
pageSize: 10,
|
||||
cursor: undefined,
|
||||
direction: proto.Direction.DIRECTION_BACKWARD_UNSPECIFIED,
|
||||
};
|
||||
return new HistoryRPC({
|
||||
requestId: uuid(),
|
||||
query: { topics, pagingInfo, startTime: undefined, endTime: undefined },
|
||||
response: undefined,
|
||||
});
|
||||
}
|
||||
|
||||
static decode(bytes: Uint8Array): HistoryRPC {
|
||||
const res = proto.HistoryRPC.decode(Reader.create(bytes));
|
||||
return new HistoryRPC(res);
|
||||
}
|
||||
|
||||
encode(): Uint8Array {
|
||||
return proto.HistoryRPC.encode(this.proto).finish();
|
||||
}
|
||||
|
||||
get response(): proto.HistoryResponse | undefined {
|
||||
return this.proto.response;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,73 @@
|
|||
import { expect } from 'chai';
|
||||
|
||||
import {
|
||||
delay,
|
||||
makeLogFileName,
|
||||
NimWaku,
|
||||
NOISE_KEY_1,
|
||||
NOISE_KEY_2,
|
||||
} from '../../test_utils';
|
||||
import Waku from '../waku';
|
||||
import { WakuMessage } from '../waku_message';
|
||||
|
||||
describe('Waku Store', () => {
|
||||
let waku: Waku;
|
||||
let nimWaku: NimWaku;
|
||||
|
||||
beforeEach(async function () {
|
||||
this.timeout(5_000);
|
||||
|
||||
nimWaku = new NimWaku(makeLogFileName(this));
|
||||
await nimWaku.start({ store: true });
|
||||
|
||||
const waku0 = await Waku.create({ staticNoiseKey: NOISE_KEY_2 });
|
||||
await waku0.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
await delay(100);
|
||||
await new Promise((resolve) =>
|
||||
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
|
||||
);
|
||||
|
||||
await waku0.relay.subscribe();
|
||||
|
||||
await new Promise((resolve) =>
|
||||
waku0.libp2p.pubsub.once('gossipsub:heartbeat', resolve)
|
||||
);
|
||||
|
||||
await waku0.relay.publish(
|
||||
WakuMessage.fromUtf8String('A message from relay.')
|
||||
);
|
||||
|
||||
await nimWaku.sendMessage(
|
||||
WakuMessage.fromUtf8String('Another message from json rpc.')
|
||||
);
|
||||
|
||||
waku = await Waku.create({ staticNoiseKey: NOISE_KEY_1 });
|
||||
await waku.dial(await nimWaku.getMultiaddrWithId());
|
||||
|
||||
await delay(500);
|
||||
});
|
||||
|
||||
afterEach(async function () {
|
||||
nimWaku ? nimWaku.stop() : null;
|
||||
waku ? await waku.stop() : null;
|
||||
});
|
||||
|
||||
it('Retrieves history', async function () {
|
||||
const nimPeerId = await nimWaku.getPeerId();
|
||||
|
||||
const response = await waku.store.queryHistory(nimPeerId);
|
||||
const messages = response?.messages;
|
||||
|
||||
// TODO: Should be fixed with https://github.com/status-im/nim-waku/issues/471
|
||||
// expect(messages?.length).eq(2);
|
||||
const result = messages
|
||||
?.map((protoMsg) => {
|
||||
return WakuMessage.fromProto(protoMsg);
|
||||
})
|
||||
.findIndex((msg) => {
|
||||
return msg.utf8Payload() === 'A message from relay.';
|
||||
});
|
||||
expect(result).to.not.eq(-1);
|
||||
});
|
||||
});
|
|
@ -0,0 +1,58 @@
|
|||
import concat from 'it-concat';
|
||||
import lp from 'it-length-prefixed';
|
||||
import pipe from 'it-pipe';
|
||||
import Libp2p from 'libp2p';
|
||||
import PeerId from 'peer-id';
|
||||
|
||||
import { HistoryRPC } from './history_rpc';
|
||||
|
||||
export const StoreCodec = '/vac/waku/store/2.0.0-beta1';
|
||||
|
||||
export class WakuStore {
|
||||
constructor(public libp2p: Libp2p) {}
|
||||
|
||||
/**
|
||||
* Retrieve history from given peer
|
||||
* @param peerId
|
||||
* @param topics
|
||||
* @throws if not able to reach peer
|
||||
*/
|
||||
async queryHistory(peerId: PeerId, topics?: string[]) {
|
||||
const peer = this.libp2p.peerStore.get(peerId);
|
||||
if (!peer) throw 'Peer is unknown';
|
||||
if (!peer.protocols.includes(StoreCodec))
|
||||
throw 'Peer does not register waku store protocol';
|
||||
const connection = this.libp2p.connectionManager.get(peer.id);
|
||||
if (!connection) throw 'Failed to get a connection to the peer';
|
||||
|
||||
try {
|
||||
const { stream } = await connection.newStream(StoreCodec);
|
||||
|
||||
const historyRpc = HistoryRPC.query(topics).encode();
|
||||
try {
|
||||
const res = await pipe(
|
||||
[historyRpc],
|
||||
lp.encode(),
|
||||
stream,
|
||||
lp.decode(),
|
||||
concat
|
||||
);
|
||||
const buf = res.slice();
|
||||
try {
|
||||
const reply = HistoryRPC.decode(buf);
|
||||
return reply.response;
|
||||
} catch (err) {
|
||||
console.log('Failed to decode store reply', err);
|
||||
}
|
||||
} catch (err) {
|
||||
console.log('Failed to send waku store query', err);
|
||||
}
|
||||
} catch (err) {
|
||||
console.log(
|
||||
'Failed to negotiate waku store protocol stream with peer',
|
||||
err
|
||||
);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
|
@ -33,6 +33,7 @@ export interface Args {
|
|||
nodekey?: string;
|
||||
portsShift?: number;
|
||||
logLevel?: LogLevel;
|
||||
store?: boolean;
|
||||
}
|
||||
|
||||
export enum LogLevel {
|
||||
|
|
Loading…
Reference in New Issue