Merge pull request #803 from status-im/libp2p-upgrade-2

This commit is contained in:
fryorcraken.eth 2022-08-03 14:48:28 +10:00 committed by GitHub
commit 6017690403
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
52 changed files with 4204 additions and 4248 deletions

View File

@ -54,11 +54,14 @@
"multiaddr",
"multiaddresses",
"multiaddrs",
"multicodec",
"multicodecs",
"multiformats",
"mplex",
"multihashes",
"muxed",
"muxer",
"muxers",
"mvps",
"nodekey",
"nwaku",
@ -87,6 +90,7 @@
"transpiled",
"typedoc",
"unencrypted",
"unmarshal",
"unmount",
"unmounts",
"untracked",

View File

@ -14,7 +14,10 @@
],
"globals": { "BigInt": true, "console": true, "WebAssembly": true },
"rules": {
"@typescript-eslint/explicit-function-return-type": ["error"],
"@typescript-eslint/explicit-function-return-type": [
"error",
{ "allowExpressions": true }
],
"@typescript-eslint/explicit-module-boundary-types": "off",
"eslint-comments/disable-enable-pair": [
"error",

2
.gitignore vendored
View File

@ -7,3 +7,5 @@ node_modules
src/**.js
coverage
*.log
/tsconfig.tsbuildinfo
/tsconfig.dev.tsbuildinfo

View File

@ -1,9 +1,9 @@
import path from "path";
import fs from "fs";
const START_PATH = path.join(process.cwd(), "dist/esm");
const START_PATH = path.join(process.cwd(), "dist/");
const IMPORT_REGEXP =
/^((import|export) [^';]* from "(@[^";]+\/)?([^@";]*\/[^";]*)[^";]*)"/g;
/^((import|export) [^';]* from "(\.[^@";]*\/[^";]*)[^";]*)"/g;
const JUST_ADD_AN_EXTENSION = '$1.js"';
const ADD_INDEX_FILE = '$1/index.js"';
const JS_EXT = ".js";
@ -36,7 +36,7 @@ function fixImportsAtFile(filePath) {
}
const [_, importPath] = l.split(`"`);
let exists = true;
let exists;
let fullPath;
if (importPath.startsWith(".")) {
fullPath = path.join(filePath, "..", importPath);

View File

@ -13,40 +13,25 @@
<div id='status'></div>
<input id='textInput' placeholder='Type your message here' type='text'>
<button id='sendButton' onclick='sendMessage();' type='button'>Send Message
<button id='sendButton' type='button'>Send Message
</button>
<div><h1>Messages</h1></div>
<div id='messages'></div>
<script
src='https://unpkg.com/js-waku@latest/build/umd/js-waku.min.bundle.js'></script>
<script>
<script type='module'>
/**
* Demonstrate usage of js-waku in the browser. Use relay, gossip sub protocol to send and receive messages.
* Recommended payload is protobuf. Using simple utf-8 string for demo purposes only.
*
* - API documentation: https://js-waku.wakuconnect.dev/
* - Guides: https://docs.wakuconnect.dev/
*
* Note: open this HTML in two different browsers to experience decentralized communication.
* A node will not show its own messages, this can be changed by modifying the `Waku.create` call:
*
* Waku.create({
* bootstrap: {default: true}, libp2p: {
* config: {
* pubsub: {
* enabled: true,
* emitSelf: true,
* },
* },
* },
* })
*
*/
const { Waku, WakuMessage } = jswaku;
import {
createWaku,
waitForRemotePeer,
WakuMessage
} from '../../dist/bundle.js';
const statusDiv = document.getElementById('status');
const messagesDiv = document.getElementById('messages');
const textInput = document.getElementById('textInput');
@ -63,10 +48,6 @@
// for simplicity's sake.
const contentTopic = '/relay-demo/1/message/utf-8';
// Function to be used to send the text input over Waku.
let sendMessage = () => {
};
try {
statusDiv.innerHTML = '<p>Starting</p>';
@ -75,16 +56,13 @@
// We are currently working on migrating this method to DNS Discovery.
//
// https://js-waku.wakuconnect.dev/classes/waku.Waku.html#create
Waku.create({ bootstrap: { default: true } }).catch(e => {
statusDiv.innerHTML = 'Error';
console.log('Issue starting Waku node', e);
}
).then(wakuNode => {
const waku = await createWaku({ bootstrap: { default: true } });
await waku.start();
// Had a hook to process all incoming messages on a specified content topic.
//
// https://js-waku.wakuconnect.dev/classes/waku_relay.WakuRelay.html#addObserver
wakuNode.relay.addObserver((wakuMessage) => {
waku.relay.addObserver((wakuMessage) => {
// Checks there is a payload on the message.
// Waku Message is encoded in protobuf, in proto v3 fields are always optional.
@ -108,19 +86,13 @@
// waku nodes (peers) and for appropriate handshakes to be done.
//
// https://js-waku.wakuconnect.dev/classes/waku.Waku.html#waitForRemotePeer
wakuNode.waitForRemotePeer()
.catch((e) => {
statusDiv.innerHTML = 'Failed to connect to peers: ' + e.toString();
})
.then(() => {
await waitForRemotePeer(waku);
// We are now connected to a remote peer, let's define the `sendMessage`
// function that sends the text input over Waku Relay, the gossipsub
// protocol.
sendMessage = () => {
sendButton.onclick = async () => {
const text = textInput.value;
// Reset the text input.
textInput.value = null;
// Helper functions are available to create a Waku Message.
// These functions also provide native symmetric, asymmetric encryption,
@ -131,19 +103,15 @@
// serialize a data structure.
//
// https://js-waku.wakuconnect.dev/classes/waku_message.WakuMessage.html#fromUtf8String
WakuMessage.fromUtf8String(text, contentTopic).catch(e => console.log('Error encoding message', e)).then(
wakuMessage => {
const wakuMessage = await WakuMessage.fromUtf8String(text, contentTopic);
// Once the message is constructed, send it over Waku Relay.
//
// https://js-waku.wakuconnect.dev/classes/waku_relay.WakuRelay.html#send
wakuNode.relay.send(wakuMessage).catch((e) => {
console.log('Error sending message', e);
}).then(() => {
console.log('Message sent', text);
});
}
);
await waku.relay.send(wakuMessage);
console.log('Message sent!');
// Reset the text input.
textInput.value = null;
};
// Ready to send & receive messages, enable text input.
@ -151,10 +119,8 @@
sendButton.disabled = false;
statusDiv.innerHTML = '<p>Ready!</p>';
});
});
} catch (e) {
timestampDiv.innerHTML = 'Failed to start application';
statusDiv.innerHTML = 'Failed to start application';
console.log(e);
}
</script>

View File

@ -12,9 +12,13 @@
<div><h1>Timestamp of latest relay ping</h1></div>
<div id='timestamp'></div>
<script
src='https://unpkg.com/js-waku@latest/build/umd/js-waku.min.bundle.js'></script>
<script>
<script type='module'>
import {
createWaku,
waitForRemotePeer,
Protocols
} from '../../dist/bundle.min.js';
/**
* This example demonstrates how to use the js-waku minified bundle
* available on unpkg.com.
@ -25,19 +29,16 @@
const timestampDiv = document.getElementById('timestamp');
try {
timestampDiv.innerHTML = '<p>Starting waku.</p>';
jswaku.Waku.create({ bootstrap: { default: true } }).catch(e => {
timestampDiv.innerHTML = 'Failed to create Waku: ' + e.toString();
}
).then(waku => {
timestampDiv.innerHTML = '<p>Connecting to a peer.</p>';
waku.waitForRemotePeer()
.catch((e) => {
timestampDiv.innerHTML = 'Failed to connect to peers' + e.toString();
})
.then(() => {
timestampDiv.innerHTML = '<p>Retrieving messages.</p>';
timestampDiv.innerHTML = '<p>Creating waku.</p>';
const node = await createWaku({ bootstrap: { default: true } });
timestampDiv.innerHTML = '<p>Starting waku.</p>';
await node.start();
timestampDiv.innerHTML = '<p>Connecting to a peer.</p>';
await waitForRemotePeer(node, [Protocols.Store]);
timestampDiv.innerHTML = '<p>Retrieving messages.</p>';
const callback = (wakuMessages) => {
// Messages are ordered with oldest first
// even with page direction `backward`
@ -58,7 +59,7 @@
// Only retrieve a week of messages
startTime.setTime(Date.now() - 7 * 24 * 60 * 60 * 1000);
waku.store
await node.store
.queryHistory(['/relay-ping/1/ping/null'], {
callback,
pageDirection: 'backward',
@ -67,14 +68,9 @@
startTime,
endTime: new Date()
}
})
.catch((e) => {
timestampDiv.innerHTML = 'Failed to retrieve messages' + e.toString();
});
});
});
} catch (e) {
timestampDiv.innerHTML = 'Failed to start application' + e.toString();
timestampDiv.innerHTML = 'Error encountered: ' + e.toString();
}
</script>
</body>

View File

@ -14,6 +14,7 @@
"http-browserify": "^1.7.0",
"https-browserify": "^1.0.0",
"js-waku": "^0.24.0",
"multiaddr": "^10.0.1",
"process": "^0.11.10",
"react": "^17.0.2",
"react-dom": "^17.0.2",

View File

@ -10,6 +10,7 @@
"http-browserify": "^1.7.0",
"https-browserify": "^1.0.0",
"js-waku": "^0.24.0",
"multiaddr": "^10.0.1",
"process": "^0.11.10",
"react": "^17.0.2",
"react-dom": "^17.0.2",

View File

@ -26,7 +26,7 @@ function info(waku: Waku | undefined): string[] {
if (!waku) {
return ["Waku node is starting"];
}
return [`PeerId: ${waku.libp2p.peerId.toB58String()}`];
return [`PeerId: ${waku.libp2p.peerId.toString()}`];
}
function connect(peer: string | undefined, waku: Waku | undefined): string[] {
@ -64,7 +64,7 @@ async function peers(waku: Waku | undefined): Promise<string[]> {
peers.push(peer);
}
Array.from(peers).forEach((peer) => {
response.push(peer.id.toB58String() + ":");
response.push(peer.id.toString() + ":");
let addresses = " addresses: [";
peer.addresses.forEach(({ multiaddr }) => {
addresses += " " + multiaddr.toString() + ",";

View File

@ -1,5 +1,4 @@
process.env.CHROME_BIN = require("puppeteer").executablePath();
const webpackConfig = require("./webpack.config.cjs");
const webpack = require("webpack");
module.exports = function (config) {
@ -20,14 +19,34 @@ module.exports = function (config) {
},
webpack: {
mode: "production",
module: webpackConfig.module,
module: {
rules: [
{
test: /\.(js|tsx?)$/,
use: "ts-loader",
exclude: /(node_modules)|(node\.spec\.ts)/,
},
{
test: /node\.spec\.ts$/,
use: "ignore-loader",
},
],
},
plugins: [
new webpack.DefinePlugin({
"process.env.CI": process.env.CI || false,
}),
...webpackConfig.plugins,
new webpack.ProvidePlugin({
process: "process/browser.js",
}),
],
resolve: webpackConfig.resolve,
resolve: {
extensions: [".ts", ".js"],
fallback: {
// Can be removed once https://github.com/libp2p/js-libp2p-pubsub/pull/92 is merged and released
buffer: false,
},
},
stats: { warnings: false },
},
});

5219
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -2,14 +2,12 @@
"name": "js-waku",
"version": "0.24.0",
"description": "TypeScript implementation of the Waku v2 protocol",
"types": "./dist/esm/index.d.ts",
"main": "./dist/cjs/index.cjs",
"module": "./dist/esm/index.js",
"types": "./dist/index.d.ts",
"module": "./dist/index.js",
"exports": {
".": {
"types": "./dist/esm/index.d.ts",
"import": "./dist/esm/index.js",
"require": "./dist/cjs/index.cjs"
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"type": "module",
@ -25,11 +23,10 @@
],
"scripts": {
"prepare": "husky install",
"build": "rimraf ./dist; run-s build:**",
"build": "run-s build:**",
"build:esm": "tsc && node build-scripts/fix-imports.js",
"build:cjs": "rollup --config rollup.cjs.config.js -- dist/esm/index.js",
"build:umd": "webpack --config webpack.umd.config.cjs",
"build:umd:min": "terser --ecma 6 --compress --mangle -o dist/umd/index.min.js -- dist/umd/index.js && gzip -9 -c dist/umd/index.min.js > dist/umd/index.min.js.gz",
"build:bundle": "rollup --config rollup.config.js -- dist/index.js",
"build:bundle:min": "terser --ecma 11 --compress --mangle -o dist/bundle.min.js -- dist/bundle.js && gzip -9 -c dist/bundle.min.js > dist/bundle.min.js.gz",
"size": "npm run build && size-limit",
"fix": "run-s fix:*",
"fix:prettier": "prettier \"src/**/*.ts\" \"./*.json\" \"*.*js\" \".github/**/*.yml\" --write",
@ -66,49 +63,50 @@
"node": ">=16"
},
"dependencies": {
"@chainsafe/libp2p-noise": "^5.0.0",
"@chainsafe/libp2p-gossipsub": "^3.4.0",
"@chainsafe/libp2p-noise": "^7.0.1",
"@ethersproject/rlp": "^5.5.0",
"@libp2p/crypto": "^1.0.0",
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-id": "^1.0.2",
"@libp2p/interface-peer-info": "^1.0.1",
"@libp2p/interface-peer-store": "^1.0.0",
"@libp2p/interfaces": "^3.0.2",
"@libp2p/mplex": "^4.0.1",
"@libp2p/peer-id": "^1.1.10",
"@libp2p/websockets": "^3.0.0",
"@multiformats/multiaddr": "^10.2.0",
"@noble/secp256k1": "^1.3.4",
"debug": "^4.3.1",
"dns-query": "^0.11.1",
"debug": "^4.3.4",
"dns-query": "^0.11.2",
"hi-base32": "^0.5.1",
"it-concat": "^2.0.0",
"it-length-prefixed": "^5.0.2",
"it-pipe": "^1.1.0",
"it-all": "^1.0.6",
"it-length-prefixed": "^7.0.1",
"it-pipe": "^2.0.3",
"js-sha3": "^0.8.0",
"libp2p": "^0.36.2",
"libp2p-bootstrap": "^0.14.0",
"libp2p-crypto": "^0.21.2",
"libp2p-gossipsub": "0.13.0",
"libp2p-interfaces": "^4.0.6",
"libp2p-mplex": "^0.10.4",
"libp2p-websockets": "^0.16.1",
"multiaddr": "^10.0.1",
"multiformats": "^9.6.5",
"peer-id": "^0.16.0",
"libp2p": "next",
"p-event": "^5.0.1",
"protons-runtime": "^1.0.4",
"uint8arrays": "^3.0.0",
"uuid": "^8.3.2",
"varint": "^6.0.0"
"uuid": "^8.3.2"
},
"devDependencies": {
"@istanbuljs/nyc-config-typescript": "^1.0.1",
"@libp2p/peer-id-factory": "^1.0.15",
"@rollup/plugin-commonjs": "^22.0.0",
"@rollup/plugin-json": "^4.1.0",
"@rollup/plugin-node-resolve": "^13.3.0",
"@size-limit/preset-big-lib": "^7.0.8",
"@types/app-root-path": "^1.2.4",
"@types/chai": "^4.2.15",
"@types/debug": "^4.1.7",
"@types/mocha": "^9.1.0",
"@types/node": "^17.0.6",
"@types/secp256k1": "^4.0.2",
"@types/tail": "^2.0.0",
"@types/uuid": "^8.3.0",
"@types/varint": "^6.0.0",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",
"app-root-path": "^3.0.0",
"buffer": "^6.0.3",
"chai": "^4.3.4",
"cspell": "^5.14.0",
"eslint": "^8.6.0",
@ -139,22 +137,17 @@
"protons": "^3.0.4",
"puppeteer": "^13.0.1",
"rollup": "^2.75.0",
"rollup-plugin-polyfill-node": "^0.9.0",
"size-limit": "^7.0.8",
"stream-browserify": "^3.0.0",
"tail": "^2.2.0",
"terser": "^5.13.1",
"ts-loader": "^9.2.6",
"ts-node": "^10.4.0",
"typedoc": "^0.22.10",
"typedoc-plugin-no-inherit": "^1.3.1",
"typescript": "^4.5.5",
"webpack-cli": "^4.10.0"
"typescript": "^4.5.5"
},
"files": [
"dist/esm",
"dist/cjs",
"dist/umd",
"dist",
"src/*.ts",
"src/lib/**/*.ts",
"src/proto/**/*.ts",
@ -172,9 +165,39 @@
},
"size-limit": [
{
"path": "dist/umd/index.js",
"import": "{ Waku }",
"config": "./webpack.umd.config.cjs"
"name": "Waku core",
"path": "dist/bundle.min.js",
"import": "{ Waku }"
},
{
"name": "Waku default setup",
"path": "dist/bundle.min.js",
"import": "{ createWaku, waitForRemotePeer }"
},
{
"name": "Asymmetric, symmetric encryption and signature",
"path": "dist/bundle.min.js",
"import": "{ waku_message }"
},
{
"name": "DNS discovery",
"path": "dist/bundle.min.js",
"import": "{ discovery }"
},
{
"name": "Privacy preserving protocols",
"path": "dist/bundle.min.js",
"import": "{ WakuRelay }"
},
{
"name": "Light protocols",
"path": "dist/bundle.min.js",
"import": "{ WakuLightPush, WakuFilter }"
},
{
"name": "History retrieval protocols",
"path": "dist/bundle.min.js",
"import": "{ WakuStore }"
}
],
"lint-staged": {

View File

@ -1,21 +0,0 @@
import { nodeResolve } from "@rollup/plugin-node-resolve";
import commonjs from "@rollup/plugin-commonjs";
import json from "@rollup/plugin-json";
import nodePolyfills from "rollup-plugin-polyfill-node";
export default {
output: {
file: "dist/cjs/index.cjs",
format: "cjs",
name: "waku",
},
plugins: [
commonjs(),
json(),
nodePolyfills(),
nodeResolve({
browser: true,
preferBuiltins: false,
}),
],
};

View File

@ -1,18 +1,16 @@
import { nodeResolve } from "@rollup/plugin-node-resolve";
import commonjs from "@rollup/plugin-commonjs";
import json from "@rollup/plugin-json";
import nodePolyfills from "rollup-plugin-polyfill-node";
export default {
output: {
file: "dist/umd/index.js",
format: "umd",
file: "dist/bundle.js",
format: "esm",
name: "waku",
},
plugins: [
commonjs(),
json(),
nodePolyfills(),
nodeResolve({
browser: true,
preferBuiltins: false,

View File

@ -13,12 +13,19 @@ export * as enr from "./lib/enr";
export * as utils from "./lib/utils";
export { waitForRemotePeer } from "./lib/wait_for_remote_peer";
export * as proto_message from "./proto/message";
export * as waku from "./lib/waku";
export { Waku, Protocols } from "./lib/waku";
export { createWaku, Waku, Protocols } from "./lib/waku";
export * as waku_message from "./lib/waku_message";
export { WakuMessage } from "./lib/waku_message";
export * as waku_filter from "./lib/waku_filter";
export { WakuFilter } from "./lib/waku_filter";
export * as waku_light_push from "./lib/waku_light_push";
export {
WakuLightPush,

View File

@ -1,11 +1,19 @@
import type {
PeerDiscovery,
PeerDiscoveryEvents,
} from "@libp2p/interface-peer-discovery";
import { symbol } from "@libp2p/interface-peer-discovery";
import type { PeerInfo } from "@libp2p/interface-peer-info";
import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events";
import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
import { Multiaddr } from "multiaddr";
import { DnsNodeDiscovery, NodeCapabilityCount } from "./dns";
import { getPredefinedBootstrapNodes } from "./predefined";
import { getPseudoRandomSubset } from "./random_subset";
const dbg = debug("waku:discovery:bootstrap");
const log = debug("waku:discovery:bootstrap");
/**
* Setup discovery method used to bootstrap.
@ -35,8 +43,14 @@ export interface BootstrapOptions {
peers?: string[] | Multiaddr[];
/**
* Getter that retrieve multiaddrs of peers to connect to.
* will be called once.
*/
getPeers?: () => Promise<string[] | Multiaddr[]>;
/**
* The interval between emitting addresses in milliseconds.
* Used if [[peers]] is passed or a sync function is passed for [[getPeers]]
*/
interval?: number;
/**
* An EIP-1459 ENR Tree URL. For example:
* "enrtree://AOFTICU2XWDULNLZGRMQS4RIZPAZEHYMV4FYHAPW563HNRAOERP7C@test.nodes.vac.dev"
@ -56,23 +70,51 @@ export interface BootstrapOptions {
*
* @throws if an invalid combination of options is passed, see [[BootstrapOptions]] for details.
*/
export class Bootstrap {
public static DefaultMaxPeers = 1;
export class Bootstrap
extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
static DefaultMaxPeers = 1;
public readonly getBootstrapPeers: (() => Promise<Multiaddr[]>) | undefined;
private readonly asyncGetBootstrapPeers:
| (() => Promise<Multiaddr[]>)
| undefined;
private peers: PeerInfo[];
private timer?: ReturnType<typeof setInterval>;
private readonly interval: number;
constructor(opts: BootstrapOptions) {
constructor(opts?: BootstrapOptions) {
super();
opts = opts ?? {};
const methods = [
!!opts.default,
!!opts.peers,
!!opts.getPeers,
!!opts.enrUrl,
].filter((x) => x);
if (methods.length > 1) {
throw new Error(
"Bootstrap does not support several discovery methods (yet)"
);
}
this.interval = opts.interval ?? 10000;
opts.default =
opts.default ?? (!opts.peers && !opts.getPeers && !opts.enrUrl);
const maxPeers = opts.maxPeers ?? Bootstrap.DefaultMaxPeers;
this.peers = [];
if (opts.default) {
dbg("Use hosted list of peers.");
log("Use hosted list of peers.");
this.getBootstrapPeers = (): Promise<Multiaddr[]> => {
return Promise.resolve(
this.peers = multiaddrsToPeerInfo(
getPredefinedBootstrapNodes(undefined, maxPeers)
);
};
} else if (opts.peers !== undefined && opts.peers.length > 0) {
return;
}
if (!!opts.peers && opts.peers.length > 0) {
const allPeers: Multiaddr[] = opts.peers.map(
(node: string | Multiaddr) => {
if (typeof node === "string") {
@ -82,41 +124,121 @@ export class Bootstrap {
}
}
);
const peers = getPseudoRandomSubset(allPeers, maxPeers);
dbg(
"Use provided list of peers (reduced to maxPeers)",
allPeers.map((ma) => ma.toString())
this.peers = multiaddrsToPeerInfo(
getPseudoRandomSubset(allPeers, maxPeers)
);
this.getBootstrapPeers = (): Promise<Multiaddr[]> =>
Promise.resolve(peers);
} else if (typeof opts.getPeers === "function") {
dbg("Bootstrap: Use provided getPeers function.");
log(
"Use provided list of peers (reduced to maxPeers)",
this.peers.map((ma) => ma.toString())
);
return;
}
if (typeof opts.getPeers === "function") {
log("Bootstrap: Use provided getPeers function.");
const getPeers = opts.getPeers;
this.getBootstrapPeers = async (): Promise<Multiaddr[]> => {
this.asyncGetBootstrapPeers = async () => {
const allPeers = await getPeers();
return getPseudoRandomSubset<string | Multiaddr>(
allPeers,
maxPeers
).map((node) => new Multiaddr(node));
};
} else if (opts.enrUrl) {
return;
}
if (opts.enrUrl) {
const wantedNodeCapabilityCount = opts.wantedNodeCapabilityCount;
if (!wantedNodeCapabilityCount)
throw "`wantedNodeCapabilityCount` must be defined when using `enrUrl`";
const enrUrl = opts.enrUrl;
dbg("Use provided EIP-1459 ENR Tree URL.");
log("Use provided EIP-1459 ENR Tree URL.");
const dns = DnsNodeDiscovery.dnsOverHttp();
this.getBootstrapPeers = async (): Promise<Multiaddr[]> => {
this.asyncGetBootstrapPeers = async () => {
const enrs = await dns.getPeers([enrUrl], wantedNodeCapabilityCount);
dbg(`Found ${enrs.length} peers`);
log(`Found ${enrs.length} peers`);
return enrs.map((enr) => enr.getFullMultiaddrs()).flat();
};
return;
}
}
/**
* Start discovery process
*/
start(): void {
if (this.asyncGetBootstrapPeers) {
// TODO: This should emit the peer as they are discovered instead of having
// to wait for the full DNS discovery process to be done first.
// TODO: PeerInfo should be returned by discovery
this.asyncGetBootstrapPeers().then((peers) => {
this.peers = multiaddrsToPeerInfo(peers);
this._startTimer();
});
} else {
dbg("No bootstrap method specified, no peer will be returned");
this.getBootstrapPeers = undefined;
this._startTimer();
}
}
private _startTimer(): void {
if (this.peers) {
log("Starting bootstrap node discovery");
if (this.timer != null) {
return;
}
this.timer = setInterval(() => this._returnPeers(), this.interval);
this._returnPeers();
}
}
_returnPeers(): void {
if (this.timer == null) {
return;
}
this.peers.forEach((peerData) => {
this.dispatchEvent(
new CustomEvent<PeerInfo>("peer", { detail: peerData })
);
});
}
/**
* Stop emitting events
*/
stop(): void {
if (this.timer != null) {
clearInterval(this.timer);
}
this.timer = undefined;
}
get [symbol](): true {
return true;
}
get [Symbol.toStringTag](): string {
return "@waku/bootstrap";
}
}
function multiaddrsToPeerInfo(mas: Multiaddr[]): PeerInfo[] {
return mas
.map((ma) => {
const peerIdStr = ma.getPeerId();
const protocols: string[] = [];
return {
id: peerIdStr ? peerIdFromString(peerIdStr) : null,
multiaddrs: [ma.decapsulateCode(421)],
protocols,
};
})
.filter((peerInfo): peerInfo is PeerInfo => peerInfo.id !== null);
}

View File

@ -171,7 +171,7 @@ describe("DNS Node Discovery w/ capabilities", () => {
});
expect(peers.length).to.eq(1);
expect(peers[0].peerId?.toB58String()).to.eq(
expect(peers[0].peerId?.toString()).to.eq(
"16Uiu2HAmPsYLvfKafxgRsb6tioYyGnSvGXS2iuMigptHrqHPNPzx"
);
});
@ -186,7 +186,7 @@ describe("DNS Node Discovery w/ capabilities", () => {
});
expect(peers.length).to.eq(1);
expect(peers[0].peerId?.toB58String()).to.eq(
expect(peers[0].peerId?.toString()).to.eq(
"16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F"
);
});
@ -200,7 +200,7 @@ describe("DNS Node Discovery w/ capabilities", () => {
});
expect(peers.length).to.eq(1);
expect(peers[0].peerId?.toB58String()).to.eq(
expect(peers[0].peerId?.toString()).to.eq(
"16Uiu2HAkv3La3ECgQpdYeEJfrX36EWdhkUDv4C9wvXM8TFZ9dNgd"
);
});
@ -220,7 +220,7 @@ describe("DNS Node Discovery w/ capabilities", () => {
});
expect(peers.length).to.eq(2);
const peerIds = peers.map((p) => p.peerId?.toB58String());
const peerIds = peers.map((p) => p.peerId?.toString());
expect(peerIds).to.contain(
"16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F"
);
@ -246,7 +246,7 @@ describe("DNS Node Discovery w/ capabilities", () => {
});
expect(peers.length).to.eq(3);
const peerIds = peers.map((p) => p.peerId?.toB58String());
const peerIds = peers.map((p) => p.peerId?.toString());
expect(peerIds).to.contain(
"16Uiu2HAm2HyS6brcCspSbszG9i36re2bWBVjMe3tMdnFp1Hua34F"
);

View File

@ -1,13 +1,13 @@
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Multiaddr } from "@multiformats/multiaddr";
import { expect } from "chai";
import { Multiaddr } from "multiaddr";
import PeerId from "peer-id";
import { ENR, Waku2 } from "../enr";
import fetchNodesUntilCapabilitiesFulfilled from "./fetch_nodes";
async function createEnr(waku2: Waku2): Promise<ENR> {
const peerId = await PeerId.create({ keyType: "secp256k1" });
const peerId = await createSecp256k1PeerId();
const enr = await ENR.createFromPeerId(peerId);
enr.setLocationMultiaddr(new Multiaddr("/ip4/18.223.219.100/udp/9000"));
enr.multiaddrs = [
@ -42,7 +42,7 @@ describe("Fetch nodes until capabilities are fulfilled", function () {
);
expect(res.length).to.eq(1);
expect(res[0].peerId!.toB58String()).to.eq(relayNode.peerId?.toB58String());
expect(res[0].peerId!.toString()).to.eq(relayNode.peerId?.toString());
});
it("1 Store, 2 fetches", async function () {
@ -65,7 +65,7 @@ describe("Fetch nodes until capabilities are fulfilled", function () {
);
expect(res.length).to.eq(1);
expect(res[0].peerId!.toB58String()).to.eq(storeNode.peerId?.toB58String());
expect(res[0].peerId!.toString()).to.eq(storeNode.peerId?.toString());
});
it("1 Store, 2 relays, 2 fetches", async function () {
@ -94,15 +94,9 @@ describe("Fetch nodes until capabilities are fulfilled", function () {
);
expect(res.length).to.eq(3);
expect(res[0].peerId!.toB58String()).to.eq(
relayNode1.peerId?.toB58String()
);
expect(res[1].peerId!.toB58String()).to.eq(
relayNode2.peerId?.toB58String()
);
expect(res[2].peerId!.toB58String()).to.eq(
relayStoreNode.peerId?.toB58String()
);
expect(res[0].peerId!.toString()).to.eq(relayNode1.peerId?.toString());
expect(res[1].peerId!.toString()).to.eq(relayNode2.peerId?.toString());
expect(res[2].peerId!.toString()).to.eq(relayStoreNode.peerId?.toString());
});
it("1 Relay, 1 Filter, gives up", async function () {
@ -117,6 +111,6 @@ describe("Fetch nodes until capabilities are fulfilled", function () {
);
expect(res.length).to.eq(1);
expect(res[0].peerId!.toB58String()).to.eq(relayNode.peerId?.toB58String());
expect(res[0].peerId!.toString()).to.eq(relayNode.peerId?.toString());
});
});

View File

@ -1,6 +1,5 @@
import { expect } from "chai";
import { fleets } from "./predefined";
import { getPseudoRandomSubset } from "./random_subset";
describe("Discovery", () => {
@ -34,23 +33,3 @@ describe("Discovery", () => {
expect(res.length).to.eq(2);
});
});
describe("Discovery [live data]", function () {
before(function () {
if (process.env.CI) {
this.skip();
}
});
it("Check pre-defined nodes against hosted JSON [live data]", async function () {
const res = await fetch("https://fleets.status.im/");
const nodes = await res.json();
expect(fleets.fleets["wakuv2.prod"]["waku-websocket"]).to.deep.eq(
nodes.fleets["wakuv2.prod"]["waku-websocket"]
);
expect(fleets.fleets["wakuv2.test"]["waku-websocket"]).to.deep.eq(
nodes.fleets["wakuv2.test"]["waku-websocket"]
);
});
});

View File

@ -1,4 +1,4 @@
import { Multiaddr } from "multiaddr";
import { Multiaddr } from "@multiformats/multiaddr";
import { getPseudoRandomSubset } from "./random_subset";

View File

@ -1,12 +1,30 @@
import { shuffle } from "libp2p-gossipsub/src/utils/index";
/**
* Return pseudo random subset of the input.
*/
export function getPseudoRandomSubset<T>(
values: T[],
wantedNumber: number
): T[] {
if (values.length <= wantedNumber) {
if (values.length <= wantedNumber || values.length <= 1) {
return values;
}
return shuffle(values).slice(0, wantedNumber);
}
function shuffle<T>(arr: T[]): T[] {
if (arr.length <= 1) {
return arr;
}
const randInt = (): number => {
return Math.floor(Math.random() * Math.floor(arr.length));
};
for (let i = 0; i < arr.length; i++) {
const j = randInt();
const tmp = arr[i];
arr[i] = arr[j];
arr[j] = tmp;
}
return arr;
}

View File

@ -1,7 +1,8 @@
import { expect } from "chai";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { ENR } from "./enr";
@ -25,18 +26,19 @@ describe("ENR Interop: nwaku", function () {
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
const nwakuInfo = await nwaku.info();
const nimPeerId = await nwaku.getPeerId();
expect(nwakuInfo.enrUri).to.not.be.undefined;
const dec = await ENR.decodeTxt(nwakuInfo.enrUri ?? "");
expect(dec.peerId?.toB58String()).to.eq(nimPeerId.toB58String());
expect(dec.peerId?.toString()).to.eq(nimPeerId.toString());
expect(dec.waku2).to.deep.eq({
relay: true,
store: false,
@ -56,18 +58,19 @@ describe("ENR Interop: nwaku", function () {
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
const nwakuInfo = await nwaku.info();
const nimPeerId = await nwaku.getPeerId();
expect(nwakuInfo.enrUri).to.not.be.undefined;
const dec = await ENR.decodeTxt(nwakuInfo.enrUri ?? "");
expect(dec.peerId?.toB58String()).to.eq(nimPeerId.toB58String());
expect(dec.peerId?.toString()).to.eq(nimPeerId.toString());
expect(dec.waku2).to.deep.eq({
relay: true,
store: true,
@ -87,18 +90,19 @@ describe("ENR Interop: nwaku", function () {
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
const nwakuInfo = await nwaku.info();
const nimPeerId = await nwaku.getPeerId();
expect(nwakuInfo.enrUri).to.not.be.undefined;
const dec = await ENR.decodeTxt(nwakuInfo.enrUri ?? "");
expect(dec.peerId?.toB58String()).to.eq(nimPeerId.toB58String());
expect(dec.peerId?.toString()).to.eq(nimPeerId.toString());
expect(dec.waku2).to.deep.eq({
relay: true,
store: true,

View File

@ -1,6 +1,6 @@
import { createSecp256k1PeerId } from "@libp2p/peer-id-factory";
import { Multiaddr } from "@multiformats/multiaddr";
import { assert, expect } from "chai";
import { Multiaddr } from "multiaddr";
import PeerId from "peer-id";
import { getPublicKey } from "../crypto";
import { bytesToHex, hexToBytes, utf8ToBytes } from "../utils";
@ -13,9 +13,9 @@ import { Waku2 } from "./waku2_codec";
describe("ENR", function () {
describe("Txt codec", () => {
it("should encodeTxt and decodeTxt", async () => {
const peerId = await PeerId.create({ keyType: "secp256k1" });
const peerId = await createSecp256k1PeerId();
const enr = await ENR.createFromPeerId(peerId);
const keypair = createKeypairFromPeerId(peerId);
const keypair = await createKeypairFromPeerId(peerId);
enr.setLocationMultiaddr(new Multiaddr("/ip4/18.223.219.100/udp/9000"));
enr.multiaddrs = [
new Multiaddr(
@ -102,16 +102,16 @@ describe("ENR", function () {
expect(enr.ip).to.not.be.undefined;
expect(enr.ip).to.be.equal("134.209.139.210");
expect(enr.publicKey).to.not.be.undefined;
expect(enr.peerId?.toB58String()).to.be.equal(
expect(enr.peerId?.toString()).to.be.equal(
"16Uiu2HAmPLe7Mzm8TsYUubgCAW1aJoeFScxrLj8ppHFivPo97bUZ"
);
});
it("should throw error - no id", async () => {
try {
const peerId = await PeerId.create({ keyType: "secp256k1" });
const peerId = await createSecp256k1PeerId();
const enr = await ENR.createFromPeerId(peerId);
const keypair = createKeypairFromPeerId(peerId);
const keypair = await createKeypairFromPeerId(peerId);
enr.setLocationMultiaddr(new Multiaddr("/ip4/18.223.219.100/udp/9000"));
enr.set("id", new Uint8Array([0]));
@ -307,7 +307,7 @@ describe("ENR", function () {
let enr: ENR;
before(async function () {
peerId = await PeerId.create({ keyType: "secp256k1" });
peerId = await createSecp256k1PeerId();
enr = await ENR.createFromPeerId(peerId);
enr.ip = ip4;
enr.ip6 = ip6;
@ -387,9 +387,9 @@ describe("ENR", function () {
let keypair: IKeypair;
beforeEach(async function () {
peerId = await PeerId.create({ keyType: "secp256k1" });
peerId = await createSecp256k1PeerId();
enr = await ENR.createFromPeerId(peerId);
keypair = createKeypairFromPeerId(peerId);
keypair = await createKeypairFromPeerId(peerId);
waku2Protocols = {
relay: false,
store: false,

View File

@ -1,13 +1,13 @@
import * as RLP from "@ethersproject/rlp";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Multiaddr } from "@multiformats/multiaddr";
import {
convertToBytes,
convertToString,
} from "@multiformats/multiaddr/convert";
import debug from "debug";
import { Multiaddr, protocols } from "multiaddr";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import muConvert from "multiaddr/src/convert";
import PeerId from "peer-id";
import { fromString } from "uint8arrays/from-string";
import { toString } from "uint8arrays/to-string";
import { encode as varintEncode } from "varint";
import { compressPublicKey, keccak256, verifySignature } from "../crypto";
import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils";
@ -20,6 +20,7 @@ import {
IKeypair,
KeypairType,
} from "./keypair";
import { multiaddrFromFields } from "./multiaddr_from_fields";
import { decodeMultiaddrs, encodeMultiaddrs } from "./multiaddrs_codec";
import { ENRKey, ENRValue, NodeId, SequenceNumber } from "./types";
import * as v4 from "./v4";
@ -77,11 +78,11 @@ export class ENR extends Map<ENRKey, ENRValue> {
});
}
static createFromPeerId(
static async createFromPeerId(
peerId: PeerId,
kvs: Record<ENRKey, ENRValue> = {}
): Promise<ENR> {
const keypair = createKeypairFromPeerId(peerId);
const keypair = await createKeypairFromPeerId(peerId);
switch (keypair.type) {
case KeypairType.secp256k1:
return ENR.createV4(keypair.publicKey, kvs);
@ -190,7 +191,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
get ip(): string | undefined {
const raw = this.get("ip");
if (raw) {
return muConvert.toString(protocols.names.ip4.code, raw) as string;
return convertToString("ip4", raw) as string;
} else {
return undefined;
}
@ -198,7 +199,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
set ip(ip: string | undefined) {
if (ip) {
this.set("ip", muConvert.toBytes(protocols.names.ip4.code, ip));
this.set("ip", convertToBytes("ip4", ip));
} else {
this.delete("ip");
}
@ -207,7 +208,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
get tcp(): number | undefined {
const raw = this.get("tcp");
if (raw) {
return Number(muConvert.toString(protocols.names.tcp.code, raw));
return Number(convertToString("tcp", raw));
} else {
return undefined;
}
@ -217,14 +218,14 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (port === undefined) {
this.delete("tcp");
} else {
this.set("tcp", muConvert.toBytes(protocols.names.tcp.code, port));
this.set("tcp", convertToBytes("tcp", port.toString(10)));
}
}
get udp(): number | undefined {
const raw = this.get("udp");
if (raw) {
return Number(muConvert.toString(protocols.names.udp.code, raw));
return Number(convertToString("udp", raw));
} else {
return undefined;
}
@ -234,14 +235,14 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (port === undefined) {
this.delete("udp");
} else {
this.set("udp", muConvert.toBytes(protocols.names.udp.code, port));
this.set("udp", convertToBytes("udp", port.toString(10)));
}
}
get ip6(): string | undefined {
const raw = this.get("ip6");
if (raw) {
return muConvert.toString(protocols.names.ip6.code, raw) as string;
return convertToString("ip6", raw) as string;
} else {
return undefined;
}
@ -249,7 +250,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
set ip6(ip: string | undefined) {
if (ip) {
this.set("ip6", muConvert.toBytes(protocols.names.ip6.code, ip));
this.set("ip6", convertToBytes("ip6", ip));
} else {
this.delete("ip6");
}
@ -258,7 +259,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
get tcp6(): number | undefined {
const raw = this.get("tcp6");
if (raw) {
return Number(muConvert.toString(protocols.names.tcp.code, raw));
return Number(convertToString("tcp", raw));
} else {
return undefined;
}
@ -268,14 +269,14 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (port === undefined) {
this.delete("tcp6");
} else {
this.set("tcp6", muConvert.toBytes(protocols.names.tcp.code, port));
this.set("tcp6", convertToBytes("tcp", port.toString(10)));
}
}
get udp6(): number | undefined {
const raw = this.get("udp6");
if (raw) {
return Number(muConvert.toString(protocols.names.udp.code, raw));
return Number(convertToString("udp", raw));
} else {
return undefined;
}
@ -285,7 +286,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (port === undefined) {
this.delete("udp6");
} else {
this.set("udp6", muConvert.toBytes(protocols.names.udp.code, port));
this.set("udp6", convertToBytes("udp", port.toString(10)));
}
}
@ -346,7 +347,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
const isIpv6 = protocol.endsWith("6");
const ipVal = this.get(isIpv6 ? "ip6" : "ip");
if (!ipVal) {
return undefined;
return;
}
const isUdp = protocol.startsWith("udp");
@ -359,29 +360,19 @@ export class ENR extends Map<ENRKey, ENRValue> {
protoName = "tcp";
protoVal = isIpv6 ? this.get("tcp6") : this.get("tcp");
} else {
return undefined;
return;
}
if (!protoVal) {
return undefined;
return;
}
// Create raw multiaddr buffer
// multiaddr length is:
// 1 byte for the ip protocol (ip4 or ip6)
// N bytes for the ip address
// 1 or 2 bytes for the protocol as buffer (tcp or udp)
// 2 bytes for the port
const ipMa = protocols.names[isIpv6 ? "ip6" : "ip4"];
const ipByteLen = ipMa.size / 8;
const protoMa = protocols.names[protoName];
const protoBuf = varintEncode(protoMa.code);
const maBuf = new Uint8Array(3 + ipByteLen + protoBuf.length);
maBuf[0] = ipMa.code;
maBuf.set(ipVal, 1);
maBuf.set(protoBuf, 1 + ipByteLen);
maBuf.set(protoVal, 1 + ipByteLen + protoBuf.length);
return new Multiaddr(maBuf);
return multiaddrFromFields(
isIpv6 ? "ip6" : "ip4",
protoName,
ipVal,
protoVal
);
}
setLocationMultiaddr(multiaddr: Multiaddr): void {
@ -422,9 +413,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (this.peerId) {
const locationMultiaddr = this.getLocationMultiaddr(protocol);
if (locationMultiaddr) {
return locationMultiaddr.encapsulate(
`/p2p/${this.peerId.toB58String()}`
);
return locationMultiaddr.encapsulate(`/p2p/${this.peerId.toString()}`);
}
}
return;
@ -437,7 +426,7 @@ export class ENR extends Map<ENRKey, ENRValue> {
if (this.peerId && this.multiaddrs) {
const peerId = this.peerId;
return this.multiaddrs.map((ma) => {
return ma.encapsulate(`/p2p/${peerId.toB58String()}`);
return ma.encapsulate(`/p2p/${peerId.toString()}`);
});
}
return [];

View File

@ -1,35 +0,0 @@
import { expect } from "chai";
import { keys } from "libp2p-crypto";
import PeerId from "peer-id";
import { createPeerIdFromKeypair, generateKeypair, KeypairType } from "./index";
const { supportedKeys } = keys;
describe("createPeerIdFromKeypair", function () {
it("should properly create a PeerId from a secp256k1 keypair with private key", async function () {
const keypair = await generateKeypair(KeypairType.secp256k1);
const privKey = new supportedKeys.secp256k1.Secp256k1PrivateKey(
keypair.privateKey,
keypair.publicKey
);
const expectedPeerId = await PeerId.createFromPrivKey(privKey.bytes);
const actualPeerId = await createPeerIdFromKeypair(keypair);
expect(actualPeerId).to.be.deep.equal(expectedPeerId);
});
it("should properly create a PeerId from a secp256k1 keypair without private key", async function () {
const keypair = await generateKeypair(KeypairType.secp256k1);
delete (keypair as any)._privateKey;
const pubKey = new supportedKeys.secp256k1.Secp256k1PublicKey(
keypair.publicKey
);
const expectedPeerId = await PeerId.createFromPubKey(pubKey.bytes);
const actualPeerId = await createPeerIdFromKeypair(keypair);
expect(actualPeerId).to.be.deep.equal(expectedPeerId);
});
});

View File

@ -1,16 +1,16 @@
import { keys } from "libp2p-crypto";
import { identity } from "multiformats/hashes/identity";
import PeerId from "peer-id";
import { unmarshalPrivateKey, unmarshalPublicKey } from "@libp2p/crypto/keys";
import { supportedKeys } from "@libp2p/crypto/keys";
import type { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromKeys } from "@libp2p/peer-id";
import { Secp256k1Keypair } from "./secp256k1";
import { IKeypair, KeypairType } from "./types";
const { keysPBM, supportedKeys } = keys;
export const ERR_TYPE_NOT_IMPLEMENTED = "Keypair type not implemented";
export * from "./types";
export * from "./secp256k1";
// TODO: Check if @libp2p/crypto methods can be used instead.
export async function generateKeypair(type: KeypairType): Promise<IKeypair> {
switch (type) {
case KeypairType.secp256k1:
@ -38,31 +38,49 @@ export async function createPeerIdFromKeypair(
): Promise<PeerId> {
switch (keypair.type) {
case KeypairType.secp256k1: {
// manually create a peer id to avoid expensive ops
const privKey = keypair.hasPrivateKey()
? new supportedKeys.secp256k1.Secp256k1PrivateKey(
keypair.privateKey,
keypair.publicKey
)
: undefined;
const pubKey = new supportedKeys.secp256k1.Secp256k1PublicKey(
const publicKey = new supportedKeys.secp256k1.Secp256k1PublicKey(
keypair.publicKey
);
const id = await identity.digest(pubKey.bytes);
return new PeerId(id.bytes, privKey, pubKey);
const privateKey = keypair.hasPrivateKey()
? new supportedKeys.secp256k1.Secp256k1PrivateKey(keypair.privateKey)
: undefined;
return peerIdFromKeys(publicKey.bytes, privateKey?.bytes);
}
default:
throw new Error(ERR_TYPE_NOT_IMPLEMENTED);
}
}
export function createKeypairFromPeerId(peerId: PeerId): IKeypair {
// pub/private key bytes from peer-id are encoded in protobuf format
const pub = keysPBM.PublicKey.decode(peerId.pubKey.bytes);
export async function createKeypairFromPeerId(
peerId: PeerId
): Promise<IKeypair> {
let keypairType;
switch (peerId.type) {
case "RSA":
keypairType = KeypairType.rsa;
break;
case "Ed25519":
keypairType = KeypairType.ed25519;
break;
case "secp256k1":
keypairType = KeypairType.secp256k1;
break;
default:
throw new Error("Unsupported peer id type");
}
const publicKey = peerId.publicKey
? unmarshalPublicKey(peerId.publicKey)
: undefined;
const privateKey = peerId.privateKey
? await unmarshalPrivateKey(peerId.privateKey)
: undefined;
return createKeypair(
pub.Type as KeypairType,
peerId.privKey ? peerId.privKey.marshal() : undefined,
pub.Data
keypairType,
privateKey?.marshal(),
publicKey?.marshal()
);
}

View File

@ -0,0 +1,24 @@
import { convertToBytes } from "@multiformats/multiaddr/convert";
import { expect } from "chai";
import { multiaddrFromFields } from "./multiaddr_from_fields";
describe("Multiaddr from fields", () => {
it("tcp ip4 address", function () {
const ipBytes = convertToBytes("ip4", "1.2.3.4");
const portBytes = convertToBytes("tcp", "3333");
const ma = multiaddrFromFields("ip4", "tcp", ipBytes, portBytes);
expect(ma.toString()).to.eq("/ip4/1.2.3.4/tcp/3333");
});
it("udp ip6 address", function () {
const ipBytes = convertToBytes("ip6", "2345:425:2ca1::5673:23b5");
const portBytes = convertToBytes("udp", "1111");
const ma = multiaddrFromFields("ip6", "udp", ipBytes, portBytes);
expect(ma.toString()).to.eq("/ip6/2345:425:2ca1::5673:23b5/udp/1111");
});
});

View File

@ -0,0 +1,21 @@
import { Multiaddr } from "@multiformats/multiaddr";
import { convertToString } from "@multiformats/multiaddr/convert";
export function multiaddrFromFields(
ipFamily: string,
protocol: string,
ipBytes: Uint8Array,
protocolBytes: Uint8Array
): Multiaddr {
let ma = new Multiaddr(
"/" + ipFamily + "/" + convertToString(ipFamily, ipBytes)
);
ma = ma.encapsulate(
new Multiaddr(
"/" + protocol + "/" + convertToString(protocol, protocolBytes)
)
);
return ma;
}

View File

@ -1,5 +1,5 @@
import { Multiaddr } from "@multiformats/multiaddr";
import { expect } from "chai";
import { Multiaddr } from "multiaddr";
import { decodeMultiaddrs, encodeMultiaddrs } from "./multiaddrs_codec";

View File

@ -1,4 +1,4 @@
import { Multiaddr } from "multiaddr";
import { Multiaddr } from "@multiformats/multiaddr";
import { MULTIADDR_LENGTH_SIZE } from "./constants";

View File

@ -1,18 +1,13 @@
import Libp2p from "libp2p";
import { Peer } from "libp2p/src/peer-store";
import { Peer } from "@libp2p/interface-peer-store";
import { Libp2p } from "libp2p";
/**
* Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push
*/
export async function selectRandomPeer(
peersIter: AsyncIterable<Peer>
peers: Peer[]
): Promise<Peer | undefined> {
const peers = [];
for await (const peer of peersIter) {
peers.push(peer);
}
if (peers.length === 0) return;
const index = Math.round(Math.random() * (peers.length - 1));
@ -22,21 +17,18 @@ export async function selectRandomPeer(
/**
* Returns the list of peers that supports the given protocol.
*/
export async function* getPeersForProtocol(
export async function getPeersForProtocol(
libp2p: Libp2p,
protocols: string[]
): AsyncIterable<Peer> {
for await (const peer of libp2p.peerStore.getPeers()) {
let peerFound = false;
): Promise<Peer[]> {
const peers: Peer[] = [];
await libp2p.peerStore.forEach((peer) => {
for (let i = 0; i < protocols.length; i++) {
if (peer.protocols.includes(protocols[i])) {
peerFound = true;
peers.push(peer);
break;
}
}
if (!peerFound) {
continue;
}
yield peer;
}
});
return peers;
}

View File

@ -0,0 +1,200 @@
import { expect } from "chai";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../test_utils";
import { delay } from "../test_utils/delay";
import { waitForRemotePeer } from "./wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "./waku";
describe("Wait for remote peer", function () {
let waku: Waku;
let nwaku: Nwaku | undefined;
afterEach(async function () {
if (nwaku) {
nwaku.stop();
nwaku = undefined;
}
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Relay - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
relay: true,
store: false,
filter: false,
lightpush: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await delay(1000);
await waitForRemotePeer(waku, [Protocols.Relay]);
const peers = waku.relay.getMeshPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers).to.includes(nimPeerId);
});
it("Relay - dialed after", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
relay: true,
store: false,
filter: false,
lightpush: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
const waitPromise = waitForRemotePeer(waku, [Protocols.Relay]);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = waku.relay.getMeshPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers).includes(nimPeerId);
});
it("Relay - times out", function (done) {
this.timeout(5000);
createWaku({
staticNoiseKey: NOISE_KEY_1,
})
.then((waku) => waku.start().then(() => waku))
.then((waku) => {
waitForRemotePeer(waku, [Protocols.Relay], 200).then(
() => {
throw "Promise expected to reject on time out";
},
(reason) => {
expect(reason).to.eq("Timed out waiting for a remote peer.");
done();
}
);
});
});
it("Store - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
store: true,
relay: false,
lightpush: false,
filter: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await delay(1000);
await waitForRemotePeer(waku, [Protocols.Store]);
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Store - dialed after - with timeout", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
store: true,
relay: false,
lightpush: false,
filter: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
const waitPromise = waitForRemotePeer(waku, [Protocols.Store], 2000);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = (await waku.store.peers()).map((peer) => peer.id.toString());
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("LightPush", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
lightpush: true,
filter: false,
relay: false,
store: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const peers = (await waku.lightPush.peers()).map((peer) =>
peer.id.toString()
);
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Filter", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({
filter: true,
lightpush: false,
relay: false,
store: false,
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waitForRemotePeer(waku, [Protocols.Filter]);
const peers = (await waku.filter.peers()).map((peer) => peer.id.toString());
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
});

View File

@ -0,0 +1,131 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import { Peer, PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
import debug from "debug";
import type { Libp2p } from "libp2p";
import { pEvent } from "p-event";
import { StoreCodecs } from "./constants";
import { Protocols, Waku } from "./waku";
import { FilterCodec } from "./waku_filter";
import { LightPushCodec } from "./waku_light_push";
const log = debug("waku:wait-for-remote-peer");
interface WakuProtocol {
libp2p: Libp2p;
peers: () => Promise<Peer[]>;
}
interface WakuGossipSubProtocol extends GossipSub {
getMeshPeers: () => string[];
}
/**
* Wait for a remote peer to be ready given the passed protocols.
* Useful when using the [[CreateOptions.bootstrap]] with [[createWaku]].
*
* If the passed protocols is a GossipSub protocol, then it resolves only once
* a peer is in a mesh, to help ensure that other peers will send and receive
* message to us.
*
* @param waku The Waku Node
* @param protocols The protocols that need to be enabled by remote peers.
* @param timeoutMs A timeout value in milliseconds..
*
* @returns A promise that **resolves** if all desired protocols are fulfilled by
* remote nodes, **rejects** if the timeoutMs is reached.
*
* @default Remote peer must have Waku Relay enabled and no time out is applied.
*/
export async function waitForRemotePeer(
waku: Waku,
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? [Protocols.Relay];
if (!waku.isStarted()) return Promise.reject("Waku node is not started");
const promises = [];
if (protocols.includes(Protocols.Relay)) {
promises.push(waitForGossipSubPeerInMesh(waku.relay));
}
if (protocols.includes(Protocols.Store)) {
promises.push(waitForConnectedPeer(waku.store, Object.values(StoreCodecs)));
}
if (protocols.includes(Protocols.LightPush)) {
promises.push(waitForConnectedPeer(waku.lightPush, [LightPushCodec]));
}
if (protocols.includes(Protocols.Filter)) {
promises.push(waitForConnectedPeer(waku.filter, [FilterCodec]));
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.all(promises);
}
}
/**
* Wait for a peer with the given protocol to be connected.
*/
async function waitForConnectedPeer(
waku: WakuProtocol,
codecs: string[]
): Promise<void> {
const peers = await waku.peers();
if (peers.length) {
log(`${codecs} peer found: `, peers[0].id.toString());
return;
}
await new Promise<void>((resolve) => {
const cb = (evt: CustomEvent<PeerProtocolsChangeData>): void => {
for (const codec of codecs) {
if (evt.detail.protocols.includes(codec)) {
log("Resolving for", codec, evt.detail.protocols);
waku.libp2p.peerStore.removeEventListener("change:protocols", cb);
resolve();
break;
}
}
};
waku.libp2p.peerStore.addEventListener("change:protocols", cb);
});
}
/**
* Wait for a peer with the given protocol to be connected and in the gossipsub
* mesh.
*/
async function waitForGossipSubPeerInMesh(
waku: WakuGossipSubProtocol
): Promise<void> {
let peers = waku.getMeshPeers();
while (peers.length == 0) {
await pEvent(waku, "gossipsub:heartbeat");
peers = waku.getMeshPeers();
}
}
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms));
async function rejectOnTimeout<T>(
promise: Promise<T>,
timeoutMs: number,
rejectReason: string
): Promise<void> {
await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);
}

View File

@ -1,5 +1,5 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import { expect } from "chai";
import PeerId from "peer-id";
import {
makeLogFileName,
@ -7,10 +7,10 @@ import {
NOISE_KEY_2,
Nwaku,
} from "../test_utils/";
import { delay } from "../test_utils/delay";
import { generateSymmetricKey } from "./crypto";
import { Protocols, Waku } from "./waku";
import { waitForRemotePeer } from "./wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "./waku";
import { WakuMessage } from "./waku_message";
const TestContentTopic = "/test/1/waku/utf8";
@ -31,11 +31,12 @@ describe("Waku Dial [node only]", function () {
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
const nimPeerId = await nwaku.getPeerId();
expect(await waku.libp2p.peerStore.has(nimPeerId)).to.be.true;
@ -57,19 +58,22 @@ describe("Waku Dial [node only]", function () {
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
bootstrap: { peers: [multiAddrWithId] },
});
await waku.start();
const connectedPeerID: PeerId = await new Promise((resolve) => {
waku.libp2p.connectionManager.on("peer:connect", (connection) => {
resolve(connection.remotePeer);
});
waku.libp2p.connectionManager.addEventListener(
"peer:connect",
(evt) => {
resolve(evt.detail.remotePeer);
}
);
});
expect(connectedPeerID.toB58String()).to.eq(multiAddrWithId.getPeerId());
expect(connectedPeerID.toString()).to.eq(multiAddrWithId.getPeerId());
});
it("Passing a function", async function () {
@ -78,7 +82,7 @@ describe("Waku Dial [node only]", function () {
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
bootstrap: {
getPeers: async () => {
@ -86,15 +90,19 @@ describe("Waku Dial [node only]", function () {
},
},
});
await waku.start();
const connectedPeerID: PeerId = await new Promise((resolve) => {
waku.libp2p.connectionManager.on("peer:connect", (connection) => {
resolve(connection.remotePeer);
});
waku.libp2p.connectionManager.addEventListener(
"peer:connect",
(evt) => {
resolve(evt.detail.remotePeer);
}
);
});
const multiAddrWithId = await nwaku.getMultiaddrWithId();
expect(connectedPeerID.toB58String()).to.eq(multiAddrWithId.getPeerId());
expect(connectedPeerID.toString()).to.eq(multiAddrWithId.getPeerId());
});
});
});
@ -111,18 +119,23 @@ describe("Decryption Keys", () => {
beforeEach(async function () {
this.timeout(5000);
[waku1, waku2] = await Promise.all([
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
Waku.create({
createWaku({ staticNoiseKey: NOISE_KEY_1 }).then((waku) =>
waku.start().then(() => waku)
),
createWaku({
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
}),
}).then((waku) => waku.start().then(() => waku)),
]);
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
waku1.addPeerToAddressBook(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await Promise.all([
waku1.waitForRemotePeer([Protocols.Relay]),
waku2.waitForRemotePeer([Protocols.Relay]),
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);
});
@ -163,169 +176,3 @@ describe("Decryption Keys", () => {
expect(receivedMsg.timestamp?.valueOf()).to.eq(messageTimestamp.valueOf());
});
});
describe("Wait for remote peer / get peers", function () {
let waku: Waku;
let nwaku: Nwaku | undefined;
afterEach(async function () {
if (nwaku) {
nwaku.stop();
nwaku = undefined;
}
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Relay - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await delay(1000);
await waku.waitForRemotePeer([Protocols.Relay]);
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.has(nimPeerId as string)).to.be.true;
});
it("Relay - dialed after", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start();
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Relay]);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = waku.relay.getPeers();
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.has(nimPeerId as string)).to.be.true;
});
it("Relay - times out", function (done) {
this.timeout(5000);
Waku.create({
staticNoiseKey: NOISE_KEY_1,
}).then((waku) => {
waku.waitForRemotePeer([Protocols.Relay], 200).then(
() => {
throw "Promise expected to reject on time out";
},
(reason) => {
expect(reason).to.eq("Timed out waiting for a remote peer.");
done();
}
);
});
});
it("Store - dialed first", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await delay(1000);
await waku.waitForRemotePeer([Protocols.Store]);
const peers = [];
for await (const peer of waku.store.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Store - dialed after - with timeout", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ persistMessages: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
const waitPromise = waku.waitForRemotePeer([Protocols.Store], 2000);
await delay(1000);
await waku.dial(multiAddrWithId);
await waitPromise;
const peers = [];
for await (const peer of waku.store.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("LightPush", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ lightpush: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.LightPush]);
const peers = [];
for await (const peer of waku.lightPush.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
it("Filter", async function () {
this.timeout(20_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
const multiAddrWithId = await nwaku.getMultiaddrWithId();
waku = await Waku.create({
staticNoiseKey: NOISE_KEY_1,
});
await waku.dial(multiAddrWithId);
await waku.waitForRemotePeer([Protocols.Filter]);
const peers = [];
for await (const peer of waku.filter.peers) {
peers.push(peer.id.toB58String());
}
const nimPeerId = multiAddrWithId.getPeerId();
expect(nimPeerId).to.not.be.undefined;
expect(peers.includes(nimPeerId as string)).to.be.true;
});
});

View File

@ -1,7 +1,7 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import { expect } from "chai";
import PeerId from "peer-id";
import { Waku } from "./waku";
import { createWaku, Waku } from "./waku";
describe("Waku Dial", function () {
describe("Bootstrap [live data]", function () {
@ -22,14 +22,18 @@ describe("Waku Dial", function () {
// This dependence must be removed once DNS discovery is implemented
this.timeout(20_000);
waku = await Waku.create({
waku = await createWaku({
bootstrap: { default: true },
});
await waku.start();
const connectedPeerID: PeerId = await new Promise((resolve) => {
waku.libp2p.connectionManager.on("peer:connect", (connection) => {
resolve(connection.remotePeer);
});
waku.libp2p.connectionManager.addEventListener(
"peer:connect",
(evt) => {
resolve(evt.detail.remotePeer);
}
);
});
expect(connectedPeerID).to.not.be.undefined;

View File

@ -1,20 +1,13 @@
import { Noise } from "@chainsafe/libp2p-noise";
import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import { Mplex } from "@libp2p/mplex";
import { peerIdFromString } from "@libp2p/peer-id";
import { WebSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import debug from "debug";
import Libp2p, { Connection, Libp2pModules, Libp2pOptions } from "libp2p";
import Libp2pBootstrap from "libp2p-bootstrap";
import { MuxedStream } from "libp2p-interfaces/dist/src/stream-muxer/types";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import Mplex from "libp2p-mplex";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import Websockets from "libp2p-websockets";
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: No types available
import filters from "libp2p-websockets/src/filters";
import PingService from "libp2p/src/ping";
import { Multiaddr, multiaddr } from "multiaddr";
import PeerId from "peer-id";
import { createLibp2p, Libp2p, Libp2pOptions } from "libp2p";
import { Bootstrap, BootstrapOptions } from "./discovery";
import { FilterCodec, WakuFilter } from "./waku_filter";
@ -24,12 +17,10 @@ import { WakuRelay } from "./waku_relay";
import { RelayCodecs, RelayPingContentTopic } from "./waku_relay/constants";
import { StoreCodecs, WakuStore } from "./waku_store";
const websocketsTransportKey = Websockets.prototype[Symbol.toStringTag];
export const DefaultPingKeepAliveValueSecs = 0;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
const dbg = debug("waku:waku");
const log = debug("waku:waku");
export enum Protocols {
Relay = "relay",
@ -74,9 +65,7 @@ export interface CreateOptions {
* allowing its omission and letting Waku set good defaults.
* Notes that some values are overridden by {@link Waku} to ensure it implements the Waku protocol.
*/
libp2p?: Omit<Libp2pOptions & import("libp2p").CreateOptions, "modules"> & {
modules?: Partial<Libp2pModules>;
};
libp2p?: Partial<Libp2pOptions>;
/**
* Byte array used as key for the noise protocol used for connection encryption
* by [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
@ -95,6 +84,32 @@ export interface CreateOptions {
decryptionKeys?: Array<Uint8Array | string>;
}
export async function createWaku(options?: CreateOptions): Promise<Waku> {
const peerDiscovery = [];
if (options?.bootstrap) {
peerDiscovery.push(new Bootstrap(options?.bootstrap));
}
const libp2pOpts = Object.assign(
{
transports: [new WebSockets({ filter: filterAll })],
streamMuxers: [new Mplex()],
pubsub: new WakuRelay(options),
connectionEncryption: [new Noise()],
peerDiscovery: peerDiscovery,
},
options?.libp2p ?? {}
);
const libp2p = await createLibp2p(libp2pOpts);
const wakuStore = new WakuStore(libp2p, options);
const wakuLightPush = new WakuLightPush(libp2p, options);
const wakuFilter = new WakuFilter(libp2p, options);
return new Waku(options ?? {}, libp2p, wakuStore, wakuLightPush, wakuFilter);
}
export class Waku {
public libp2p: Libp2p;
public relay: WakuRelay;
@ -109,7 +124,7 @@ export class Waku {
[peer: string]: ReturnType<typeof setInterval>;
};
private constructor(
constructor(
options: CreateOptions,
libp2p: Libp2p,
store: WakuStore,
@ -129,8 +144,8 @@ export class Waku {
const relayKeepAlive =
options.relayKeepAlive || DefaultRelayKeepAliveValueSecs;
libp2p.connectionManager.on("peer:connect", (connection: Connection) => {
this.startKeepAlive(connection.remotePeer, pingKeepAlive, relayKeepAlive);
libp2p.connectionManager.addEventListener("peer:connect", (evt) => {
this.startKeepAlive(evt.detail.remotePeer, pingKeepAlive, relayKeepAlive);
});
/**
@ -144,8 +159,8 @@ export class Waku {
* >this event will **only** be triggered when the last connection is closed.
* @see https://github.com/libp2p/js-libp2p/blob/bad9e8c0ff58d60a78314077720c82ae331cc55b/doc/API.md?plain=1#L2100
*/
libp2p.connectionManager.on("peer:disconnect", (connection: Connection) => {
this.stopKeepAlive(connection.remotePeer);
libp2p.connectionManager.addEventListener("peer:disconnect", (evt) => {
this.stopKeepAlive(evt.detail.remotePeer);
});
options?.decryptionKeys?.forEach((key) => {
@ -153,97 +168,6 @@ export class Waku {
});
}
/**
* Create and start new waku node.
*/
static async create(options?: CreateOptions): Promise<Waku> {
// Get an object in case options or libp2p are undefined
const libp2pOpts = Object.assign({}, options?.libp2p);
// Default for Websocket filter is `all`:
// Returns all TCP and DNS based addresses, both with ws or wss.
libp2pOpts.config = Object.assign(
{
transport: {
[websocketsTransportKey]: {
filter: filters.all,
},
},
},
options?.libp2p?.config
);
// Pass pubsub topic to relay
if (options?.pubSubTopic) {
libp2pOpts.config.pubsub = Object.assign(
{ pubSubTopic: options.pubSubTopic },
libp2pOpts.config.pubsub
);
}
libp2pOpts.modules = Object.assign({}, options?.libp2p?.modules);
// Default transport for libp2p is Websockets
libp2pOpts.modules = Object.assign(
{
transport: [Websockets],
},
options?.libp2p?.modules
);
// streamMuxer, connection encryption and pubsub are overridden
// as those are the only ones currently supported by Waku nodes.
libp2pOpts.modules = Object.assign(libp2pOpts.modules, {
streamMuxer: [Mplex],
connEncryption: [new Noise(options?.staticNoiseKey)],
pubsub: WakuRelay,
});
if (options?.bootstrap) {
const bootstrap = new Bootstrap(options?.bootstrap);
if (bootstrap.getBootstrapPeers !== undefined) {
try {
const list = await bootstrap.getBootstrapPeers();
// Note: this overrides any other peer discover
libp2pOpts.modules = Object.assign(libp2pOpts.modules, {
peerDiscovery: [Libp2pBootstrap],
});
libp2pOpts.config.peerDiscovery = {
[Libp2pBootstrap.tag]: {
list,
enabled: true,
},
};
} catch (e) {
dbg("Failed to retrieve bootstrap nodes", e);
}
}
}
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
// @ts-ignore: modules property is correctly set thanks to voodoo
const libp2p = await Libp2p.create(libp2pOpts);
const wakuStore = new WakuStore(libp2p, {
pubSubTopic: options?.pubSubTopic,
});
const wakuLightPush = new WakuLightPush(libp2p);
const wakuFilter = new WakuFilter(libp2p);
await libp2p.start();
return new Waku(
options ? options : {},
libp2p,
wakuStore,
wakuLightPush,
wakuFilter
);
}
/**
* Dials to the provided peer.
*
@ -251,12 +175,9 @@ export class Waku {
* @param protocols Waku protocols we expect from the peer; Default to Relay
*/
async dial(
peer: PeerId | Multiaddr | string,
peer: PeerId | Multiaddr,
protocols?: Protocols[]
): Promise<{
stream: MuxedStream;
protocol: string;
}> {
): Promise<Stream> {
const _protocols = protocols ?? [Protocols.Relay];
const codecs: string[] = [];
@ -287,7 +208,7 @@ export class Waku {
): void {
let peer;
if (typeof peerId === "string") {
peer = PeerId.createFromB58String(peerId);
peer = peerIdFromString(peerId);
} else {
peer = peerId;
}
@ -301,11 +222,19 @@ export class Waku {
this.libp2p.peerStore.addressBook.set(peer, addresses);
}
async start(): Promise<void> {
await this.libp2p.start();
}
async stop(): Promise<void> {
this.stopAllKeepAlives();
await this.libp2p.stop();
}
isStarted(): boolean {
return this.libp2p.isStarted();
}
/**
* Register a decryption key to attempt decryption of messages received via
* [[WakuRelay]] and [[WakuStore]]. This can either be a private key for
@ -340,90 +269,13 @@ export class Waku {
* @throws if libp2p is not listening on localhost.
*/
getLocalMultiaddrWithID(): string {
const localMultiaddr = this.libp2p.multiaddrs.find((addr) =>
addr.toString().match(/127\.0\.0\.1/)
);
const localMultiaddr = this.libp2p
.getMultiaddrs()
.find((addr) => addr.toString().match(/127\.0\.0\.1/));
if (!localMultiaddr || localMultiaddr.toString() === "") {
throw "Not listening on localhost";
}
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toB58String();
}
/**
* Wait for a remote peer to be ready given the passed protocols.
* Useful when using the [[CreateOptions.bootstrap]] with [[Waku.create]].
*
* @param protocols The protocols that need to be enabled by remote peers.
* @param timeoutMs A timeout value in milliseconds..
*
* @returns A promise that **resolves** if all desired protocols are fulfilled by
* remote nodes, **rejects** if the timeoutMs is reached.
*
* @default Remote peer must have Waku Relay enabled and no time out is applied.
*/
async waitForRemotePeer(
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? [Protocols.Relay];
const promises: Promise<void>[] = [];
if (protocols.includes(Protocols.Relay)) {
const peers = this.relay.getPeers();
if (peers.size == 0) {
// No peer yet available, wait for a subscription
const promise = new Promise<void>((resolve) => {
this.libp2p.pubsub.once("pubsub:subscription-change", () => {
// Remote peer subscribed to topic, now wait for a heartbeat
// so that the mesh is updated and the remote peer added to it
this.libp2p.pubsub.once("gossipsub:heartbeat", resolve);
});
});
promises.push(promise);
}
}
if (protocols.includes(Protocols.Store)) {
const storePromise = (async (): Promise<void> => {
for await (const peer of this.store.peers) {
dbg("Store peer found", peer.id.toB58String());
break;
}
})();
promises.push(storePromise);
}
if (protocols.includes(Protocols.LightPush)) {
const lightPushPromise = (async (): Promise<void> => {
for await (const peer of this.lightPush.peers) {
dbg("Light Push peer found", peer.id.toB58String());
break;
}
})();
promises.push(lightPushPromise);
}
if (protocols.includes(Protocols.Filter)) {
const filterPromise = (async (): Promise<void> => {
for await (const peer of this.filter.peers) {
dbg("Filter peer found", peer.id.toB58String());
break;
}
})();
promises.push(filterPromise);
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
timeoutMs,
"Timed out waiting for a remote peer."
);
} else {
await Promise.all(promises);
}
return localMultiaddr + "/p2p/" + this.libp2p.peerId.toString();
}
private startKeepAlive(
@ -434,13 +286,12 @@ export class Waku {
// Just in case a timer already exist for this peer
this.stopKeepAlive(peerId);
const peerIdStr = peerId.toB58String();
const peerIdStr = peerId.toString();
if (pingPeriodSecs !== 0) {
const pingService = new PingService(this.libp2p);
this.pingKeepAliveTimers[peerIdStr] = setInterval(() => {
pingService.ping(peerId).catch((e) => {
dbg(`Ping failed (${peerIdStr})`, e);
this.libp2p.ping(peerId).catch((e) => {
log(`Ping failed (${peerIdStr})`, e);
});
}, pingPeriodSecs * 1000);
}
@ -455,7 +306,7 @@ export class Waku {
}
private stopKeepAlive(peerId: PeerId): void {
const peerIdStr = peerId.toB58String();
const peerIdStr = peerId.toString();
if (this.pingKeepAliveTimers[peerIdStr]) {
clearInterval(this.pingKeepAliveTimers[peerIdStr]);
@ -480,13 +331,3 @@ export class Waku {
this.relayKeepAliveTimers = {};
}
}
const awaitTimeout = (ms: number, rejectReason: string): Promise<void> =>
new Promise((_resolve, reject) => setTimeout(() => reject(rejectReason), ms));
const rejectOnTimeout = (
promise: Promise<any>,
timeoutMs: number,
rejectReason: string
): Promise<void> =>
Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);

View File

@ -3,7 +3,8 @@ import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { WakuMessage } from "../waku_message";
const log = debug("waku:test");
@ -22,13 +23,14 @@ describe("Waku Filter", () => {
beforeEach(async function () {
this.timeout(10000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true });
waku = await Waku.create({
await nwaku.start({ filter: true, lightpush: true });
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Filter, Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]);
});
it("creates a subscription", async function () {
@ -47,7 +49,7 @@ describe("Waku Filter", () => {
messageText,
TestContentTopic
);
await waku.relay.send(message);
await waku.lightPush.push(message);
while (messageCount === 0) {
await delay(250);
}
@ -63,10 +65,10 @@ describe("Waku Filter", () => {
expect(msg.contentTopic).to.eq(TestContentTopic);
};
await waku.filter.subscribe(callback, [TestContentTopic]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String("Filtering works!", TestContentTopic)
);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"Filtering still works!",
TestContentTopic
@ -86,7 +88,7 @@ describe("Waku Filter", () => {
const unsubscribe = await waku.filter.subscribe(callback, [
TestContentTopic,
]);
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should be received",
TestContentTopic
@ -94,7 +96,7 @@ describe("Waku Filter", () => {
);
await delay(100);
await unsubscribe();
await waku.relay.send(
await waku.lightPush.push(
await WakuMessage.fromUtf8String(
"This should not be received",
TestContentTopic

View File

@ -1,8 +1,12 @@
import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import debug from "debug";
import lp from "it-length-prefixed";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import Libp2p, { MuxedStream } from "libp2p";
import { Peer, PeerId } from "libp2p/src/peer-store";
import type { Libp2p } from "libp2p";
import { WakuMessage as WakuMessageProto } from "../../proto/message";
import { DefaultPubSubTopic } from "../constants";
@ -11,12 +15,25 @@ import { hexToBytes } from "../utils";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { ContentFilter, FilterRPC } from "./filter_rpc";
export { ContentFilter };
export const FilterCodec = "/vac/waku/filter/2.0.0-beta1";
const log = debug("waku:filter");
type FilterSubscriptionOpts = {
export interface CreateOptions {
/**
* The PubSub Topic to use. Defaults to {@link DefaultPubSubTopic}.
*
* The usage of the default pubsub topic is recommended.
* See [Waku v2 Topic Usage Recommendations](https://rfc.vac.dev/spec/23/) for details.
*
* @default {@link DefaultPubSubTopic}
*/
pubSubTopic?: string;
}
export type FilterSubscriptionOpts = {
/**
* The Pubsub topic for the subscription
*/
@ -27,9 +44,9 @@ type FilterSubscriptionOpts = {
peerId?: PeerId;
};
type FilterCallback = (msg: WakuMessage) => void | Promise<void>;
export type FilterCallback = (msg: WakuMessage) => void | Promise<void>;
type UnsubscribeFunction = () => Promise<void>;
export type UnsubscribeFunction = () => Promise<void>;
/**
* Implements client side of the [Waku v2 Filter protocol](https://rfc.vac.dev/spec/12/).
@ -39,16 +56,20 @@ type UnsubscribeFunction = () => Promise<void>;
* - https://github.com/status-im/nwaku/issues/948
*/
export class WakuFilter {
pubSubTopic: string;
private subscriptions: Map<string, FilterCallback>;
public decryptionKeys: Map<
Uint8Array,
{ method?: DecryptionMethod; contentTopics?: string[] }
>;
constructor(public libp2p: Libp2p) {
constructor(public libp2p: Libp2p, options?: CreateOptions) {
this.subscriptions = new Map();
this.decryptionKeys = new Map();
this.libp2p.handle(FilterCodec, this.onRequest.bind(this));
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.libp2p
.handle(FilterCodec, this.onRequest.bind(this))
.catch((e) => log("Failed to register filter protocol", e));
}
/**
@ -62,7 +83,7 @@ export class WakuFilter {
contentTopics: string[],
opts?: FilterSubscriptionOpts
): Promise<UnsubscribeFunction> {
const topic = opts?.pubsubTopic || DefaultPubSubTopic;
const topic = opts?.pubsubTopic ?? this.pubSubTopic;
const contentFilters = contentTopics.map((contentTopic) => ({
contentTopic,
}));
@ -83,11 +104,19 @@ export class WakuFilter {
const stream = await this.newStream(peer);
try {
await pipe([request.encode()], lp.encode(), stream);
const res = await pipe(
[request.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
log("response", res);
} catch (e) {
log(
"Error subscribing to peer ",
peer.id.toB58String(),
peer.id.toString(),
"for content topics",
contentTopics,
": ",
@ -104,19 +133,22 @@ export class WakuFilter {
};
}
private async onRequest({ stream }: Libp2p.HandlerProps): Promise<void> {
private onRequest(streamData: IncomingStreamData): void {
log("Receiving message push");
try {
await pipe(
stream.source,
lp.decode(),
async (source: AsyncIterable<Buffer>) => {
pipe(streamData.stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const res = FilterRPC.decode(bytes.slice());
if (res.requestId && res.push?.messages?.length) {
await this.pushMessages(res.requestId, res.push.messages);
}
}
}).then(
() => {
log("Receiving pipe closed.");
},
(e) => {
log("Error with receiving pipe", e);
}
);
} catch (e) {
@ -184,14 +216,15 @@ export class WakuFilter {
}
}
private async newStream(peer: Peer): Promise<MuxedStream> {
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) {
// Should be able to remove any at next libp2p release >0.37.3
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) {
throw new Error("Failed to get a connection to the peer");
}
const { stream } = await connection.newStream(FilterCodec);
return stream;
// TODO: Appropriate connection selection
return connections[0].newStream(FilterCodec);
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
@ -200,11 +233,11 @@ export class WakuFilter {
peer = await this.libp2p.peerStore.get(peerId);
if (!peer) {
throw new Error(
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toB58String()}`
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}`
);
}
} else {
peer = await this.randomPeer;
peer = await this.randomPeer();
if (!peer) {
throw new Error(
"Failed to find known peer that registers waku filter protocol"
@ -238,11 +271,11 @@ export class WakuFilter {
this.decryptionKeys.delete(hexToBytes(key));
}
get peers(): AsyncIterable<Peer> {
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p, [FilterCodec]);
}
get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers);
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
}

View File

@ -3,7 +3,8 @@ import debug from "debug";
import { makeLogFileName, NOISE_KEY_1, Nwaku } from "../../test_utils";
import { delay } from "../../test_utils/delay";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { WakuMessage } from "../waku_message";
const dbg = debug("waku:test:lightpush");
@ -25,11 +26,12 @@ describe("Waku Light Push [node only]", () => {
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ lightpush: true });
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.LightPush]);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const messageText = "Light Push works!";
const message = await WakuMessage.fromUtf8String(
@ -60,12 +62,13 @@ describe("Waku Light Push [node only]", () => {
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ lightpush: true, topics: customPubSubTopic });
waku = await Waku.create({
waku = await createWaku({
pubSubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.LightPush]);
await waitForRemotePeer(waku, [Protocols.LightPush]);
const nimPeerId = await nwaku.getPeerId();
@ -87,7 +90,7 @@ describe("Waku Light Push [node only]", () => {
dbg("Waiting for message to show in nwaku");
while (msgs.length === 0) {
await delay(200);
msgs = await nwaku.messages();
msgs = await nwaku.messages(customPubSubTopic);
}
expect(msgs[0].contentTopic).to.equal(message.contentTopic);

View File

@ -1,9 +1,10 @@
import concat from "it-concat";
import lp from "it-length-prefixed";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import Libp2p from "libp2p";
import { Peer } from "libp2p/src/peer-store";
import PeerId from "peer-id";
import { Libp2p } from "libp2p";
import { concat } from "uint8arrays/concat";
import { PushResponse } from "../../proto/light_push";
import { DefaultPubSubTopic } from "../constants";
@ -39,11 +40,7 @@ export class WakuLightPush {
pubSubTopic: string;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
if (options?.pubSubTopic) {
this.pubSubTopic = options.pubSubTopic;
} else {
this.pubSubTopic = DefaultPubSubTopic;
}
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
}
async push(
@ -55,16 +52,17 @@ export class WakuLightPush {
peer = await this.libp2p.peerStore.get(opts.peerId);
if (!peer) throw "Peer is unknown";
} else {
peer = await this.randomPeer;
peer = await this.randomPeer();
}
if (!peer) throw "No peer available";
if (!peer.protocols.includes(LightPushCodec))
throw "Peer does not register waku light push protocol";
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw "Failed to get a connection to the peer";
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections) throw "Failed to get a connection to the peer";
const { stream } = await connection.newStream(LightPushCodec);
// TODO: Appropriate connection management
const stream = await connections[0].newStream(LightPushCodec);
try {
const pubSubTopic = opts?.pubSubTopic
? opts.pubSubTopic
@ -75,10 +73,11 @@ export class WakuLightPush {
lp.encode(),
stream,
lp.decode(),
concat
async (source) => await all(source)
);
try {
const response = PushRPC.decode(res.slice()).response;
const bytes = concat(res);
const response = PushRPC.decode(bytes).response;
if (!response) {
console.log("No response in PushRPC");
@ -97,9 +96,10 @@ export class WakuLightPush {
/**
* Returns known peers from the address book (`libp2p.peerStore`) that support
* light push protocol. Waku may or may not be currently connected to these peers.
* light push protocol. Waku may or may not be currently connected to these
* peers.
*/
get peers(): AsyncIterable<Peer> {
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.libp2p, [LightPushCodec]);
}
@ -108,7 +108,7 @@ export class WakuLightPush {
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer.
*/
get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers);
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
}

View File

@ -14,7 +14,8 @@ import {
getPublicKey,
} from "../crypto";
import { bytesToHex, bytesToUtf8, hexToBytes, utf8ToBytes } from "../utils";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { DecryptionMethod, WakuMessage } from "./index";
@ -29,9 +30,10 @@ describe("Waku Message [node only]", function () {
beforeEach(async function () {
this.timeout(30_000);
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
nwaku = new Nwaku(makeLogFileName(this));
dbg("Starting nwaku node");
@ -40,7 +42,7 @@ describe("Waku Message [node only]", function () {
dbg("Dialing to nwaku node");
await waku.dial(await nwaku.getMultiaddrWithId());
dbg("Wait for remote peer");
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
dbg("Remote peer ready");
// As this test uses the nwaku RPC API, we somehow often face
// Race conditions where the nwaku node does not have the js-waku

View File

@ -1,48 +0,0 @@
import Gossipsub from "libp2p-gossipsub";
import { shuffle } from "libp2p-gossipsub/src/utils";
import { RelayCodecs } from "./constants";
/**
* Given a topic, returns up to count peers subscribed to that topic
* that pass an optional filter function
*
* @param {Gossipsub} router
* @param {String} topic
* @param {Number} count
* @param {Function} [filter] a function to filter acceptable peers
* @returns {Set<string>}
*
*/
export function getRelayPeers(
router: Gossipsub,
topic: string,
count: number,
filter: (id: string) => boolean = (): boolean => true
): Set<string> {
const peersInTopic = router.topics.get(topic);
if (!peersInTopic) {
return new Set();
}
// Adds all peers using our protocol
// that also pass the filter function
let peers: string[] = [];
peersInTopic.forEach((id: string) => {
const peerStreams = router.peers.get(id);
if (!peerStreams) {
return;
}
if (RelayCodecs.includes(peerStreams.protocol) && filter(id)) {
peers.push(id);
}
});
// Pseudo-randomly shuffles peers
peers = shuffle(peers);
if (count > 0 && peers.length > count) {
peers = peers.slice(0, count);
}
return new Set(peers);
}

View File

@ -1,3 +1,4 @@
import { PeerId } from "@libp2p/interface-peer-id";
import { expect } from "chai";
import debug from "debug";
@ -5,6 +6,7 @@ import {
makeLogFileName,
NOISE_KEY_1,
NOISE_KEY_2,
NOISE_KEY_3,
Nwaku,
} from "../../test_utils";
import { delay } from "../../test_utils/delay";
@ -14,7 +16,8 @@ import {
generateSymmetricKey,
getPublicKey,
} from "../crypto";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
const log = debug("waku:test");
@ -38,19 +41,24 @@ describe("Waku Relay [node only]", () => {
log("Starting JS Waku instances");
[waku1, waku2] = await Promise.all([
Waku.create({ staticNoiseKey: NOISE_KEY_1 }),
Waku.create({
createWaku({ staticNoiseKey: NOISE_KEY_1 }).then((waku) =>
waku.start().then(() => waku)
),
createWaku({
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
}),
}).then((waku) => waku.start().then(() => waku)),
]);
log("Instances started, adding waku2 to waku1's address book");
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
waku1.addPeerToAddressBook(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
log("Wait for mutual pubsub subscription");
await Promise.all([
waku1.waitForRemotePeer([Protocols.Relay]),
waku2.waitForRemotePeer([Protocols.Relay]),
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);
log("before each hook done");
});
@ -64,18 +72,20 @@ describe("Waku Relay [node only]", () => {
it("Subscribe", async function () {
log("Getting subscribers");
const subscribers1 =
waku1.libp2p.pubsub.getSubscribers(DefaultPubSubTopic);
const subscribers2 =
waku2.libp2p.pubsub.getSubscribers(DefaultPubSubTopic);
const subscribers1 = waku1.libp2p.pubsub
.getSubscribers(DefaultPubSubTopic)
.map((p) => p.toString());
const subscribers2 = waku2.libp2p.pubsub
.getSubscribers(DefaultPubSubTopic)
.map((p) => p.toString());
log("Asserting mutual subscription");
expect(subscribers1).to.contain(waku2.libp2p.peerId.toB58String());
expect(subscribers2).to.contain(waku1.libp2p.peerId.toB58String());
expect(subscribers1).to.contain(waku2.libp2p.peerId.toString());
expect(subscribers2).to.contain(waku1.libp2p.peerId.toString());
});
it("Register correct protocols", async function () {
const protocols = Array.from(waku1.libp2p.upgrader.protocols.keys());
const protocols = waku1.libp2p.registrar.getProtocols();
expect(protocols).to.contain("/vac/waku/relay/2.0.0");
expect(protocols.findIndex((value) => value.match(/sub/))).to.eq(-1);
@ -247,34 +257,52 @@ describe("Waku Relay [node only]", () => {
});
describe("Custom pubsub topic", () => {
let waku1: Waku;
let waku2: Waku;
let waku3: Waku;
afterEach(async function () {
!!waku1 &&
waku1.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku2 &&
waku2.stop().catch((e) => console.log("Waku failed to stop", e));
!!waku3 &&
waku3.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Publish", async function () {
this.timeout(10000);
const pubSubTopic = "/some/pubsub/topic";
// 1 and 2 uses a custom pubsub
const [waku1, waku2, waku3] = await Promise.all([
Waku.create({
// 3 uses the default pubsub
[waku1, waku2, waku3] = await Promise.all([
createWaku({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_1,
}),
Waku.create({
}).then((waku) => waku.start().then(() => waku)),
createWaku({
pubSubTopic: pubSubTopic,
staticNoiseKey: NOISE_KEY_2,
libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } },
}),
Waku.create({
staticNoiseKey: NOISE_KEY_2,
}),
}).then((waku) => waku.start().then(() => waku)),
createWaku({
staticNoiseKey: NOISE_KEY_3,
}).then((waku) => waku.start().then(() => waku)),
]);
waku1.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
waku3.addPeerToAddressBook(waku2.libp2p.peerId, waku2.libp2p.multiaddrs);
waku1.addPeerToAddressBook(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
waku3.addPeerToAddressBook(
waku2.libp2p.peerId,
waku2.libp2p.getMultiaddrs()
);
await Promise.all([
waku1.waitForRemotePeer([Protocols.Relay]),
waku2.waitForRemotePeer([Protocols.Relay]),
// No subscription change expected for Waku 3
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);
const messageText = "Communicating using a custom pubsub topic";
@ -313,15 +341,16 @@ describe("Waku Relay [node only]", () => {
beforeEach(async function () {
this.timeout(30_000);
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
nwaku = new Nwaku(this.test?.ctx?.currentTest?.title + "");
await nwaku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Relay]);
await waitForRemotePeer(waku, [Protocols.Relay]);
});
afterEach(async function () {
@ -330,7 +359,7 @@ describe("Waku Relay [node only]", () => {
});
it("nwaku subscribes", async function () {
let subscribers: string[] = [];
let subscribers: PeerId[] = [];
while (subscribers.length === 0) {
await delay(200);
@ -338,7 +367,9 @@ describe("Waku Relay [node only]", () => {
}
const nimPeerId = await nwaku.getPeerId();
expect(subscribers).to.contain(nimPeerId.toB58String());
expect(subscribers.map((p) => p.toString())).to.contain(
nimPeerId.toString()
);
});
it("Publishes to nwaku", async function () {
@ -405,12 +436,12 @@ describe("Waku Relay [node only]", () => {
it("Js publishes, other Js receives", async function () {
this.timeout(60_000);
[waku1, waku2] = await Promise.all([
Waku.create({
createWaku({
staticNoiseKey: NOISE_KEY_1,
}),
Waku.create({
}).then((waku) => waku.start().then(() => waku)),
createWaku({
staticNoiseKey: NOISE_KEY_2,
}),
}).then((waku) => waku.start().then(() => waku)),
]);
nwaku = new Nwaku(makeLogFileName(this));
@ -424,8 +455,8 @@ describe("Waku Relay [node only]", () => {
// Wait for identify protocol to finish
await Promise.all([
waku1.waitForRemotePeer([Protocols.Relay]),
waku2.waitForRemotePeer([Protocols.Relay]),
waitForRemotePeer(waku1, [Protocols.Relay]),
waitForRemotePeer(waku2, [Protocols.Relay]),
]);
await delay(2000);

View File

@ -1,17 +1,14 @@
import debug from "debug";
import Libp2p from "libp2p";
import Gossipsub from "libp2p-gossipsub";
import { AddrInfo, MessageIdFunction } from "libp2p-gossipsub/src/interfaces";
import { MessageCache } from "libp2p-gossipsub/src/message-cache";
import { RPC } from "libp2p-gossipsub/src/message/rpc";
import {
PeerScoreParams,
PeerScoreThresholds,
} from "libp2p-gossipsub/src/score";
import { createGossipRpc, shuffle } from "libp2p-gossipsub/src/utils";
import { InMessage } from "libp2p-interfaces/src/pubsub";
import { SignaturePolicy } from "libp2p-interfaces/src/pubsub/signature-policy";
import PeerId from "peer-id";
GossipSub,
GossipsubMessage,
GossipsubOpts,
} from "@chainsafe/libp2p-gossipsub";
import {
PeerIdStr,
TopicStr,
} from "@chainsafe/libp2p-gossipsub/dist/src/types";
import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types";
import debug from "debug";
import { DefaultPubSubTopic } from "../constants";
import { hexToBytes } from "../utils";
@ -19,35 +16,9 @@ import { CreateOptions } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import * as constants from "./constants";
import { getRelayPeers } from "./get_relay_peers";
import { RelayHeartbeat } from "./relay_heartbeat";
const dbg = debug("waku:relay");
/**
* See constructor libp2p-gossipsub [API](https://github.com/ChainSafe/js-libp2p-gossipsub#api).
*/
export interface GossipOptions {
emitSelf: boolean;
gossipIncoming: boolean;
fallbackToFloodsub: boolean;
floodPublish: boolean;
doPX: boolean;
msgIdFn: MessageIdFunction;
messageCache: MessageCache;
// This option is always overridden
// globalSignaturePolicy: string;
scoreParams: Partial<PeerScoreParams>;
scoreThresholds: Partial<PeerScoreThresholds>;
directPeers: AddrInfo[];
D: number;
Dlo: number;
Dhi: number;
Dscore: number;
Dout: number;
Dlazy: number;
}
/**
* Implements the [Waku v2 Relay protocol]{@link https://rfc.vac.dev/spec/11/}.
* Must be passed as a `pubsub` module to a {Libp2p} instance.
@ -55,9 +26,9 @@ export interface GossipOptions {
* @implements {require('libp2p-interfaces/src/pubsub')}
* @noInheritDoc
*/
export class WakuRelay extends Gossipsub {
heartbeat: RelayHeartbeat;
export class WakuRelay extends GossipSub {
pubSubTopic: string;
public static multicodec: string = constants.RelayCodecs[0];
public decryptionKeys: Map<
Uint8Array,
@ -72,27 +43,19 @@ export class WakuRelay extends Gossipsub {
[contentTopic: string]: Set<(message: WakuMessage) => void>;
};
constructor(
libp2p: Libp2p,
options?: Partial<CreateOptions & GossipOptions>
) {
super(
libp2p,
Object.assign(options ?? {}, {
constructor(options?: Partial<CreateOptions & GossipsubOpts>) {
options = Object.assign(options ?? {}, {
// Ensure that no signature is included nor expected in the messages.
globalSignaturePolicy: SignaturePolicy.StrictNoSign,
})
);
fallbackToFloodsub: false,
});
super(options);
this.multicodecs = constants.RelayCodecs;
this.heartbeat = new RelayHeartbeat(this);
this.observers = {};
this.decryptionKeys = new Map();
const multicodecs = constants.RelayCodecs;
Object.assign(this, { multicodecs });
this.pubSubTopic = options?.pubSubTopic || DefaultPubSubTopic;
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
options?.decryptionKeys?.forEach((key) => {
this.addDecryptionKey(key);
@ -119,7 +82,7 @@ export class WakuRelay extends Gossipsub {
*/
public async send(message: WakuMessage): Promise<void> {
const msg = message.encode();
await super.publish(this.pubSubTopic, msg);
await this.publish(this.pubSubTopic, msg);
}
/**
@ -195,25 +158,16 @@ export class WakuRelay extends Gossipsub {
}
}
/**
* Return the relay peers we are connected to, and we would publish a message to
*/
getPeers(): Set<string> {
return getRelayPeers(this, this.pubSubTopic, this._options.D, (id) => {
// Filter peers we would not publish to
return (
this.score.score(id) >= this._options.scoreThresholds.publishThreshold
);
});
}
/**
* Subscribe to a pubsub topic and start emitting Waku messages to observers.
*
* @override
*/
subscribe(pubSubTopic: string): void {
this.on(pubSubTopic, (event) => {
this.addEventListener(
"gossipsub:message",
(event: CustomEvent<GossipsubMessage>) => {
if (event.detail.msg.topic === pubSubTopic) {
const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
return {
@ -225,7 +179,7 @@ export class WakuRelay extends Gossipsub {
);
dbg(`Message received on ${pubSubTopic}`);
WakuMessage.decode(event.data, decryptionKeys)
WakuMessage.decode(event.detail.msg.data, decryptionKeys)
.then((wakuMsg) => {
if (!wakuMsg) {
dbg("Failed to decode Waku Message");
@ -248,285 +202,14 @@ export class WakuRelay extends Gossipsub {
.catch((e) => {
dbg("Failed to decode Waku Message", e);
});
});
}
}
);
super.subscribe(pubSubTopic);
}
/**
* Join pubsub topic.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @internal
* @param {string} topic
* @returns {void}
* @override
*/
join(topic: string): void {
if (!this.started) {
throw new Error("WakuRelayPubSub has not started");
}
const fanoutPeers = this.fanout.get(topic);
if (fanoutPeers) {
// these peers have a score above the publish threshold, which may be negative
// so drop the ones with a negative score
fanoutPeers.forEach((id) => {
if (this.score.score(id) < 0) {
fanoutPeers.delete(id);
}
});
if (fanoutPeers.size < this._options.D) {
// we need more peers; eager, as this would get fixed in the next heartbeat
getRelayPeers(
this,
topic,
this._options.D - fanoutPeers.size,
(id: string): boolean => {
// filter our current peers, direct peers, and peers with negative scores
return (
!fanoutPeers.has(id) &&
!this.direct.has(id) &&
this.score.score(id) >= 0
);
}
).forEach((id) => fanoutPeers.add(id));
}
this.mesh.set(topic, fanoutPeers);
this.fanout.delete(topic);
this.lastpub.delete(topic);
} else {
const peers = getRelayPeers(
this,
topic,
this._options.D,
(id: string): boolean => {
// filter direct peers and peers with negative score
return !this.direct.has(id) && this.score.score(id) >= 0;
}
);
this.mesh.set(topic, peers);
}
this.mesh.get(topic)?.forEach((id) => {
this.log("JOIN: Add mesh link to %s in %s", id, topic);
this._sendGraft(id, topic);
});
}
/**
* Publish messages.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {InMessage} msg
* @returns {void}
*/
async _publish(msg: InMessage): Promise<void> {
const msgIdStr = await this.getCanonicalMsgIdStr(msg);
if (msg.receivedFrom !== this.peerId.toB58String()) {
this.score.deliverMessage(msg, msgIdStr);
this.gossipTracer.deliverMessage(msgIdStr);
}
// put in seen cache
this.seenCache.put(msgIdStr);
this.messageCache.put(msg, msgIdStr);
const toSend = new Set<string>();
msg.topicIDs.forEach((topic) => {
const peersInTopic = this.topics.get(topic);
if (!peersInTopic) {
return;
}
// direct peers
this.direct.forEach((id) => {
toSend.add(id);
});
let meshPeers = this.mesh.get(topic);
if (!meshPeers || !meshPeers.size) {
// We are not in the mesh for topic, use fanout peers
meshPeers = this.fanout.get(topic);
if (!meshPeers) {
// If we are not in the fanout, then pick peers in topic above the publishThreshold
const peers = getRelayPeers(this, topic, this._options.D, (id) => {
return (
this.score.score(id) >=
this._options.scoreThresholds.publishThreshold
);
});
if (peers.size > 0) {
meshPeers = peers;
this.fanout.set(topic, peers);
} else {
meshPeers = new Set();
}
}
// Store the latest publishing time
this.lastpub.set(topic, this._now());
}
meshPeers?.forEach((peer) => {
toSend.add(peer);
});
});
// Publish messages to peers
const rpc = createGossipRpc([Gossipsub.utils.normalizeOutRpcMessage(msg)]);
dbg(`Relay message to ${toSend.size} peers`);
toSend.forEach((id) => {
if (id === msg.from) {
return;
}
dbg("Relay message to", id);
this._sendRpc(id, rpc);
});
}
/**
* Emits gossip to peers in a particular topic.
*
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {string} topic
* @param {Set<string>} exclude peers to exclude
* @returns {void}
*/
_emitGossip(topic: string, exclude: Set<string>): void {
const messageIDs = this.messageCache.getGossipIDs(topic);
if (!messageIDs.length) {
return;
}
// shuffle to emit in random order
shuffle(messageIDs);
// if we are emitting more than GossipsubMaxIHaveLength ids, truncate the list
if (messageIDs.length > constants.RelayMaxIHaveLength) {
// we do the truncation (with shuffling) per peer below
this.log(
"too many messages for gossip; will truncate IHAVE list (%d messages)",
messageIDs.length
);
}
// Send gossip to GossipFactor peers above threshold with a minimum of D_lazy
// First we collect the peers above gossipThreshold that are not in the exclude set
// and then randomly select from that set
// We also exclude direct peers, as there is no reason to emit gossip to them
const peersToGossip: string[] = [];
const topicPeers = this.topics.get(topic);
if (!topicPeers) {
// no topic peers, no gossip
return;
}
topicPeers.forEach((id) => {
const peerStreams = this.peers.get(id);
if (!peerStreams) {
return;
}
if (
!exclude.has(id) &&
!this.direct.has(id) &&
constants.RelayCodecs.includes(peerStreams.protocol) &&
this.score.score(id) >= this._options.scoreThresholds.gossipThreshold
) {
peersToGossip.push(id);
}
});
let target = this._options.Dlazy;
const factor = constants.RelayGossipFactor * peersToGossip.length;
if (factor > target) {
target = factor;
}
if (target > peersToGossip.length) {
target = peersToGossip.length;
} else {
shuffle(peersToGossip);
}
// Emit the IHAVE gossip to the selected peers up to the target
peersToGossip.slice(0, target).forEach((id) => {
let peerMessageIDs = messageIDs;
if (messageIDs.length > constants.RelayMaxIHaveLength) {
// shuffle and slice message IDs per peer so that we emit a different set for each peer
// we have enough redundancy in the system that this will significantly increase the message
// coverage when we do truncate
peerMessageIDs = shuffle(peerMessageIDs.slice()).slice(
0,
constants.RelayMaxIHaveLength
);
}
this._pushGossip(id, {
topicID: topic,
messageIDs: peerMessageIDs,
});
});
}
/**
* Make a PRUNE control message for a peer in a topic.
* This is present to override the behavior of Gossipsub and should not
* be used by API Consumers
*
* @ignore
* @override
* @param {string} id
* @param {string} topic
* @param {boolean} doPX
* @returns {Promise<RPC.IControlPrune>}
*/
async _makePrune(
id: string,
topic: string,
doPX: boolean
): Promise<RPC.IControlPrune> {
// backoff is measured in seconds
// RelayPruneBackoff is measured in milliseconds
const backoff = constants.RelayPruneBackoff / 1000;
if (!doPX) {
return {
topicID: topic,
peers: [],
backoff: backoff,
};
}
// select peers for Peer eXchange
const peers = getRelayPeers(
this,
topic,
constants.RelayPrunePeers,
(xid: string): boolean => {
return xid !== id && this.score.score(xid) >= 0;
}
);
const px = await Promise.all(
Array.from(peers).map(async (p) => {
// see if we have a signed record to send back; if we don't, just send
// the peer ID and let the pruned peer find them in the DHT -- we can't trust
// unsigned address records through PX anyways
// Finding signed records in the DHT is not supported at the time of writing in js-libp2p
const peerId = PeerId.createFromB58String(p);
return {
peerID: peerId.toBytes(),
signedPeerRecord:
await this._libp2p.peerStore.addressBook.getRawEnvelope(peerId),
};
})
);
return {
topicID: topic,
peers: px,
backoff: backoff,
};
getMeshPeers(topic?: TopicStr): PeerIdStr[] {
return super.getMeshPeers(topic ?? this.pubSubTopic);
}
}

View File

@ -1,379 +0,0 @@
/**
* @hidden
* @module
*/
import Gossipsub from "libp2p-gossipsub";
import { Heartbeat } from "libp2p-gossipsub/src/heartbeat";
import { shuffle } from "libp2p-gossipsub/src/utils";
import * as constants from "./constants";
import { getRelayPeers } from "./get_relay_peers";
export class RelayHeartbeat extends Heartbeat {
/**
* @param {Object} gossipsub
* @constructor
*/
constructor(gossipsub: Gossipsub) {
super(gossipsub);
}
start(): void {
if (this._heartbeatTimer) {
return;
}
const heartbeat = this._heartbeat.bind(this);
const timeout = setTimeout(() => {
heartbeat();
this._heartbeatTimer?.runPeriodically(
heartbeat,
constants.RelayHeartbeatInterval
);
}, constants.RelayHeartbeatInitialDelay);
this._heartbeatTimer = {
_intervalId: undefined,
runPeriodically: (fn, period): void => {
// this._heartbeatTimer cannot be null, it is being assigned.
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
this._heartbeatTimer!._intervalId = setInterval(fn, period);
},
cancel: (): void => {
clearTimeout(timeout);
clearInterval(this._heartbeatTimer?._intervalId as NodeJS.Timeout);
},
};
}
/**
* Unmounts the gossipsub protocol and shuts down every connection
* @override
* @returns {void}
*/
stop(): void {
if (!this._heartbeatTimer) {
return;
}
this._heartbeatTimer.cancel();
this._heartbeatTimer = null;
}
/**
* Maintains the mesh and fanout maps in gossipsub.
*
* @returns {void}
*/
_heartbeat(): void {
const { D, Dlo, Dhi, Dscore, Dout } = this.gossipsub._options;
this.gossipsub.heartbeatTicks++;
// cache scores through the heartbeat
const scores = new Map<string, number>();
const getScore = (id: string): number => {
let s = scores.get(id);
if (s === undefined) {
s = this.gossipsub.score.score(id);
scores.set(id, s);
}
return s;
};
// peer id => topic[]
const toGraft = new Map<string, string[]>();
// peer id => topic[]
const toPrune = new Map<string, string[]>();
// peer id => don't px
const noPX = new Map<string, boolean>();
// clean up expired backoffs
this.gossipsub._clearBackoff();
// clean up peerhave/iasked counters
this.gossipsub.peerhave.clear();
this.gossipsub.iasked.clear();
// apply IWANT request penalties
this.gossipsub._applyIwantPenalties();
// ensure direct peers are connected
this.gossipsub._directConnect();
// maintain the mesh for topics we have joined
this.gossipsub.mesh.forEach((peers, topic) => {
// prune/graft helper functions (defined per topic)
const prunePeer = (id: string): void => {
this.gossipsub.log(
"HEARTBEAT: Remove mesh link to %s in %s",
id,
topic
);
// update peer score
this.gossipsub.score.prune(id, topic);
// add prune backoff record
this.gossipsub._addBackoff(id, topic);
// remove peer from mesh
peers.delete(id);
// add to toPrune
const topics = toPrune.get(id);
if (!topics) {
toPrune.set(id, [topic]);
} else {
topics.push(topic);
}
};
const graftPeer = (id: string): void => {
this.gossipsub.log("HEARTBEAT: Add mesh link to %s in %s", id, topic);
// update peer score
this.gossipsub.score.graft(id, topic);
// add peer to mesh
peers.add(id);
// add to toGraft
const topics = toGraft.get(id);
if (!topics) {
toGraft.set(id, [topic]);
} else {
topics.push(topic);
}
};
// drop all peers with negative score, without PX
peers.forEach((id) => {
const score = getScore(id);
if (score < 0) {
this.gossipsub.log(
"HEARTBEAT: Prune peer %s with negative score: score=%d, topic=%s",
id,
score,
topic
);
prunePeer(id);
noPX.set(id, true);
}
});
// do we have enough peers?
if (peers.size < Dlo) {
const backoff = this.gossipsub.backoff.get(topic);
const ineed = D - peers.size;
const peersSet = getRelayPeers(
this.gossipsub,
topic,
ineed,
(id: string) => {
// filter out mesh peers, direct peers, peers we are backing off, peers with negative score
return (
!peers.has(id) &&
!this.gossipsub.direct.has(id) &&
(!backoff || !backoff.has(id)) &&
getScore(id) >= 0
);
}
);
peersSet.forEach(graftPeer);
}
// do we have to many peers?
if (peers.size > Dhi) {
let peersArray = Array.from(peers);
// sort by score
peersArray.sort((a, b) => getScore(b) - getScore(a));
// We keep the first D_score peers by score and the remaining up to D randomly
// under the constraint that we keep D_out peers in the mesh (if we have that many)
peersArray = peersArray
.slice(0, Dscore)
.concat(shuffle(peersArray.slice(Dscore)));
// count the outbound peers we are keeping
let outbound = 0;
peersArray.slice(0, D).forEach((p) => {
if (this.gossipsub.outbound.get(p)) {
outbound++;
}
});
// if it's less than D_out, bubble up some outbound peers from the random selection
if (outbound < Dout) {
const rotate = (i: number): void => {
// rotate the peersArray to the right and put the ith peer in the front
const p = peersArray[i];
for (let j = i; j > 0; j--) {
peersArray[j] = peersArray[j - 1];
}
peersArray[0] = p;
};
// first bubble up all outbound peers already in the selection to the front
if (outbound > 0) {
let ihave = outbound;
for (let i = 1; i < D && ihave > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i);
ihave--;
}
}
}
// now bubble up enough outbound peers outside the selection to the front
let ineed = D - outbound;
for (let i = D; i < peersArray.length && ineed > 0; i++) {
if (this.gossipsub.outbound.get(peersArray[i])) {
rotate(i);
ineed--;
}
}
}
// prune the excess peers
peersArray.slice(D).forEach(prunePeer);
}
// do we have enough outbound peers?
if (peers.size >= Dlo) {
// count the outbound peers we have
let outbound = 0;
peers.forEach((p) => {
if (this.gossipsub.outbound.get(p)) {
outbound++;
}
});
// if it's less than D_out, select some peers with outbound connections and graft them
if (outbound < Dout) {
const ineed = Dout - outbound;
const backoff = this.gossipsub.backoff.get(topic);
getRelayPeers(this.gossipsub, topic, ineed, (id: string): boolean => {
// filter our current mesh peers, direct peers, peers we are backing off, peers with negative score
return (
!peers.has(id) &&
!this.gossipsub.direct.has(id) &&
(!backoff || !backoff.has(id)) &&
getScore(id) >= 0
);
}).forEach(graftPeer);
}
}
// should we try to improve the mesh with opportunistic grafting?
if (
this.gossipsub.heartbeatTicks %
constants.RelayOpportunisticGraftTicks ===
0 &&
peers.size > 1
) {
// Opportunistic grafting works as follows: we check the median score of peers in the
// mesh; if this score is below the opportunisticGraftThreshold, we select a few peers at
// random with score over the median.
// The intention is to (slowly) improve an under performing mesh by introducing good
// scoring peers that may have been gossiping at us. This allows us to get out of sticky
// situations where we are stuck with poor peers and also recover from churn of good peers.
// now compute the median peer score in the mesh
const peersList = Array.from(peers).sort(
(a, b) => getScore(a) - getScore(b)
);
const medianIndex = Math.floor(peers.size / 2);
const medianScore = getScore(peersList[medianIndex]);
// if the median score is below the threshold, select a better peer (if any) and GRAFT
if (
medianScore <
this.gossipsub._options.scoreThresholds.opportunisticGraftThreshold
) {
const backoff = this.gossipsub.backoff.get(topic);
const peersToGraft = getRelayPeers(
this.gossipsub,
topic,
constants.RelayOpportunisticGraftPeers,
(id: string): boolean => {
// filter out current mesh peers, direct peers, peers we are backing off, peers below or at threshold
return (
peers.has(id) &&
!this.gossipsub.direct.has(id) &&
(!backoff || !backoff.has(id)) &&
getScore(id) > medianScore
);
}
);
peersToGraft.forEach((id: string) => {
this.gossipsub.log(
"HEARTBEAT: Opportunistically graft peer %s on topic %s",
id,
topic
);
graftPeer(id);
});
}
}
// 2nd arg are mesh peers excluded from gossip. We have already pushed
// messages to them, so its redundant to gossip IHAVEs.
this.gossipsub._emitGossip(topic, peers);
});
// expire fanout for topics we haven't published to in a while
const now = this.gossipsub._now();
this.gossipsub.lastpub.forEach((lastpub, topic) => {
if (lastpub + constants.RelayFanoutTTL < now) {
this.gossipsub.fanout.delete(topic);
this.gossipsub.lastpub.delete(topic);
}
});
// maintain our fanout for topics we are publishing but we have not joined
this.gossipsub.fanout.forEach((fanoutPeers, topic) => {
// checks whether our peers are still in the topic and have a score above the publish threshold
const topicPeers = this.gossipsub.topics.get(topic);
fanoutPeers.forEach((id) => {
if (
!topicPeers?.has(id) ||
getScore(id) <
this.gossipsub._options.scoreThresholds.publishThreshold
) {
fanoutPeers.delete(id);
}
});
// do we need more peers?
if (fanoutPeers.size < D) {
const ineed = D - fanoutPeers.size;
const peersSet = getRelayPeers(
this.gossipsub,
topic,
ineed,
(id: string): boolean => {
// filter out existing fanout peers, direct peers, and peers with score above the publish threshold
return (
!fanoutPeers.has(id) &&
!this.gossipsub.direct.has(id) &&
getScore(id) >=
this.gossipsub._options.scoreThresholds.publishThreshold
);
}
);
peersSet.forEach((id: string) => {
fanoutPeers.add(id);
});
}
// 2nd arg are fanout peers excluded from gossip.
// We have already pushed messages to them, so its redundant to gossip IHAVEs
this.gossipsub._emitGossip(topic, fanoutPeers);
});
// send coalesced GRAFT/PRUNE messages (will piggyback gossip)
this.gossipsub._sendGraftPrune(toGraft, toPrune, noPX);
// flush pending gossip that wasn't piggybacked above
this.gossipsub._flush();
// advance the message history window
this.gossipsub.messageCache.shift();
this.gossipsub.emit("gossipsub:heartbeat");
}
}

View File

@ -12,7 +12,8 @@ import {
generateSymmetricKey,
getPublicKey,
} from "../crypto";
import { Protocols, Waku } from "../waku";
import { waitForRemotePeer } from "../wait_for_remote_peer";
import { createWaku, Protocols, Waku } from "../waku";
import { DecryptionMethod, WakuMessage } from "../waku_message";
import { PageDirection } from "./history_rpc";
@ -46,11 +47,12 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
const messages = await waku.store.queryHistory([]);
expect(messages?.length).eq(2);
@ -78,11 +80,12 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
let messages: WakuMessage[] = [];
@ -117,11 +120,12 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
let messages: WakuMessage[] = [];
const desiredMsgs = 14;
@ -153,11 +157,12 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
const messages = await waku.store.queryHistory([], {
pageDirection: PageDirection.FORWARD,
@ -191,12 +196,13 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
pubSubTopic: customPubSubTopic,
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
const nimPeerId = await nwaku.getPeerId();
@ -259,12 +265,12 @@ describe("Waku Store", () => {
dbg("Messages have been encrypted");
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
Waku.create({
createWaku({
staticNoiseKey: NOISE_KEY_1,
}),
Waku.create({
}).then((waku) => waku.start().then(() => waku)),
createWaku({
staticNoiseKey: NOISE_KEY_2,
}),
}).then((waku) => waku.start().then(() => waku)),
nwaku.getMultiaddrWithId(),
]);
@ -277,7 +283,7 @@ describe("Waku Store", () => {
dbg("Waku nodes connected to nwaku");
await waku1.waitForRemotePeer([Protocols.LightPush]);
await waitForRemotePeer(waku1, [Protocols.LightPush]);
dbg("Sending messages using light push");
await Promise.all([
@ -287,7 +293,7 @@ describe("Waku Store", () => {
waku1.lightPush.push(clearMessage),
]);
await waku2.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku2, [Protocols.Store]);
waku2.store.addDecryptionKey(symKey);
@ -363,12 +369,12 @@ describe("Waku Store", () => {
dbg("Messages have been encrypted");
const [waku1, waku2, nimWakuMultiaddr] = await Promise.all([
Waku.create({
createWaku({
staticNoiseKey: NOISE_KEY_1,
}),
Waku.create({
}).then((waku) => waku.start().then(() => waku)),
createWaku({
staticNoiseKey: NOISE_KEY_2,
}),
}).then((waku) => waku.start().then(() => waku)),
nwaku.getMultiaddrWithId(),
]);
@ -381,7 +387,7 @@ describe("Waku Store", () => {
dbg("Waku nodes connected to nwaku");
await waku1.waitForRemotePeer([Protocols.LightPush]);
await waitForRemotePeer(waku1, [Protocols.LightPush]);
dbg("Sending messages using light push");
await Promise.all([
@ -391,7 +397,7 @@ describe("Waku Store", () => {
waku1.lightPush.push(clearMessage),
]);
await waku2.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku2, [Protocols.Store]);
waku2.addDecryptionKey(symKey, {
contentTopics: [encryptedSymmetricContentTopic],
@ -450,11 +456,12 @@ describe("Waku Store", () => {
).to.be.true;
}
waku = await Waku.create({
waku = await createWaku({
staticNoiseKey: NOISE_KEY_1,
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waku.waitForRemotePeer([Protocols.Store]);
await waitForRemotePeer(waku, [Protocols.Store]);
const nwakuPeerId = await nwaku.getPeerId();

View File

@ -1,10 +1,11 @@
import type { PeerId } from "@libp2p/interface-peer-id";
import { Peer } from "@libp2p/interface-peer-store";
import debug from "debug";
import concat from "it-concat";
import lp from "it-length-prefixed";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import Libp2p from "libp2p";
import { Peer } from "libp2p/src/peer-store";
import PeerId from "peer-id";
import { Libp2p } from "libp2p";
import { concat } from "uint8arrays/concat";
import * as protoV2Beta4 from "../../proto/store_v2beta4";
import { HistoryResponse } from "../../proto/store_v2beta4";
@ -105,11 +106,7 @@ export class WakuStore {
>;
constructor(public libp2p: Libp2p, options?: CreateOptions) {
if (options?.pubSubTopic) {
this.pubSubTopic = options.pubSubTopic;
} else {
this.pubSubTopic = DefaultPubSubTopic;
}
this.pubSubTopic = options?.pubSubTopic ?? DefaultPubSubTopic;
this.decryptionKeys = new Map();
}
@ -146,7 +143,7 @@ export class WakuStore {
);
dbg("Querying history with the following options", {
peerId: options?.peerId?.toB58String(),
peerId: options?.peerId?.toString(),
...options,
});
@ -154,9 +151,9 @@ export class WakuStore {
if (opts.peerId) {
peer = await this.libp2p.peerStore.get(opts.peerId);
if (!peer)
throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toB58String()}`;
throw `Failed to retrieve connection details for provided peer in peer store: ${opts.peerId.toString()}`;
} else {
peer = await this.randomPeer;
peer = await this.randomPeer();
if (!peer)
throw "Failed to find known peer that registers waku store protocol";
}
@ -170,11 +167,12 @@ export class WakuStore {
}
dbg(`Use store codec ${storeCodec}`);
if (!storeCodec)
throw `Peer does not register waku store protocol: ${peer.id.toB58String()}`;
throw `Peer does not register waku store protocol: ${peer.id.toString()}`;
Object.assign(opts, { storeCodec });
const connection = this.libp2p.connectionManager.get(peer.id);
if (!connection) throw "Failed to get a connection to the peer";
const connections = this.libp2p.connectionManager.getConnections(peer.id);
if (!connections || !connections.length)
throw "Failed to get a connection to the peer";
const decryptionKeys = Array.from(this.decryptionKeys).map(
([key, { method, contentTopics }]) => {
@ -201,19 +199,21 @@ export class WakuStore {
const messages: WakuMessage[] = [];
let cursor = undefined;
while (true) {
const { stream } = await connection.newStream(storeCodec);
// TODO: Some connection selection logic?
const stream = await connections[0].newStream(storeCodec);
const queryOpts = Object.assign(opts, { cursor });
const historyRpcQuery = HistoryRPC.createQuery(queryOpts);
dbg("Querying store peer", connection.remoteAddr.toString());
dbg("Querying store peer", connections[0].remoteAddr.toString());
const res = await pipe(
[historyRpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
concat
async (source) => await all(source)
);
const reply = historyRpcQuery.decode(res.slice());
const bytes = concat(res);
const reply = historyRpcQuery.decode(bytes);
if (!reply.response) {
throw "History response misses response field";
@ -301,7 +301,7 @@ export class WakuStore {
* Returns known peers from the address book (`libp2p.peerStore`) that support
* store protocol. Waku may or may not be currently connected to these peers.
*/
get peers(): AsyncIterable<Peer> {
async peers(): Promise<Peer[]> {
const codecs = [];
for (const codec of Object.values(StoreCodecs)) {
codecs.push(codec);
@ -315,7 +315,7 @@ export class WakuStore {
* book (`libp2p.peerStore`). Waku may or may not be currently connected to
* this peer.
*/
get randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(this.peers);
async randomPeer(): Promise<Peer | undefined> {
return selectRandomPeer(await this.peers());
}
}

View File

@ -24,3 +24,13 @@ export const NOISE_KEY_2 = new Uint8Array(
return b;
})()
);
export const NOISE_KEY_3 = new Uint8Array(
((): number[] => {
const b = [];
for (let i = 0; i < 32; i++) {
b.push(3);
}
return b;
})()
);

View File

@ -5,10 +5,11 @@
import { ChildProcess, spawn } from "child_process";
import type { PeerId } from "@libp2p/interface-peer-id";
import { peerIdFromString } from "@libp2p/peer-id";
import { Multiaddr, multiaddr } from "@multiformats/multiaddr";
import appRoot from "app-root-path";
import debug from "debug";
import { Multiaddr, multiaddr } from "multiaddr";
import PeerId from "peer-id";
import portfinder from "portfinder";
import { DefaultPubSubTopic } from "../lib/constants";
@ -216,7 +217,7 @@ export class Nwaku {
]);
}
async messages(): Promise<WakuMessage[]> {
async messages(pubsubTopic?: string): Promise<WakuMessage[]> {
this.checkProcess();
const isDefined = (msg: WakuMessage | undefined): msg is WakuMessage => {
@ -225,7 +226,7 @@ export class Nwaku {
const protoMsgs = await this.rpcCall<proto.WakuMessage[]>(
"get_waku_v2_relay_v1_messages",
[DefaultPubSubTopic]
[pubsubTopic ?? DefaultPubSubTopic]
);
const msgs = await Promise.all(
@ -343,7 +344,7 @@ export class Nwaku {
if (!this.multiaddrWithId) throw "Nwaku did not return a ws multiaddr";
const peerIdStr = this.multiaddrWithId.getPeerId();
if (!peerIdStr) throw "Nwaku multiaddr does not contain peerId";
this.peerId = PeerId.createFromB58String(peerIdStr);
this.peerId = peerIdFromString(peerIdStr);
return { peerId: this.peerId, multiaddrWithId: this.multiaddrWithId };
}

View File

@ -2,7 +2,7 @@
"compilerOptions": {
"incremental": true,
"target": "es2020",
"outDir": "dist/esm",
"outDir": "dist/",
"rootDir": "src",
"moduleResolution": "node",
"module": "es2020",

View File

@ -1,50 +0,0 @@
const webpack = require("webpack");
const path = require("path");
module.exports = {
mode: "development",
entry: {
"js-waku": "./src/index.ts",
},
devtool: "inline-source-map",
module: {
rules: [
{
test: /\.(js|tsx?)$/,
use: "ts-loader",
exclude: /(node_modules)|(node\.spec\.ts)/,
},
{
test: /node\.spec\.ts$/,
use: "ignore-loader",
},
],
},
plugins: [
new webpack.ProvidePlugin({
process: "process/browser.js",
Buffer: ["buffer", "Buffer"],
}),
],
resolve: {
extensions: [".ts", ".js"],
fallback: {
buffer: require.resolve("buffer/"),
crypto: false,
stream: require.resolve("stream-browserify"),
},
},
output: {
filename: "[name].js",
path: path.resolve(__dirname, "build/umd"),
library: "jswaku",
libraryTarget: "umd",
globalObject: "this",
},
optimization: {
splitChunks: {
name: "vendors",
chunks: "all",
},
},
};

View File

@ -1,40 +0,0 @@
const webpack = require("webpack");
const path = require("path");
module.exports = {
mode: "production",
entry: {
"js-waku": "./src/index.ts",
},
devtool: "inline-source-map",
module: {
rules: [
{
test: /\.ts$/,
use: "ts-loader",
exclude: /node_modules/,
},
],
},
plugins: [
new webpack.ProvidePlugin({
process: "process/browser.js",
Buffer: ["buffer", "Buffer"],
}),
],
resolve: {
extensions: [".ts", ".js"],
fallback: {
buffer: require.resolve("buffer/"),
crypto: false,
stream: require.resolve("stream-browserify"),
},
},
output: {
filename: "index.js",
path: path.resolve(__dirname, "dist/umd"),
library: "waku",
libraryTarget: "umd",
globalObject: "this",
},
};