feat!: implement peer exchange (#1027)

* wip -- yet to test

* update: draft

* wip

* support passing flags manually to nwaku node

* refactor peer-exchange test

* switch response from uint8array to ENR

* rm: unnecesary logs

* implement clas

* fix: for loop

* init-wip: directories

* setup: new package & fix circular deps

* bind a response handler

* wip: refactor & update test

* test logs

* wip code - debugging

* address: comments

* Update packages/core/src/lib/waku_peer_exchange/peer_discovery.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* Update packages/core/src/lib/waku_peer_exchange/peer_discovery.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* address: comments

* address: comments

* address: comments

* address: comments

* address: comments

* fix: test build

* refactor

* fix: build

* comply with API

* numPeers: use number instead of bigint

* fix: build

* Update packages/peer-exchange/package.json

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* Update packages/peer-exchange/src/waku_peer_exchange.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* Update packages/peer-exchange/src/waku_peer_exchange.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* Update packages/peer-exchange/src/waku_peer_exchange.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* address: comments, add eslint config

* Update packages/peer-exchange/.eslintrc.cjs

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* Update packages/peer-exchange/src/index.ts

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* address comments

* test works with test fleet

* rm: only for px test => run all tests

* fix: tests

* reorder packages for build, and fix imports

* remove: px test doesnt work with local nodes

* chore: move proto into a separate package

* fix: proto dir

* fix: build

* fix: ci

* add: index for proto

* fix: ci

* Update packages/proto/package.json

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>

* address comments

* chore: run failing test with higher timeout

* chore: run failing test with higher timeout

* fix: ci

Co-authored-by: fryorcraken.eth <110212804+fryorcraken@users.noreply.github.com>
This commit is contained in:
Danish Arora 2022-12-07 11:35:30 +05:30 committed by GitHub
parent 56009d3b26
commit 0ca7fd790c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
30 changed files with 19335 additions and 2082 deletions

View File

@ -4,11 +4,12 @@
"language": "en",
"words": [
"abortable",
"Alives",
"asym",
"backoff",
"backoffs",
"bitjson",
"bitauth",
"bitjson",
"bufbuild",
"chainsafe",
"cimg",
@ -18,6 +19,7 @@
"commitlint",
"dependabot",
"dingpu",
"discv",
"Dlazy",
"dnsaddr",
"Dout",
@ -54,13 +56,13 @@
"livechat",
"Merkle",
"mkdir",
"mplex",
"multiaddr",
"multiaddresses",
"multiaddrs",
"multicodec",
"multicodecs",
"multiformats",
"mplex",
"multihashes",
"muxed",
"muxer",
@ -102,13 +104,12 @@
"varint",
"waku",
"wakuconnect",
"wakuv",
"wakunode",
"wakuv",
"webfonts",
"websockets",
"wifi",
"xsalsa20",
"Alives"
"xsalsa20"
],
"flagWords": [],
"ignorePaths": [

20396
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@ -8,6 +8,7 @@
"packages/enr",
"packages/interfaces",
"packages/libp2p-utils",
"packages/peer-exchange",
"packages/core",
"packages/dns-discovery",
"packages/message-encryption",

View File

@ -1,6 +1,7 @@
import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
import type { PointToPointProtocol, Relay, Waku } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { PeerExchangeCodec } from "@waku/peer-exchange";
import debug from "debug";
import { pEvent } from "p-event";
@ -64,6 +65,14 @@ export async function waitForRemotePeer(
promises.push(waitForConnectedPeer(waku.filter, [FilterCodec]));
}
if (protocols.includes(Protocols.PeerExchange)) {
if (!waku.peerExchange)
throw new Error(
"Cannot wait for Peer Exchange peer: protocol not mounted"
);
promises.push(waitForConnectedPeer(waku.peerExchange, [PeerExchangeCodec]));
}
if (timeoutMs) {
await rejectOnTimeout(
Promise.all(promises),
@ -146,6 +155,9 @@ function getEnabledProtocols(waku: Waku): Protocols[] {
if (waku.lightPush) {
protocols.push(Protocols.LightPush);
}
if (waku.peerExchange) {
protocols.push(Protocols.PeerExchange);
}
return protocols;
}

View File

@ -2,8 +2,17 @@ import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { PubSub } from "@libp2p/interface-pubsub";
import type { Multiaddr } from "@multiformats/multiaddr";
import type { Filter, LightPush, Relay, Store, Waku } from "@waku/interfaces";
import type {
Filter,
LightPush,
PeerExchange,
PeerExchangeComponents,
Relay,
Store,
Waku,
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { PeerExchangeCodec } from "@waku/peer-exchange";
import debug from "debug";
import type { Libp2p } from "libp2p";
@ -51,6 +60,7 @@ export class WakuNode implements Waku {
public store?: Store;
public filter?: Filter;
public lightPush?: LightPush;
public peerExchange?: PeerExchange;
private pingKeepAliveTimers: {
[peer: string]: ReturnType<typeof setInterval>;
@ -64,7 +74,8 @@ export class WakuNode implements Waku {
libp2p: Libp2p,
store?: (components: StoreComponents) => Store,
lightPush?: (components: LightPushComponents) => LightPush,
filter?: (components: FilterComponents) => Filter
filter?: (components: FilterComponents) => Filter,
peerExchange?: (components: PeerExchangeComponents) => PeerExchange
) {
this.libp2p = libp2p;
@ -81,6 +92,10 @@ export class WakuNode implements Waku {
this.lightPush = lightPush(components);
}
if (peerExchange) {
this.peerExchange = peerExchange(components);
}
if (isRelay(libp2p.pubsub)) {
this.relay = libp2p.pubsub;
}
@ -89,7 +104,8 @@ export class WakuNode implements Waku {
"Waku node created",
this.libp2p.peerId.toString(),
`relay: ${!!this.relay}, store: ${!!this.store}, light push: ${!!this
.lightPush}, filter: ${!!this.filter}`
.lightPush}, filter: ${!!this.filter}, peer exchange: ${!!this
.peerExchange} `
);
this.pingKeepAliveTimers = {};
@ -147,6 +163,7 @@ export class WakuNode implements Waku {
this.store && _protocols.push(Protocols.Store);
this.filter && _protocols.push(Protocols.Filter);
this.lightPush && _protocols.push(Protocols.LightPush);
this.peerExchange && _protocols.push(Protocols.PeerExchange);
}
const codecs: string[] = [];
@ -163,6 +180,12 @@ export class WakuNode implements Waku {
codecs.push(FilterCodec);
}
if (_protocols.includes(Protocols.PeerExchange)) {
codecs.push(PeerExchangeCodec);
}
log(`Dialing to ${peer.toString()} with protocols ${_protocols}`);
return this.libp2p.dialProtocol(peer, codecs);
}

View File

@ -67,8 +67,6 @@
"@libp2p/interface-transport": "^2.0.1",
"@libp2p/mplex": "^7.0.0",
"@libp2p/websockets": "^5.0.0",
"@waku/core": "*",
"@waku/interfaces": "*",
"interface-datastore": "^7.0.1"
},
"devDependencies": {

View File

@ -16,6 +16,7 @@ import {
import { DefaultUserAgent } from "@waku/core";
import { getPredefinedBootstrapNodes } from "@waku/core/lib/predefined_bootstrap_nodes";
import type { Relay, WakuFull, WakuLight, WakuPrivacy } from "@waku/interfaces";
import { wakuPeerExchange } from "@waku/peer-exchange";
import type { Libp2p } from "libp2p";
import { createLibp2p, Libp2pOptions } from "libp2p";
@ -85,13 +86,15 @@ export async function createLightNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
const peerExchange = wakuPeerExchange(options);
return new WakuNode(
options ?? {},
libp2p,
store,
lightPush,
filter
filter,
peerExchange
) as WakuLight;
}
@ -150,13 +153,15 @@ export async function createFullNode(
const store = wakuStore(options);
const lightPush = wakuLightPush(options);
const filter = wakuFilter(options);
const peerExchange = wakuPeerExchange(options);
return new WakuNode(
options ?? {},
libp2p,
store,
lightPush,
filter
filter,
peerExchange
) as WakuFull;
}

View File

@ -1,9 +1,12 @@
import type { GossipSub } from "@chainsafe/libp2p-gossipsub";
import type { Stream } from "@libp2p/interface-connection";
import type { ConnectionManager } from "@libp2p/interface-connection-manager";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer } from "@libp2p/interface-peer-store";
import type { PeerStore } from "@libp2p/interface-peer-store";
import type { Registrar } from "@libp2p/interface-registrar";
import type { Multiaddr } from "@multiformats/multiaddr";
import { ENR } from "@waku/enr";
import type { Libp2p } from "libp2p";
export enum Protocols {
@ -11,6 +14,7 @@ export enum Protocols {
Store = "store",
LightPush = "lightpush",
Filter = "filter",
PeerExchange = "peer-exchange",
}
export interface PointToPointProtocol {
@ -50,6 +54,25 @@ export interface LightPush extends PointToPointProtocol {
) => Promise<SendResult>;
}
export interface PeerExchange extends PointToPointProtocol {
query(
params: PeerExchangeQueryParams,
callback: (response: PeerExchangeResponse) => Promise<void> | void
): Promise<void>;
}
export interface PeerExchangeQueryParams {
numPeers: number;
}
export interface PeerExchangeResponse {
peerInfos: PeerInfo[];
}
export interface PeerInfo {
ENR?: ENR;
}
export enum PageDirection {
BACKWARD = "backward",
FORWARD = "forward",
@ -60,6 +83,11 @@ export interface TimeFilter {
endTime: Date;
}
export interface PeerExchangeComponents {
connectionManager: ConnectionManager;
peerStore: PeerStore;
registrar: Registrar;
}
export type Cursor = {
digest?: Uint8Array;
senderTime?: bigint;
@ -126,6 +154,7 @@ export interface Waku {
store?: Store;
filter?: Filter;
lightPush?: LightPush;
peerExchange?: PeerExchange;
dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise<Stream>;
@ -141,6 +170,7 @@ export interface WakuLight extends Waku {
store: Store;
filter: Filter;
lightPush: LightPush;
peerExchange: PeerExchange;
}
export interface WakuPrivacy extends Waku {
@ -148,6 +178,7 @@ export interface WakuPrivacy extends Waku {
store: undefined;
filter: undefined;
lightPush: undefined;
peerExchange: undefined;
}
export interface WakuFull extends Waku {
@ -155,6 +186,7 @@ export interface WakuFull extends Waku {
store: Store;
filter: Filter;
lightPush: LightPush;
peerExchange: PeerExchange;
}
export interface RateLimitProof {

View File

View File

@ -0,0 +1,6 @@
module.exports = {
parserOptions: {
tsconfigRootDir: __dirname,
project: "./tsconfig.dev.json",
},
};

View File

@ -0,0 +1,11 @@
{
"extension": ["ts"],
"spec": "src/**/*.spec.ts",
"require": ["ts-node/register", "isomorphic-fetch", "jsdom-global/register"],
"loader": "ts-node/esm",
"node-option": [
"experimental-specifier-resolution=node",
"loader=ts-node/esm"
],
"exit": true
}

View File

@ -0,0 +1,4 @@
build
bundle
dist
node_modules

View File

@ -0,0 +1,45 @@
process.env.CHROME_BIN = require("puppeteer").executablePath();
const webpack = require("webpack");
module.exports = function (config) {
config.set({
frameworks: ["webpack", "mocha"],
files: ["src/**/*.ts"],
preprocessors: {
"src/**/*.ts": ["webpack"],
},
envPreprocessor: ["CI"],
reporters: ["progress"],
browsers: ["ChromeHeadless"],
singleRun: true,
client: {
mocha: {
timeout: 6000, // Default is 2s
},
},
webpack: {
mode: "development",
module: {
rules: [{ test: /\.([cm]?ts|tsx)$/, loader: "ts-loader" }],
},
plugins: [
new webpack.DefinePlugin({
"process.env.CI": process.env.CI || false,
}),
new webpack.ProvidePlugin({
process: "process/browser.js",
}),
],
resolve: {
extensions: [".ts", ".tsx", ".js"],
extensionAlias: {
".js": [".js", ".ts"],
".cjs": [".cjs", ".cts"],
".mjs": [".mjs", ".mts"],
},
},
stats: { warnings: false },
devtool: "inline-source-map",
},
});
};

View File

@ -0,0 +1,103 @@
{
"name": "@waku/peer-exchange",
"version": "0.0.1",
"description": "Peer Exchange (https://rfc.vac.dev/spec/34/) protocol for Waku",
"types": "./dist/index.d.ts",
"module": "./dist/index.js",
"exports": {
".": {
"types": "./dist/index.d.ts",
"import": "./dist/index.js"
}
},
"type": "module",
"author": "Waku Team",
"homepage": "https://github.com/waku-org/js-waku/tree/master/packages/peer-exchange#readme",
"repository": {
"type": "git",
"url": "https://github.com/waku-org/js-waku.git"
},
"bugs": {
"url": "https://github.com/waku-org/js-waku/issues"
},
"license": "MIT OR Apache-2.0",
"keywords": [
"waku",
"decentralized",
"secure",
"communication",
"web3",
"ethereum",
"dapps",
"privacy"
],
"scripts": {
"build": "run-s build:**",
"build:esm": "tsc",
"build:bundle": "rollup --config rollup.config.js",
"fix": "run-s fix:*",
"fix:prettier": "prettier . --write",
"fix:lint": "eslint src --ext .ts --ext .cjs --fix",
"check": "run-s check:*",
"check:lint": "eslint src --ext .ts",
"check:prettier": "prettier . --list-different",
"check:spelling": "cspell \"{README.md,src/**/*.ts}\"",
"check:tsc": "tsc -p tsconfig.dev.json",
"prepublish": "npm run build",
"reset-hard": "git clean -dfx -e .idea && git reset --hard && npm i && npm run build"
},
"engines": {
"node": ">=16"
},
"dependencies": {
"@libp2p/interface-connection": "^3.0.3",
"@libp2p/interface-peer-discovery": "^1.0.0",
"@libp2p/interface-peer-discovery-compliance-tests": "^2.0.1",
"@libp2p/interface-peer-id": "^1.0.6",
"@libp2p/interface-peer-info": "^1.0.4",
"@libp2p/interface-peer-store": "^1.2.3",
"@waku/core": "*",
"@waku/enr": "*",
"@waku/interfaces": "*",
"debug": "^4.3.4",
"it-all": "^1.0.6",
"it-length-prefixed": "^8.0.2",
"it-pipe": "^2.0.4",
"uint8arraylist": "^2.3.2"
},
"devDependencies": {
"@libp2p/peer-id-factory": "^1.0.15",
"@rollup/plugin-commonjs": "^22.0.0",
"@rollup/plugin-json": "^4.1.0",
"@rollup/plugin-node-resolve": "^13.3.0",
"@typescript-eslint/eslint-plugin": "^5.8.1",
"@typescript-eslint/parser": "^5.8.1",
"chai": "^4.3.6",
"cspell": "^5.14.0",
"eslint": "^8.6.0",
"eslint-config-prettier": "^8.3.0",
"eslint-plugin-eslint-comments": "^3.2.0",
"eslint-plugin-functional": "^4.0.2",
"eslint-plugin-import": "^2.25.3",
"eslint-plugin-prettier": "^4.0.0",
"npm-run-all": "^4.1.5",
"prettier": "^2.1.1",
"rollup": "^2.75.0",
"ts-loader": "^9.4.1",
"typescript": "^4.6.3",
"uint8arrays": "^4.0.2"
},
"typedoc": {
"entryPoint": "./src/index.ts"
},
"files": [
"dist",
"bundle",
"src/**/*.ts",
"!**/*.spec.*",
"!**/*.json",
"CHANGELOG.md",
"LICENSE",
"README.md"
]
}

View File

@ -0,0 +1,21 @@
import commonjs from "@rollup/plugin-commonjs";
import json from "@rollup/plugin-json";
import { nodeResolve } from "@rollup/plugin-node-resolve";
export default {
input: {
index: "dist/index.js",
},
output: {
dir: "bundle",
format: "esm",
},
plugins: [
commonjs(),
json(),
nodeResolve({
browser: true,
preferBuiltins: false,
}),
],
};

View File

@ -0,0 +1 @@
export { wakuPeerExchange, PeerExchangeCodec } from "./waku_peer_exchange.js";

View File

@ -0,0 +1,44 @@
import { proto_peer_exchange as proto } from "@waku/proto";
import { Uint8ArrayList } from "uint8arraylist";
/**
* PeerExchangeRPC represents a message conforming to the Waku Peer Exchange protocol
*/
export class PeerExchangeRPC {
public constructor(public proto: proto.PeerExchangeRPC) {}
static createRequest(params: proto.PeerExchangeQuery): PeerExchangeRPC {
const { numPeers } = params;
return new PeerExchangeRPC({
query: {
numPeers: numPeers,
},
response: undefined,
});
}
/**
* Encode the current PeerExchangeRPC request to bytes
* @returns Uint8Array
*/
encode(): Uint8Array {
return proto.PeerExchangeRPC.encode(this.proto);
}
/**
* Decode the current PeerExchangeRPC request to bytes
* @returns Uint8Array
*/
static decode(bytes: Uint8ArrayList): PeerExchangeRPC {
const res = proto.PeerExchangeRPC.decode(bytes);
return new PeerExchangeRPC(res);
}
get query(): proto.PeerExchangeQuery | undefined {
return this.proto.query;
}
get response(): proto.PeerExchangeResponse | undefined {
return this.proto.response;
}
}

View File

@ -0,0 +1,135 @@
import type { Stream } from "@libp2p/interface-connection";
import type { PeerId } from "@libp2p/interface-peer-id";
import type { Peer, PeerStore } from "@libp2p/interface-peer-store";
import type { IncomingStreamData } from "@libp2p/interface-registrar";
import { ENR } from "@waku/enr";
import type {
PeerExchange,
PeerExchangeComponents,
PeerExchangeQueryParams,
PeerExchangeResponse,
ProtocolOptions,
} from "@waku/interfaces";
import {
getPeersForProtocol,
selectConnection,
selectPeerForProtocol,
} from "@waku/libp2p-utils";
import debug from "debug";
import all from "it-all";
import * as lp from "it-length-prefixed";
import { pipe } from "it-pipe";
import { PeerExchangeRPC } from "./rpc.js";
export const PeerExchangeCodec = "/vac/waku/peer-exchange/2.0.0-alpha1";
const log = debug("waku:peer-exchange");
export class WakuPeerExchange implements PeerExchange {
private callback:
| ((response: PeerExchangeResponse) => Promise<void>)
| undefined;
constructor(
public components: PeerExchangeComponents,
public createOptions?: ProtocolOptions
) {
this.components.registrar
.handle(PeerExchangeCodec, this.handler.bind(this))
.catch((e) => log("Failed to register peer exchange protocol", e));
}
async query(
params: PeerExchangeQueryParams,
callback: (response: PeerExchangeResponse) => Promise<void>
): Promise<void> {
this.callback = callback;
const { numPeers } = params;
const rpcQuery = PeerExchangeRPC.createRequest({
numPeers: BigInt(numPeers),
});
const peer = await this.getPeer();
const stream = await this.newStream(peer);
await pipe(
[rpcQuery.encode()],
lp.encode(),
stream,
lp.decode(),
async (source) => await all(source)
);
}
private handler(streamData: IncomingStreamData): void {
const { stream } = streamData;
pipe(stream, lp.decode(), async (source) => {
for await (const bytes of source) {
const decoded = PeerExchangeRPC.decode(bytes).response;
if (!decoded) {
throw new Error("Failed to decode response");
}
const enrs = await Promise.all(
decoded.peerInfos.map(
(peerInfo) => peerInfo.enr && ENR.decode(peerInfo.enr)
)
);
const peerInfos = enrs.map((enr) => {
return {
ENR: enr,
};
});
if (!this.callback) throw new Error("Callback not set");
await this.callback({ peerInfos });
}
}).catch((err) => log("Failed to handle peer exchange request", err));
}
private async getPeer(peerId?: PeerId): Promise<Peer> {
const res = await selectPeerForProtocol(
this.components.peerStore,
[PeerExchangeCodec],
peerId
);
if (!res) {
throw new Error(`Failed to select peer for ${PeerExchangeCodec}`);
}
return res.peer;
}
private async newStream(peer: Peer): Promise<Stream> {
const connections = this.components.connectionManager.getConnections(
peer.id
);
const connection = selectConnection(connections);
if (!connection) {
throw new Error("Failed to get a connection to the peer");
}
return connection.newStream(PeerExchangeCodec);
}
async peers(): Promise<Peer[]> {
return getPeersForProtocol(this.components.peerStore, [PeerExchangeCodec]);
}
get peerStore(): PeerStore {
return this.components.peerStore;
}
}
export function wakuPeerExchange(
init: Partial<ProtocolOptions> = {}
): (components: PeerExchangeComponents) => WakuPeerExchange {
return (components: PeerExchangeComponents) =>
new WakuPeerExchange(components, init);
}

View File

@ -0,0 +1,112 @@
import {
PeerDiscovery,
PeerDiscoveryEvents,
symbol,
} from "@libp2p/interface-peer-discovery";
import { PeerInfo } from "@libp2p/interface-peer-info";
import { PeerProtocolsChangeData } from "@libp2p/interface-peer-store";
import { EventEmitter } from "@libp2p/interfaces/events";
import { PeerExchangeComponents } from "@waku/interfaces";
import debug from "debug";
import { PeerExchangeCodec } from "./waku_peer_exchange";
const log = debug("waku:peer-exchange-discovery");
interface Options {
/**
* Tag a bootstrap peer with this name before "discovering" it (default: 'bootstrap')
*/
tagName?: string;
/**
* The bootstrap peer tag will have this value (default: 50)
*/
tagValue?: number;
/**
* Cause the bootstrap peer tag to be removed after this number of ms (default: 2 minutes)
*/
tagTTL?: number;
}
const DEFAULT_BOOTSTRAP_TAG_NAME = "peer-exchange";
const DEFAULT_BOOTSTRAP_TAG_VALUE = 50;
const DEFAULT_BOOTSTRAP_TAG_TTL = 120000;
export class PeerExchangeDiscovery
extends EventEmitter<PeerDiscoveryEvents>
implements PeerDiscovery
{
private readonly components: PeerExchangeComponents;
private readonly options: Options;
private isStarted: boolean;
private readonly eventHandler = async (
event: CustomEvent<PeerProtocolsChangeData>
): Promise<void> => {
const { protocols } = event.detail;
if (!protocols.includes(PeerExchangeCodec)) return;
const { peerId } = event.detail;
const peer = await this.components.peerStore.get(peerId);
const peerInfo = {
id: peerId,
multiaddrs: peer.addresses.map((address) => address.multiaddr),
protocols: [],
};
await this.components.peerStore.tagPeer(
peerId,
DEFAULT_BOOTSTRAP_TAG_NAME,
{
value: this.options.tagValue ?? DEFAULT_BOOTSTRAP_TAG_VALUE,
ttl: this.options.tagTTL ?? DEFAULT_BOOTSTRAP_TAG_TTL,
}
);
this.dispatchEvent(new CustomEvent<PeerInfo>("peer", { detail: peerInfo }));
};
constructor(components: PeerExchangeComponents, options: Options = {}) {
super();
this.components = components;
this.options = options;
this.isStarted = false;
}
/**
* Start emitting events
*/
start(): void {
if (this.isStarted) {
return;
}
log("Starting peer exchange node discovery, discovering peers");
this.components.peerStore.addEventListener(
"change:protocols",
this.eventHandler
);
}
/**
* Remove event listener
*/
stop(): void {
if (!this.isStarted) return;
log("Stopping peer exchange node discovery");
this.isStarted = false;
this.components.peerStore.removeEventListener(
"change:protocols",
this.eventHandler
);
}
get [symbol](): true {
return true;
}
get [Symbol.toStringTag](): string {
return "@waku/peer-exchange";
}
}

View File

@ -0,0 +1,8 @@
{
"extends": "./tsconfig",
"compilerOptions": {
"module": "esnext",
"noEmit": true
},
"exclude": []
}

View File

@ -0,0 +1,54 @@
{
"compilerOptions": {
"incremental": true,
"target": "es2020",
"outDir": "dist",
"rootDir": "src",
"moduleResolution": "node",
"module": "es2020",
"declaration": true,
"sourceMap": true,
"esModuleInterop": true /* Enables emit interoperability between CommonJS and ES Modules via creation of namespace objects for all imports. Implies 'allowSyntheticDefaultImports'. */,
"resolveJsonModule": true /* Include modules imported with .json extension. */,
"tsBuildInfoFile": "dist/.tsbuildinfo",
"strict": true /* Enable all strict type-checking options. */,
/* Strict Type-Checking Options */
"noImplicitAny": true /* Raise error on expressions and declarations with an implied 'any' type. */,
"strictNullChecks": true /* Enable strict null checks. */,
"strictFunctionTypes": true /* Enable strict checking of function types. */,
"strictPropertyInitialization": true /* Enable strict checking of property initialization in classes. */,
"noImplicitThis": true /* Raise error on 'this' expressions with an implied 'any' type. */,
"alwaysStrict": true /* Parse in strict mode and emit "use strict" for each source file. */,
/* Additional Checks */
"noUnusedLocals": true /* Report errors on unused locals. */,
"noUnusedParameters": true /* Report errors on unused parameters. */,
"noImplicitReturns": true /* Report error when not all code paths in function return a value. */,
"noFallthroughCasesInSwitch": true /* Report errors for fallthrough cases in switch statement. */,
"forceConsistentCasingInFileNames": true,
/* Debugging Options */
"traceResolution": false /* Report module resolution log messages. */,
"listEmittedFiles": false /* Print names of generated files part of the compilation. */,
"listFiles": false /* Print names of files part of the compilation. */,
"pretty": true /* Stylize errors and messages using color and context. */,
// Due to broken types in indirect dependencies
"skipLibCheck": true,
/* Experimental Options */
// "experimentalDecorators": true /* Enables experimental support for ES7 decorators. */,
// "emitDecoratorMetadata": true /* Enables experimental support for emitting type metadata for decorators. */,
"lib": ["es2020", "dom"],
"types": ["node", "mocha"],
"typeRoots": ["node_modules/@types", "src/types"]
},
"include": ["src", ".eslintrc.js"],
"exclude": ["src/**/*.spec.ts", "src/test_utils", "dist", "bundle"],
"compileOnSave": false,
"ts-node": {
"files": true
}
}

View File

@ -9,3 +9,5 @@ export * as proto_lightpush from "./lib/light_push.js";
export { PushResponse } from "./lib/light_push.js";
export * as proto_store from "./lib/store.js";
export * as proto_peer_exchange from "./lib/peer_exchange.js";

View File

@ -0,0 +1,18 @@
syntax = "proto3";
message PeerInfo {
optional bytes enr = 1;
}
message PeerExchangeQuery {
optional uint64 numPeers = 1; // number of peers requested
}
message PeerExchangeResponse {
repeated PeerInfo peerInfos = 1;
}
message PeerExchangeRPC {
optional PeerExchangeQuery query = 1;
optional PeerExchangeResponse response = 2;
}

View File

@ -0,0 +1,271 @@
/* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message } from "protons-runtime";
import type { Uint8ArrayList } from "uint8arraylist";
import type { Codec } from "protons-runtime";
export interface PeerInfo {
enr?: Uint8Array;
}
export namespace PeerInfo {
let _codec: Codec<PeerInfo>;
export const codec = (): Codec<PeerInfo> => {
if (_codec == null) {
_codec = message<PeerInfo>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.enr != null) {
writer.uint32(10);
writer.bytes(obj.enr);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.enr = reader.bytes();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: PeerInfo): Uint8Array => {
return encodeMessage(obj, PeerInfo.codec());
};
export const decode = (buf: Uint8Array | Uint8ArrayList): PeerInfo => {
return decodeMessage(buf, PeerInfo.codec());
};
}
export interface PeerExchangeQuery {
numPeers?: bigint;
}
export namespace PeerExchangeQuery {
let _codec: Codec<PeerExchangeQuery>;
export const codec = (): Codec<PeerExchangeQuery> => {
if (_codec == null) {
_codec = message<PeerExchangeQuery>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.numPeers != null) {
writer.uint32(8);
writer.uint64(obj.numPeers);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.numPeers = reader.uint64();
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: PeerExchangeQuery): Uint8Array => {
return encodeMessage(obj, PeerExchangeQuery.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): PeerExchangeQuery => {
return decodeMessage(buf, PeerExchangeQuery.codec());
};
}
export interface PeerExchangeResponse {
peerInfos: PeerInfo[];
}
export namespace PeerExchangeResponse {
let _codec: Codec<PeerExchangeResponse>;
export const codec = (): Codec<PeerExchangeResponse> => {
if (_codec == null) {
_codec = message<PeerExchangeResponse>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.peerInfos != null) {
for (const value of obj.peerInfos) {
writer.uint32(10);
PeerInfo.codec().encode(value, writer);
}
} else {
throw new Error(
'Protocol error: required field "peerInfos" was not found in object'
);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {
peerInfos: [],
};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.peerInfos.push(
PeerInfo.codec().decode(reader, reader.uint32())
);
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: PeerExchangeResponse): Uint8Array => {
return encodeMessage(obj, PeerExchangeResponse.codec());
};
export const decode = (
buf: Uint8Array | Uint8ArrayList
): PeerExchangeResponse => {
return decodeMessage(buf, PeerExchangeResponse.codec());
};
}
export interface PeerExchangeRPC {
query?: PeerExchangeQuery;
response?: PeerExchangeResponse;
}
export namespace PeerExchangeRPC {
let _codec: Codec<PeerExchangeRPC>;
export const codec = (): Codec<PeerExchangeRPC> => {
if (_codec == null) {
_codec = message<PeerExchangeRPC>(
(obj, writer, opts = {}) => {
if (opts.lengthDelimited !== false) {
writer.fork();
}
if (obj.query != null) {
writer.uint32(10);
PeerExchangeQuery.codec().encode(obj.query, writer);
}
if (obj.response != null) {
writer.uint32(18);
PeerExchangeResponse.codec().encode(obj.response, writer);
}
if (opts.lengthDelimited !== false) {
writer.ldelim();
}
},
(reader, length) => {
const obj: any = {};
const end = length == null ? reader.len : reader.pos + length;
while (reader.pos < end) {
const tag = reader.uint32();
switch (tag >>> 3) {
case 1:
obj.query = PeerExchangeQuery.codec().decode(
reader,
reader.uint32()
);
break;
case 2:
obj.response = PeerExchangeResponse.codec().decode(
reader,
reader.uint32()
);
break;
default:
reader.skipType(tag & 7);
break;
}
}
return obj;
}
);
}
return _codec;
};
export const encode = (obj: PeerExchangeRPC): Uint8Array => {
return encodeMessage(obj, PeerExchangeRPC.codec());
};
export const decode = (buf: Uint8Array | Uint8ArrayList): PeerExchangeRPC => {
return decodeMessage(buf, PeerExchangeRPC.codec());
};
}

View File

@ -47,6 +47,8 @@ export interface Args {
lightpush?: boolean;
filter?: boolean;
store?: boolean;
peerExchange?: boolean;
discv5Discovery?: boolean;
storeMessageDbUrl?: string;
topics?: string;
rpcPrivate?: boolean;
@ -54,6 +56,7 @@ export interface Args {
tcpPort?: number;
rpcPort?: number;
websocketPort?: number;
discv5BootstrapNode?: string;
}
export enum LogLevel {
@ -167,10 +170,12 @@ export class Nwaku {
process.env.WAKUNODE2_STORE_MESSAGE_DB_URL = "";
const argsArray = argsToArray(mergedArgs);
if (WAKU_SERVICE_NODE_PARAMS) {
argsArray.push(WAKU_SERVICE_NODE_PARAMS);
}
log(`nwaku args: ${argsArray.join(" ")}`);
this.process = spawn(WAKU_SERVICE_NODE_BIN, argsArray, {
cwd: WAKU_SERVICE_NODE_DIR,
stdio: [

View File

@ -46,7 +46,7 @@ describe("Waku Message Ephemeral field", () => {
});
beforeEach(async function () {
this.timeout(15000);
this.timeout(50_000);
nwaku = new Nwaku(makeLogFileName(this));
await nwaku.start({ filter: true, lightpush: true, store: true });
waku = await createLightNode({
@ -55,11 +55,16 @@ describe("Waku Message Ephemeral field", () => {
});
await waku.start();
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku);
await waitForRemotePeer(waku, [
Protocols.Filter,
Protocols.LightPush,
Protocols.Store,
]);
});
it("Ephemeral messages are not stored", async function () {
this.timeout(15_000);
this.timeout(50_000);
const asymText =
"This message is encrypted for me using asymmetric encryption";

View File

@ -0,0 +1,60 @@
import { bootstrap } from "@libp2p/bootstrap";
import { waitForRemotePeer } from "@waku/core";
import {
Fleet,
getPredefinedBootstrapNodes,
} from "@waku/core/lib/predefined_bootstrap_nodes";
import { createLightNode } from "@waku/create";
import type { PeerExchangeResponse, WakuFull } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { expect } from "chai";
describe("Peer Exchange: Node", () => {
let waku: WakuFull;
afterEach(async function () {
!!waku && waku.stop().catch((e) => console.log("Waku failed to stop", e));
});
it("Test Fleet: Queries successfully [Live Data]", async function () {
this.timeout(150_000);
// skipping in CI as this test demonstrates Peer Exchange working with the test fleet
// but not with locally run nwaku nodes
if (process.env.ci) {
this.skip();
}
const waku = await createLightNode({
libp2p: {
peerDiscovery: [
bootstrap({ list: getPredefinedBootstrapNodes(Fleet.Test) }),
],
},
});
await waku.start();
await waitForRemotePeer(waku, [Protocols.PeerExchange]);
let receivedCallback = false;
const numPeersToRequest = 3;
const callback = (response: PeerExchangeResponse): void => {
receivedCallback = true;
expect(response.peerInfos.length).to.be.greaterThan(0);
expect(response.peerInfos.length).to.be.lessThanOrEqual(
numPeersToRequest
);
expect(response.peerInfos[0].ENR).to.not.be.null;
};
await waku.peerExchange.query(
{
numPeers: numPeersToRequest,
},
callback
);
expect(receivedCallback).to.be.true;
});
});

View File

@ -342,7 +342,7 @@ describe("Waku Relay [node only]", () => {
await waku.start();
nwaku = new Nwaku(this.test?.ctx?.currentTest?.title + "");
await nwaku.start();
await nwaku.start({ relay: true });
await waku.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku, [Protocols.Relay]);

View File

@ -218,7 +218,11 @@ describe("Wait for remote peer", function () {
});
await waku2.start();
await waku2.dial(multiAddrWithId);
await waitForRemotePeer(waku2);
await waitForRemotePeer(waku2, [
Protocols.Filter,
Protocols.Store,
Protocols.LightPush,
]);
const filterPeers = (await waku2.filter.peers()).map((peer) =>
peer.id.toString()

View File

@ -51,7 +51,11 @@ describe("Waku Dial [node only]", function () {
});
await waku.start();
await waku.dial(multiAddrWithId);
await waitForRemotePeer(waku);
await waitForRemotePeer(waku, [
Protocols.Store,
Protocols.Filter,
Protocols.LightPush,
]);
const nimPeerId = await nwaku.getPeerId();
expect(await waku.libp2p.peerStore.has(nimPeerId)).to.be.true;