431 lines
11 KiB
TypeScript
Raw Normal View History

2021-05-10 15:53:05 +10:00
/**
* @hidden
* @module
*/
2022-02-04 14:12:00 +11:00
import { ChildProcess, spawn } from "child_process";
2021-03-10 17:39:53 +11:00
2022-02-04 14:12:00 +11:00
import appRoot from "app-root-path";
import debug from "debug";
import { Multiaddr, multiaddr } from "multiaddr";
import PeerId from "peer-id";
import portfinder from "portfinder";
2021-03-10 17:39:53 +11:00
2022-02-14 10:50:02 +11:00
import { hexToBytes } from "../lib/utils";
2022-02-04 14:12:00 +11:00
import { DefaultPubSubTopic } from "../lib/waku";
import { WakuMessage } from "../lib/waku_message";
import * as proto from "../proto/waku/v2/message";
2022-02-04 14:12:00 +11:00
import { existsAsync, mkdirAsync, openAsync } from "./async_fs";
import waitForLine from "./log_file";
2021-03-10 17:39:53 +11:00
2022-04-01 12:19:51 +11:00
const dbg = debug("waku:nwaku");
2021-04-06 11:00:40 +10:00
2022-02-04 14:12:00 +11:00
const NIM_WAKU_DIR = appRoot + "/nim-waku";
const NIM_WAKU_BIN = NIM_WAKU_DIR + "/build/wakunode2";
2021-03-10 17:39:53 +11:00
2022-02-04 14:12:00 +11:00
const LOG_DIR = "./log";
2021-03-11 10:54:35 +11:00
export interface Args {
staticnode?: string;
2022-02-04 14:12:00 +11:00
nat?: "none";
2021-03-11 10:54:35 +11:00
listenAddress?: string;
relay?: boolean;
rpc?: boolean;
rpcAdmin?: boolean;
nodekey?: string;
portsShift?: number;
2021-04-09 11:23:00 +10:00
logLevel?: LogLevel;
2021-05-06 10:37:12 +10:00
persistMessages?: boolean;
2021-05-19 11:00:43 +10:00
lightpush?: boolean;
topics?: string;
rpcPrivate?: boolean;
websocketSupport?: boolean;
2022-02-01 12:54:54 +11:00
tcpPort?: number;
rpcPort?: number;
websocketPort?: number;
2021-04-09 11:23:00 +10:00
}
export enum LogLevel {
2022-02-04 14:12:00 +11:00
Error = "error",
Info = "info",
Warn = "warn",
Debug = "debug",
Trace = "trace",
Notice = "notice",
Fatal = "fatal",
2021-03-11 10:54:35 +11:00
}
export interface KeyPair {
privateKey: string;
publicKey: string;
}
export interface WakuRelayMessage {
payload: string; // Hex encoded data string without `0x` prefix.
contentTopic?: string;
timestamp?: number; // Unix epoch time in nanoseconds as a 64-bits integer value.
}
2022-04-01 12:19:51 +11:00
export class Nwaku {
2021-03-10 17:39:53 +11:00
private process?: ChildProcess;
2021-03-26 13:07:47 +11:00
private pid?: number;
private peerId?: PeerId;
private multiaddrWithId?: Multiaddr;
2022-02-01 12:54:54 +11:00
private readonly logPath: string;
private rpcPort?: number;
2021-03-10 17:39:53 +11:00
/**
* Convert a [[WakuMessage]] to a [[WakuRelayMessage]]. The latter is used
* by the nwaku JSON-RPC API.
*/
static toWakuRelayMessage(message: WakuMessage): WakuRelayMessage {
if (!message.payload) {
throw "Attempting to convert empty message";
}
let timestamp;
if (message.proto.timestamp) {
timestamp = message.proto.timestamp.toNumber();
}
return {
payload: bytesToHex(message.payload),
contentTopic: message.contentTopic,
timestamp,
};
}
2021-03-25 15:49:07 +11:00
constructor(logName: string) {
2022-04-01 12:19:51 +11:00
this.logPath = `${LOG_DIR}/nwaku_${logName}.log`;
}
async start(args?: Args): Promise<void> {
try {
await existsAsync(LOG_DIR);
} catch (e) {
try {
await mkdirAsync(LOG_DIR);
} catch (e) {
// Looks like 2 tests tried to create the director at the same time,
// it can be ignored
}
}
2022-02-04 14:12:00 +11:00
const logFile = await openAsync(this.logPath, "w");
2021-03-11 10:54:35 +11:00
const mergedArgs = defaultArgs();
2022-02-01 12:54:54 +11:00
const ports: number[] = await new Promise((resolve, reject) => {
portfinder.getPorts(3, {}, (err, ports) => {
if (err) reject(err);
resolve(ports);
});
});
this.rpcPort = ports[0];
// Object.assign overrides the properties with the source (if there are conflicts)
2021-04-09 11:23:00 +10:00
Object.assign(
mergedArgs,
2022-02-01 12:54:54 +11:00
{
tcpPort: ports[1],
rpcPort: this.rpcPort,
websocketPort: ports[2],
logLevel: LogLevel.Trace,
},
2021-04-09 11:23:00 +10:00
args
);
const argsArray = argsToArray(mergedArgs);
2022-04-01 12:19:51 +11:00
dbg(`nwaku args: ${argsArray.join(" ")}`);
this.process = spawn(NIM_WAKU_BIN, argsArray, {
cwd: NIM_WAKU_DIR,
2021-03-11 10:54:35 +11:00
stdio: [
2022-02-04 14:12:00 +11:00
"ignore", // stdin
2021-03-11 10:54:35 +11:00
logFile, // stdout
logFile, // stderr
2021-03-10 17:39:53 +11:00
],
2021-03-11 10:54:35 +11:00
});
2021-03-26 13:07:47 +11:00
this.pid = this.process.pid;
2021-04-06 11:00:40 +10:00
dbg(
2022-04-01 12:19:51 +11:00
`nwaku ${this.process.pid} started at ${new Date().toLocaleTimeString()}`
2021-03-26 13:07:47 +11:00
);
2021-03-10 17:39:53 +11:00
2022-02-04 14:12:00 +11:00
this.process.on("exit", (signal) => {
2021-04-06 11:00:40 +10:00
dbg(
2022-04-01 12:19:51 +11:00
`nwaku ${
2021-03-26 13:07:47 +11:00
this.process ? this.process.pid : this.pid
} process exited with ${signal} at ${new Date().toLocaleTimeString()}`
);
2021-03-25 16:29:35 +11:00
});
2022-02-04 14:12:00 +11:00
this.process.on("error", (err) => {
2021-03-26 13:07:47 +11:00
console.log(
2022-04-01 12:19:51 +11:00
`nwaku ${
2021-03-26 13:07:47 +11:00
this.process ? this.process.pid : this.pid
} process encountered an error: ${err} at ${new Date().toLocaleTimeString()}`
);
2021-03-25 16:29:35 +11:00
});
2022-04-01 12:19:51 +11:00
dbg("Waiting to see 'Node setup complete' in nwaku logs");
await this.waitForLog("Node setup complete", 15000);
2022-04-01 12:19:51 +11:00
dbg("nwaku node has been started");
}
public stop(): void {
const pid = this.process ? this.process.pid : this.pid;
2022-04-01 12:19:51 +11:00
dbg(`nwaku ${pid} getting SIGINT at ${new Date().toLocaleTimeString()}`);
if (!this.process) throw "nwaku process not set";
2022-02-04 14:12:00 +11:00
const res = this.process.kill("SIGINT");
2022-04-01 12:19:51 +11:00
dbg(`nwaku ${pid} interrupted:`, res);
this.process = undefined;
}
async waitForLog(msg: string, timeout: number): Promise<void> {
return waitForLine(this.logPath, msg, timeout);
2021-03-10 17:39:53 +11:00
}
2022-04-01 12:19:51 +11:00
/** Calls nwaku JSON-RPC API `get_waku_v2_admin_v1_peers` to check
2021-03-10 17:39:53 +11:00
* for known peers
2022-04-01 12:19:51 +11:00
* @throws if nwaku isn't started.
2021-03-10 17:39:53 +11:00
*/
async peers(): Promise<string[]> {
2021-03-10 17:39:53 +11:00
this.checkProcess();
2022-02-04 14:12:00 +11:00
return this.rpcCall<string[]>("get_waku_v2_admin_v1_peers", []);
2021-03-10 17:39:53 +11:00
}
async info(): Promise<RpcInfoResponse> {
2021-03-10 17:39:53 +11:00
this.checkProcess();
2022-02-04 14:12:00 +11:00
return this.rpcCall<RpcInfoResponse>("get_waku_v2_debug_v1_info", []);
2021-03-10 17:39:53 +11:00
}
async sendMessage(
message: WakuRelayMessage,
pubSubTopic?: string
): Promise<boolean> {
this.checkProcess();
2022-02-04 14:12:00 +11:00
return this.rpcCall<boolean>("post_waku_v2_relay_v1_message", [
pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
message,
]);
}
async messages(): Promise<WakuMessage[]> {
this.checkProcess();
2021-07-07 11:23:56 +10:00
const isDefined = (msg: WakuMessage | undefined): msg is WakuMessage => {
return !!msg;
};
const protoMsgs = await this.rpcCall<proto.WakuMessage[]>(
2022-02-04 14:12:00 +11:00
"get_waku_v2_relay_v1_messages",
[DefaultPubSubTopic]
2021-07-07 11:23:56 +10:00
);
const msgs = await Promise.all(
protoMsgs.map(async (protoMsg) => await WakuMessage.decodeProto(protoMsg))
);
return msgs.filter(isDefined);
}
async getAsymmetricKeyPair(): Promise<KeyPair> {
this.checkProcess();
const { seckey, pubkey } = await this.rpcCall<{
seckey: string;
pubkey: string;
2022-02-04 14:12:00 +11:00
}>("get_waku_v2_private_v1_asymmetric_keypair", []);
return { privateKey: seckey, publicKey: pubkey };
}
async postAsymmetricMessage(
message: WakuRelayMessage,
publicKey: Uint8Array,
pubSubTopic?: string
): Promise<boolean> {
this.checkProcess();
if (!message.payload) {
2022-02-04 14:12:00 +11:00
throw "Attempting to send empty message";
}
2022-02-04 14:12:00 +11:00
return this.rpcCall<boolean>("post_waku_v2_private_v1_asymmetric_message", [
pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
message,
2022-02-14 10:50:02 +11:00
"0x" + bytesToHex(publicKey),
]);
}
async getAsymmetricMessages(
privateKey: Uint8Array,
pubSubTopic?: string
): Promise<WakuRelayMessage[]> {
this.checkProcess();
return await this.rpcCall<WakuRelayMessage[]>(
2022-02-04 14:12:00 +11:00
"get_waku_v2_private_v1_asymmetric_messages",
[
pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
2022-02-14 10:50:02 +11:00
"0x" + bytesToHex(privateKey),
]
);
}
2022-02-14 10:50:02 +11:00
async getSymmetricKey(): Promise<Uint8Array> {
this.checkProcess();
return this.rpcCall<string>(
2022-02-04 14:12:00 +11:00
"get_waku_v2_private_v1_symmetric_key",
[]
2022-02-14 10:50:02 +11:00
).then(hexToBytes);
}
async postSymmetricMessage(
message: WakuRelayMessage,
symKey: Uint8Array,
pubSubTopic?: string
): Promise<boolean> {
this.checkProcess();
if (!message.payload) {
2022-02-04 14:12:00 +11:00
throw "Attempting to send empty message";
}
2022-02-04 14:12:00 +11:00
return this.rpcCall<boolean>("post_waku_v2_private_v1_symmetric_message", [
pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
message,
2022-02-14 10:50:02 +11:00
"0x" + bytesToHex(symKey),
]);
}
async getSymmetricMessages(
symKey: Uint8Array,
pubSubTopic?: string
): Promise<WakuRelayMessage[]> {
this.checkProcess();
return await this.rpcCall<WakuRelayMessage[]>(
2022-02-04 14:12:00 +11:00
"get_waku_v2_private_v1_symmetric_messages",
2022-02-14 10:50:02 +11:00
[
pubSubTopic ? pubSubTopic : DefaultPubSubTopic,
"0x" + bytesToHex(symKey),
]
);
}
async getPeerId(): Promise<PeerId> {
2022-01-19 15:43:45 +11:00
return await this._getPeerId().then((res) => res.peerId);
}
async getMultiaddrWithId(): Promise<Multiaddr> {
2022-01-19 15:43:45 +11:00
return await this._getPeerId().then((res) => res.multiaddrWithId);
}
2022-01-19 15:43:45 +11:00
private async _getPeerId(): Promise<{
peerId: PeerId;
multiaddrWithId: Multiaddr;
}> {
if (this.peerId && this.multiaddrWithId) {
return { peerId: this.peerId, multiaddrWithId: this.multiaddrWithId };
}
const res = await this.info();
this.multiaddrWithId = res.listenAddresses
.map((ma) => multiaddr(ma))
2022-02-04 14:12:00 +11:00
.find((ma) => ma.protoNames().includes("ws"));
2022-04-01 12:19:51 +11:00
if (!this.multiaddrWithId) throw "Nwaku did not return a ws multiaddr";
const peerIdStr = this.multiaddrWithId.getPeerId();
2022-04-01 12:19:51 +11:00
if (!peerIdStr) throw "Nwaku multiaddr does not contain peerId";
this.peerId = PeerId.createFromB58String(peerIdStr);
return { peerId: this.peerId, multiaddrWithId: this.multiaddrWithId };
2021-03-10 17:39:53 +11:00
}
get rpcUrl(): string {
2022-02-01 12:54:54 +11:00
return `http://localhost:${this.rpcPort}/`;
2021-03-10 17:39:53 +11:00
}
private async rpcCall<T>(
2021-04-13 15:22:29 +10:00
method: string,
params: Array<string | number | unknown>
): Promise<T> {
2022-02-14 09:26:22 +11:00
const res = await fetch(this.rpcUrl, {
method: "POST",
body: JSON.stringify({
2022-02-04 14:12:00 +11:00
jsonrpc: "2.0",
2021-03-10 17:39:53 +11:00
id: 1,
method,
params,
2022-02-14 09:26:22 +11:00
}),
headers: new Headers({ "Content-Type": "application/json" }),
});
2021-03-10 17:39:53 +11:00
2022-02-14 09:26:22 +11:00
const json = await res.json();
return json.result;
2021-03-10 17:39:53 +11:00
}
private checkProcess(): void {
2021-03-10 17:39:53 +11:00
if (!this.process) {
2022-04-01 12:19:51 +11:00
throw "Nwaku hasn't started";
2021-03-10 17:39:53 +11:00
}
}
}
2021-03-11 10:54:35 +11:00
export function argsToArray(args: Args): Array<string> {
const array = [];
for (const [key, value] of Object.entries(args)) {
// Change the key from camelCase to kebab-case
const kebabKey = key.replace(/([A-Z])/, (_, capital) => {
2022-02-04 14:12:00 +11:00
return "-" + capital.toLowerCase();
2021-03-11 10:54:35 +11:00
});
const arg = `--${kebabKey}=${value}`;
array.push(arg);
}
return array;
}
export function defaultArgs(): Args {
2021-03-11 10:54:35 +11:00
return {
2022-02-04 14:12:00 +11:00
nat: "none",
listenAddress: "127.0.0.1",
2021-03-11 10:54:35 +11:00
relay: true,
rpc: true,
rpcAdmin: true,
websocketSupport: true,
2021-03-11 10:54:35 +11:00
};
}
2021-03-12 10:44:47 +11:00
export function strToHex(str: string): string {
let hex: string;
try {
hex = unescape(encodeURIComponent(str))
2022-02-04 14:12:00 +11:00
.split("")
.map(function (v) {
2021-03-12 10:44:47 +11:00
return v.charCodeAt(0).toString(16);
})
2022-02-04 14:12:00 +11:00
.join("");
} catch (e) {
hex = str;
2022-02-04 14:12:00 +11:00
console.log("invalid text input: " + str);
}
return hex;
}
2022-02-14 10:50:02 +11:00
export function bytesToHex(buffer: Uint8Array): string {
return Array.prototype.map
2022-02-04 14:12:00 +11:00
.call(buffer, (x) => ("00" + x.toString(16)).slice(-2))
.join("");
}
interface RpcInfoResponse {
2022-01-19 15:43:45 +11:00
// multiaddrs including peer id.
listenAddresses: string[];
}