Fix/whisper communication (#1895)

* fix(@cockpit/utils): make communications tab work again

* fix whisper node start; fix whisper console api registration; fix/refactor whisper web api
This commit is contained in:
Iuri Matias 2019-09-11 14:21:53 -04:00 committed by GitHub
parent e598e3f998
commit e330b338ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 829 additions and 121 deletions

View File

@ -11,7 +11,7 @@ module.exports = {
development: {
connection: {
host: "localhost", // Host of the blockchain node
port: 8546, // Port of the blockchain node
port: 8557, // Port of the blockchain node
type: "ws" // Type of connection (ws or rpc)
}
},

View File

@ -1,11 +1,12 @@
{
"default": {
"enabled": true,
"client": "geth",
"provider": "whisper",
"enabled": true,
"available_providers": ["whisper"],
"connection": {
"host": "localhost",
"port": 8546,
"port": 3777,
"type": "ws"
}
}

View File

@ -0,0 +1,446 @@
import { __ } from 'embark-i18n';
const fs = require('fs-extra');
const async = require('async');
const {spawn, exec} = require('child_process');
const path = require('path');
const constants = require('embark-core/constants');
const GethClient = require('./gethClient.js');
const ParityClient = require('./parityClient.js');
import { IPC } from 'embark-core';
import { compact, dappPath, defaultHost, dockerHostSwap, embarkPath} from 'embark-utils';
const Logger = require('embark-logger');
// time between IPC connection attempts (in ms)
const IPC_CONNECT_INTERVAL = 2000;
/*eslint complexity: ["error", 50]*/
var Blockchain = function(userConfig, clientClass) {
this.userConfig = userConfig;
this.env = userConfig.env || 'development';
this.isDev = userConfig.isDev;
this.onReadyCallback = userConfig.onReadyCallback || (() => {});
this.onExitCallback = userConfig.onExitCallback;
this.logger = userConfig.logger || new Logger({logLevel: 'debug', context: constants.contexts.blockchain}); // do not pass in events as we don't want any log events emitted
this.events = userConfig.events;
this.isStandalone = userConfig.isStandalone;
this.certOptions = userConfig.certOptions;
let defaultWsApi = clientClass.DEFAULTS.WS_API;
if (this.isDev) defaultWsApi = clientClass.DEFAULTS.DEV_WS_API;
this.config = {
silent: this.userConfig.silent,
client: this.userConfig.client,
ethereumClientBin: this.userConfig.ethereumClientBin || this.userConfig.client,
networkType: this.userConfig.networkType || clientClass.DEFAULTS.NETWORK_TYPE,
networkId: this.userConfig.networkId || clientClass.DEFAULTS.NETWORK_ID,
genesisBlock: this.userConfig.genesisBlock || false,
datadir: this.userConfig.datadir,
mineWhenNeeded: this.userConfig.mineWhenNeeded || false,
rpcHost: dockerHostSwap(this.userConfig.rpcHost) || defaultHost,
rpcPort: this.userConfig.rpcPort || 8545,
rpcCorsDomain: this.userConfig.rpcCorsDomain || false,
rpcApi: this.userConfig.rpcApi || clientClass.DEFAULTS.RPC_API,
port: this.userConfig.port || 30303,
nodiscover: this.userConfig.nodiscover || false,
mine: this.userConfig.mine || false,
account: {},
whisper: (this.userConfig.whisper !== false),
maxpeers: ((this.userConfig.maxpeers === 0) ? 0 : (this.userConfig.maxpeers || 25)),
bootnodes: this.userConfig.bootnodes || "",
wsRPC: (this.userConfig.wsRPC !== false),
wsHost: dockerHostSwap(this.userConfig.wsHost) || defaultHost,
wsPort: this.userConfig.wsPort || 8546,
wsOrigins: this.userConfig.wsOrigins || false,
wsApi: this.userConfig.wsApi || defaultWsApi,
vmdebug: this.userConfig.vmdebug || false,
targetGasLimit: this.userConfig.targetGasLimit || false,
syncMode: this.userConfig.syncMode || this.userConfig.syncmode,
verbosity: this.userConfig.verbosity
};
this.devFunds = null;
if (this.userConfig.accounts) {
const nodeAccounts = this.userConfig.accounts.find(account => account.nodeAccounts);
if (nodeAccounts) {
this.config.account = {
numAccounts: nodeAccounts.numAddresses || 1,
password: nodeAccounts.password,
balance: nodeAccounts.balance
};
}
}
if (this.userConfig === {} || this.userConfig.default || JSON.stringify(this.userConfig) === '{"client":"geth"}') {
if (this.env === 'development') {
this.isDev = true;
} else {
this.config.genesisBlock = embarkPath("templates/boilerplate/config/privatenet/genesis.json");
}
this.config.datadir = dappPath(".embark/development/datadir");
this.config.wsOrigins = this.config.wsOrigins || "http://localhost:8000";
this.config.rpcCorsDomain = this.config.rpcCorsDomain || "http://localhost:8000";
this.config.targetGasLimit = 8000000;
}
this.config.account.devPassword = path.join(this.config.datadir, "devPassword");
const spaceMessage = 'The path for %s in blockchain config contains spaces, please remove them';
if (this.config.datadir && this.config.datadir.indexOf(' ') > 0) {
this.logger.error(__(spaceMessage, 'datadir'));
process.exit(1);
}
if (this.config.account.password && this.config.account.password.indexOf(' ') > 0) {
this.logger.error(__(spaceMessage, 'accounts.password'));
process.exit(1);
}
if (this.config.genesisBlock && this.config.genesisBlock.indexOf(' ') > 0) {
this.logger.error(__(spaceMessage, 'genesisBlock'));
process.exit(1);
}
this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev});
this.initStandaloneProcess();
};
/**
* Polls for a connection to an IPC server (generally this is set up
* in the Embark process). Once connected, any logs logged to the
* Logger will be shipped off to the IPC server. In the case of `embark
* run`, the BlockchainListener module is listening for these logs.
*
* @returns {void}
*/
Blockchain.prototype.initStandaloneProcess = function () {
if (this.isStandalone) {
let logQueue = [];
// on every log logged in logger (say that 3x fast), send the log
// to the IPC serve listening (only if we're connected of course)
this.logger.events.on('log', (logLevel, message) => {
if (this.ipc.connected) {
this.ipc.request('blockchain:log', {logLevel, message});
} else {
logQueue.push({logLevel, message});
}
});
this.ipc = new IPC({ipcRole: 'client'});
// Wait for an IPC server to start (ie `embark run`) by polling `.connect()`.
// Do not kill this interval as the IPC server may restart (ie restart
// `embark run` without restarting `embark blockchain`)
setInterval(() => {
if (!this.ipc.connected) {
this.ipc.connect(() => {
if (this.ipc.connected) {
logQueue.forEach(message => { this.ipc.request('blockchain:log', message); });
logQueue = [];
this.ipc.client.on('process:blockchain:stop', () => {
this.kill();
process.exit(0);
});
}
});
}
}, IPC_CONNECT_INTERVAL);
}
};
Blockchain.prototype.runCommand = function (cmd, options, callback) {
this.logger.info(__("running: %s", cmd.underline).green);
if (this.config.silent) {
options.silent = true;
}
return exec(cmd, options, callback);
};
Blockchain.prototype.run = function () {
var self = this;
this.logger.info("===============================================================================".magenta);
this.logger.info("===============================================================================".magenta);
this.logger.info(__("Embark Blockchain using %s", self.client.prettyName.underline).magenta);
this.logger.info("===============================================================================".magenta);
this.logger.info("===============================================================================".magenta);
if (self.client.name === constants.blockchain.clients.geth) this.checkPathLength();
let address = '';
async.waterfall([
function checkInstallation(next) {
self.isClientInstalled((err) => {
if (err) {
return next({message: err});
}
next();
});
},
function init(next) {
if (self.isDev) {
return self.initDevChain((err) => {
next(err);
});
}
return self.initChainAndGetAddress((err, addr) => {
address = addr;
next(err);
});
},
function getMainCommand(next) {
self.client.mainCommand(address, function (cmd, args) {
next(null, cmd, args);
}, true);
}
], function(err, cmd, args) {
if (err) {
self.logger.error(err.message);
return;
}
args = compact(args);
let full_cmd = cmd + " " + args.join(' ');
self.logger.info(__(">>>>>>>>>>>>>>>>> running: %s", full_cmd.underline).green);
self.child = spawn(cmd, args, {cwd: process.cwd()});
self.child.on('error', (err) => {
err = err.toString();
self.logger.error('Blockchain error: ', err);
if (self.env === 'development' && err.indexOf('Failed to unlock') > 0) {
self.logger.error('\n' + __('Development blockchain has changed to use the --dev option.').yellow);
self.logger.error(__('You can reset your workspace to fix the problem with').yellow + ' embark reset'.cyan);
self.logger.error(__('Otherwise, you can change your data directory in blockchain.json (datadir)').yellow);
}
});
// TOCHECK I don't understand why stderr and stdout are reverted.
// This happens with Geth and Parity, so it does not seems a client problem
self.child.stdout.on('data', (data) => {
self.logger.info(`${self.client.name} error: ${data}`);
});
self.child.stderr.on('data', async (data) => {
data = data.toString();
if (!self.readyCalled && self.client.isReady(data)) {
self.readyCalled = true;
self.readyCallback();
}
self.logger.info(`${self.client.name}: ${data}`);
});
self.child.on('exit', (code) => {
let strCode;
if (code) {
strCode = 'with error code ' + code;
} else {
strCode = 'with no error code (manually killed?)';
}
self.logger.error(self.client.name + ' exited ' + strCode);
if (self.onExitCallback) {
self.onExitCallback();
}
});
self.child.on('uncaughtException', (err) => {
self.logger.error('Uncaught ' + self.client.name + ' exception', err);
if (self.onExitCallback) {
self.onExitCallback();
}
});
});
};
Blockchain.prototype.readyCallback = function () {
if (this.onReadyCallback) {
this.onReadyCallback();
}
if (this.config.mineWhenNeeded && !this.isDev) {
this.miner = this.client.getMiner();
}
};
Blockchain.prototype.kill = function () {
if (this.child) {
this.child.kill();
}
};
Blockchain.prototype.checkPathLength = function () {
let _dappPath = dappPath('');
if (_dappPath.length > 66) {
// this.logger.error is captured and sent to the console output regardless of silent setting
this.logger.error("===============================================================================".yellow);
this.logger.error("===========> ".yellow + __('WARNING! ÐApp path length is too long: ').yellow + _dappPath.yellow);
this.logger.error("===========> ".yellow + __('This is known to cause issues with starting geth, please consider reducing your ÐApp path\'s length to 66 characters or less.').yellow);
this.logger.error("===============================================================================".yellow);
}
};
Blockchain.prototype.isClientInstalled = function (callback) {
let versionCmd = this.client.determineVersionCommand();
this.runCommand(versionCmd, {}, (err, stdout, stderr) => {
if (err || !stdout || stderr.indexOf("not found") >= 0 || stdout.indexOf("not found") >= 0) {
return callback(__('Ethereum client bin not found:') + ' ' + this.client.getBinaryPath());
}
const parsedVersion = this.client.parseVersion(stdout);
const supported = this.client.isSupportedVersion(parsedVersion);
if (supported === undefined) {
this.logger.error((__('WARNING! Ethereum client version could not be determined or compared with version range') + ' ' + this.client.versSupported + __(', for best results please use a supported version')).yellow);
} else if (!supported) {
this.logger.error((__('WARNING! Ethereum client version unsupported, for best results please use a version in range') + ' ' + this.client.versSupported).yellow);
}
callback();
});
};
Blockchain.prototype.initDevChain = function(callback) {
const self = this;
const ACCOUNTS_ALREADY_PRESENT = 'accounts_already_present';
// Init the dev chain
self.client.initDevChain(self.config.datadir, (err) => {
if (err) {
return callback(err);
}
const accountsToCreate = self.config.account && self.config.account.numAccounts;
if (!accountsToCreate) return callback();
// Create other accounts
async.waterfall([
function listAccounts(next) {
self.runCommand(self.client.listAccountsCommand(), {}, (err, stdout, _stderr) => {
if (err || stdout === undefined || stdout.indexOf("Fatal") >= 0) {
console.log(__("no accounts found").green);
return next();
}
// List current addresses
self.config.unlockAddressList = self.client.parseListAccountsCommandResultToAddressList(stdout);
// Count current addresses and remove the default account from the count (because password can be different)
let addressCount = self.config.unlockAddressList.length;
if (addressCount < accountsToCreate) {
next(null, accountsToCreate - addressCount);
} else {
next(ACCOUNTS_ALREADY_PRESENT);
}
});
},
function newAccounts(accountsToCreate, next) {
var accountNumber = 0;
async.whilst(
function() {
return accountNumber < accountsToCreate;
},
function(callback) {
accountNumber++;
self.runCommand(self.client.newAccountCommand(), {}, (err, stdout, _stderr) => {
if (err) {
return callback(err, accountNumber);
}
self.config.unlockAddressList.push(self.client.parseNewAccountCommandResultToAddress(stdout));
callback(null, accountNumber);
});
},
function(err) {
next(err);
}
);
}
], (err) => {
if (err && err !== ACCOUNTS_ALREADY_PRESENT) {
console.log(err);
return callback(err);
}
callback();
});
});
};
Blockchain.prototype.initChainAndGetAddress = function (callback) {
const self = this;
let address = null;
const ALREADY_INITIALIZED = 'already';
// ensure datadir exists, bypassing the interactive liabilities prompt.
self.datadir = self.config.datadir;
async.waterfall([
function makeDir(next) {
fs.mkdirp(self.datadir, (err, _result) => {
next(err);
});
},
function listAccounts(next) {
self.runCommand(self.client.listAccountsCommand(), {}, (err, stdout, _stderr) => {
if (err || stdout === undefined || stdout.indexOf("Fatal") >= 0) {
self.logger.info(__("no accounts found").green);
return next();
}
let firstAccountFound = self.client.parseListAccountsCommandResultToAddress(stdout);
if (firstAccountFound === undefined || firstAccountFound === "") {
console.log(__("no accounts found").green);
return next();
}
self.logger.info(__("already initialized").green);
address = firstAccountFound;
next(ALREADY_INITIALIZED);
});
},
function genesisBlock(next) {
//There's no genesis init with Parity. Custom network are set in the chain property at startup
if (!self.config.genesisBlock || self.client.name === constants.blockchain.clients.parity) {
return next();
}
self.logger.info(__("initializing genesis block").green);
self.runCommand(self.client.initGenesisCommmand(), {}, (err, _stdout, _stderr) => {
next(err);
});
},
function newAccount(next) {
self.runCommand(self.client.newAccountCommand(), {}, (err, stdout, _stderr) => {
if (err) {
return next(err);
}
address = self.client.parseNewAccountCommandResultToAddress(stdout);
next();
});
}
], (err) => {
if (err === ALREADY_INITIALIZED) {
err = null;
}
callback(err, address);
});
};
export function BlockchainClient(userConfig, options) {
if ((userConfig === {} || JSON.stringify(userConfig) === '{"enabled":true}') && options.env !== 'development') {
options.logger.info("===> " + __("warning: running default config on a non-development environment"));
}
// if client is not set in preferences, default is geth
if (!userConfig.client) userConfig.client = constants.blockchain.clients.geth;
// if clientName is set, it overrides preferences
if (options.clientName) userConfig.client = options.clientName;
// Choose correct client instance based on clientName
let clientClass;
switch (userConfig.client) {
case constants.blockchain.clients.geth:
clientClass = GethClient;
break;
case constants.blockchain.clients.parity:
clientClass = ParityClient;
break;
default:
console.error(__('Unknown client "%s". Please use one of the following: %s', userConfig.client, Object.keys(constants.blockchain.clients).join(', ')));
process.exit(1);
}
userConfig.isDev = (userConfig.isDev || userConfig.default);
userConfig.env = options.env;
userConfig.onReadyCallback = options.onReadyCallback;
userConfig.onExitCallback = options.onExitCallback;
userConfig.logger = options.logger;
userConfig.certOptions = options.certOptions;
userConfig.isStandalone = options.isStandalone;
return new Blockchain(userConfig, clientClass);
}

View File

@ -154,6 +154,7 @@ simulator(_options) {
// TODO: replace with individual plugins
engine.registerModuleGroup("namesystem");
engine.registerModuleGroup("communication");
engine.registerModuleGroup("blockchain");
engine.registerModuleGroup("compiler");
engine.registerModuleGroup("contracts");
@ -161,7 +162,6 @@ simulator(_options) {
engine.registerModuleGroup("webserver");
engine.registerModuleGroup("filewatcher");
engine.registerModuleGroup("storage");
engine.registerModuleGroup("communication");
engine.registerModuleGroup("cockpit");
engine.registerModulePackage('embark-deploy-tracker', {plugins: engine.plugins});
@ -177,7 +177,6 @@ simulator(_options) {
} catch (e) {
return cb(e);
}
cb();
});
@ -748,6 +747,12 @@ simulator(_options) {
plugin.registerActionForEvent("embark:engine:started", async (_params, cb) => {
try {
await engine.events.request2("blockchain:node:start", engine.config.blockchainConfig);
await Promise.all([
engine.events.request2("storage:node:start", engine.config.storageConfig),
engine.events.request2("communication:node:start", engine.config.communicationConfig),
engine.events.request2("namesystem:node:start", engine.config.namesystemConfig)
]);
} catch (e) {
return cb(e);
}

View File

@ -5,6 +5,7 @@ const {spawn, exec} = require('child_process');
const path = require('path');
const constants = require('embark-core/constants');
const GethClient = require('./gethClient.js');
const WhisperGethClient = require('./whisperClient.js');
// const ParityClient = require('./parityClient.js');
import { IPC } from 'embark-core';
@ -15,7 +16,7 @@ const Logger = require('embark-logger');
const IPC_CONNECT_INTERVAL = 2000;
/*eslint complexity: ["error", 50]*/
var Blockchain = function(userConfig, clientClass) {
var Blockchain = function(userConfig, clientClass, communicationConfig) {
this.userConfig = userConfig;
this.env = userConfig.env || 'development';
this.isDev = userConfig.isDev;
@ -100,7 +101,7 @@ var Blockchain = function(userConfig, clientClass) {
this.logger.error(__(spaceMessage, 'genesisBlock'));
process.exit(1);
}
this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev});
this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev, communicationConfig: communicationConfig});
if (this.isStandalone) {
this.initStandaloneProcess();
@ -203,7 +204,7 @@ Blockchain.prototype.run = function () {
args = compact(args);
let full_cmd = cmd + " " + args.join(' ');
self.logger.info(__("running: %s", full_cmd.underline).green);
self.logger.info(__(">>>>>>>>>>>>>>>> running: %s", full_cmd.underline).green);
self.child = spawn(cmd, args, {cwd: process.cwd()});
self.child.on('error', (err) => {
@ -415,7 +416,7 @@ Blockchain.prototype.initChainAndGetAddress = function (callback) {
});
};
export function BlockchainClient(userConfig, options) {
export function BlockchainClient(userConfig, options, communicationConfig) {
if ((userConfig === {} || JSON.stringify(userConfig) === '{"enabled":true}') && options.env !== 'development') {
options.logger.info("===> " + __("warning: running default config on a non-development environment"));
}
@ -425,18 +426,13 @@ export function BlockchainClient(userConfig, options) {
if (options.clientName) userConfig.client = options.clientName;
// Choose correct client instance based on clientName
let clientClass;
switch (userConfig.client) {
case constants.blockchain.clients.geth:
clientClass = GethClient;
break;
// case constants.blockchain.clients.parity:
// clientClass = ParityClient;
// break;
default:
console.error(__('Unknown client "%s". Please use one of the following: %s', userConfig.client, Object.keys(constants.blockchain.clients).join(', ')));
process.exit(1);
if (communicationConfig) {
clientClass = WhisperGethClient
} else {
clientClass = GethClient;
}
userConfig.isDev = (userConfig.isDev || userConfig.default);
userConfig.env = options.env;
userConfig.onReadyCallback = options.onReadyCallback;
@ -444,5 +440,5 @@ export function BlockchainClient(userConfig, options) {
userConfig.logger = options.logger;
userConfig.certOptions = options.certOptions;
userConfig.isStandalone = options.isStandalone;
return new Blockchain(userConfig, clientClass);
return new Blockchain(userConfig, clientClass, communicationConfig);
}

View File

@ -9,6 +9,7 @@ class BlockchainProcess extends ProcessWrapper {
constructor(options) {
super();
this.blockchainConfig = options.blockchainConfig;
this.communicationConfig = options.communicationConfig;
this.client = options.client;
this.env = options.env;
this.isDev = options.isDev;
@ -26,7 +27,8 @@ class BlockchainProcess extends ProcessWrapper {
onReadyCallback: this.blockchainReady.bind(this),
onExitCallback: this.blockchainExit.bind(this),
logger: console
}
},
this.communicationConfig
);
this.blockchain.run();

View File

@ -10,6 +10,7 @@ export class BlockchainProcessLauncher {
this.logger = options.logger;
this.normalizeInput = options.normalizeInput;
this.blockchainConfig = options.blockchainConfig;
this.communicationConfig = options.communicationConfig;
this.locale = options.locale;
this.isDev = options.isDev;
this.client = options.client;
@ -35,6 +36,7 @@ export class BlockchainProcessLauncher {
this.blockchainProcess.send({
action: constants.blockchain.init, options: {
blockchainConfig: this.blockchainConfig,
communicationConfig: this.communicationConfig,
client: this.client,
env: this.env,
isDev: this.isDev,

View File

@ -11,6 +11,7 @@ class Geth {
this.embark = embark;
this.embarkConfig = embark.config.embarkConfig;
this.blockchainConfig = embark.config.blockchainConfig;
this.communicationConfig = embark.config.communicationConfig;
this.locale = options.locale;
this.logger = embark.logger;
this.client = options.client;
@ -25,7 +26,6 @@ class Geth {
this.events.request("blockchain:node:register", constants.blockchain.clients.geth, (readyCb) => {
this.events.request('processes:register', 'blockchain', {
launchFn: (cb) => {
// this.startBlockchainNode(readyCb);
this.startBlockchainNode(cb);
},
stopFn: (cb) => {
@ -40,6 +40,24 @@ class Geth {
});
this.registerServiceCheck();
});
this.events.request("whisper:node:register", constants.blockchain.clients.geth, readyCb => {
this.events.request('processes:register', 'communication', {
launchFn: cb => {
this.startWhisperNode(cb);
},
stopFn: cb => {
this.stopWhisperNode(cb);
}
});
this.events.request("processes:launch", "communication", (err) => {
if (err) {
this.logger.error(`Error launching whisper process: ${err.message || err}`);
}
readyCb();
});
});
}
shouldInit() {
@ -96,6 +114,31 @@ class Geth {
this.blockchainProcess.startBlockchainNode(callback);
}
startWhisperNode(callback) {
this.whisperProcess = new BlockchainProcessLauncher({
events: this.events,
logger: this.logger,
normalizeInput,
blockchainConfig: this.blockchainConfig,
communicationConfig: this.communicationConfig,
locale: this.locale,
client: this.client,
isDev: this.isDev,
embark: this.embark
});
this.whisperProcess.startBlockchainNode(callback);
}
stopWhisperNode(cb) {
if (!this.whisperProcess) {
return cb();
}
this.whisperProcess.stopBlockchainNode(() => {
this.logger.info(`The whisper process has been stopped.`);
cb();
});
}
stopBlockchainNode(cb) {
const message = __(`The blockchain process has been stopped. It can be restarted by running ${"service blockchain on".bold} in the Embark console.`);

View File

@ -0,0 +1,261 @@
import { __ } from 'embark-i18n';
import { dappPath, ipcPath } from 'embark-utils';
const async = require('async');
const {exec, spawn} = require('child_process');
const path = require('path');
const GethMiner = require('./miner');
const semver = require('semver');
const constants = require('embark-core/constants');
const DEFAULTS = {
"BIN": "geth",
"VERSIONS_SUPPORTED": ">=1.8.14",
"NETWORK_TYPE": "custom",
"NETWORK_ID": 1337,
"RPC_API": ['eth', 'web3', 'net', 'debug', 'personal'],
"WS_API": ['eth', 'web3', 'net', 'shh', 'debug', 'pubsub', 'personal'],
"DEV_WS_API": ['eth', 'web3', 'net', 'shh', 'debug', 'pubsub', 'personal'],
"TARGET_GAS_LIMIT": 8000000
};
class WhisperGethClient {
static get DEFAULTS() {
return DEFAULTS;
}
constructor(options) {
this.config = options && options.hasOwnProperty('config') ? options.config : {};
this.communicationConfig = options.communicationConfig;
this.env = options && options.hasOwnProperty('env') ? options.env : 'development';
this.isDev = options && options.hasOwnProperty('isDev') ? options.isDev : (this.env === 'development');
this.name = constants.blockchain.clients.geth;
this.prettyName = "Go-Ethereum (https://github.com/ethereum/go-ethereum)";
this.bin = this.config.ethereumClientBin || DEFAULTS.BIN;
this.versSupported = DEFAULTS.VERSIONS_SUPPORTED;
this.httpReady = false;
this.wsReady = !this.config.wsRPC;
}
isReady(data) {
if (data.indexOf('WebSocket endpoint opened') > -1) {
this.wsReady = true;
}
return this.wsReady;
}
/**
* Check if the client needs some sort of 'keep alive' transactions to avoid freezing by inactivity
* @returns {boolean} if keep alive is needed
*/
needKeepAlive() {
return false;
}
commonOptions() {
return [];
}
getMiner() {
return new GethMiner({datadir: this.config.datadir});
}
getBinaryPath() {
return this.bin;
}
determineVersionCommand() {
return this.bin + " version";
}
parseVersion(rawVersionOutput) {
let parsed;
const match = rawVersionOutput.match(/Version: (.*)/);
if (match) {
parsed = match[1].trim();
}
return parsed;
}
isSupportedVersion(parsedVersion) {
let test;
try {
let v = semver(parsedVersion);
v = `${v.major}.${v.minor}.${v.patch}`;
test = semver.Range(this.versSupported).test(semver(v));
if (typeof test !== 'boolean') {
test = undefined;
}
} finally {
// eslint-disable-next-line no-unsafe-finally
return test;
}
}
determineNetworkType(config) {
let cmd;
if (config.networkType === 'testnet') {
cmd = "--testnet";
} else if (config.networkType === 'rinkeby') {
cmd = "--rinkeby";
} else if (config.networkType === 'custom') {
cmd = "--networkid=" + config.networkId;
}
return cmd;
}
runAsArchival(config) {
return config.networkId === 1337 || config.archivalMode;
}
initGenesisCommmand() {
return "";
}
newAccountCommand() {
return "";
}
parseNewAccountCommandResultToAddress(data = "") {
if (data.match(/{(\w+)}/)) return "0x" + data.match(/{(\w+)}/)[1];
return "";
}
listAccountsCommand() {
return "";
}
parseListAccountsCommandResultToAddress(data = "") {
if (data.match(/{(\w+)}/)) return "0x" + data.match(/{(\w+)}/)[1];
return "";
}
parseListAccountsCommandResultToAddressList(data = "") {
const regex = RegExp(/{(\w+)}/g);
let match;
const accounts = [];
while ((match = regex.exec(data)) !== null) {
accounts.push('0x' + match[1]);
}
return accounts;
}
parseListAccountsCommandResultToAddressCount(data = "") {
return 0;
}
determineRpcOptions(config) {
let cmd = [];
cmd.push("--port=30304");
cmd.push("--rpc");
cmd.push("--rpcport=9998");
cmd.push("--rpcaddr=" + config.rpcHost);
if (config.rpcCorsDomain) {
if (config.rpcCorsDomain === '*') {
console.warn('==================================');
console.warn(__('rpcCorsDomain set to *'));
console.warn(__('make sure you know what you are doing'));
console.warn('==================================');
}
cmd.push("--rpccorsdomain=" + config.rpcCorsDomain);
} else {
console.warn('==================================');
console.warn(__('warning: cors is not set'));
console.warn('==================================');
}
return cmd;
}
determineWsOptions(config, communicationConfig) {
let cmd = [];
if (config.wsRPC) {
cmd.push("--ws");
cmd.push(`--wsport=${communicationConfig.connection.port || config.wsPost++}`);
cmd.push(`--wsaddr=${communicationConfig.connection.host || config.wsHost++}`);
if (config.wsOrigins) {
if (config.wsOrigins === '*') {
console.warn('==================================');
console.warn(__('wsOrigins set to *'));
console.warn(__('make sure you know what you are doing'));
console.warn('==================================');
}
cmd.push("--wsorigins=" + config.wsOrigins);
} else {
console.warn('==================================');
console.warn(__('warning: wsOrigins is not set'));
console.warn('==================================');
}
}
return cmd;
}
initDevChain(datadir, callback) {
callback();
}
mainCommand(address, done) {
let self = this;
let config = this.config;
let rpc_api = this.config.rpcApi;
let ws_api = this.config.wsApi;
let args = [];
async.series([
function commonOptions(callback) {
let cmd = self.commonOptions();
args = args.concat(cmd);
callback(null, cmd);
},
function wsOptions(callback) {
let cmd = self.determineWsOptions(self.config, self.communicationConfig);
args = args.concat(cmd);
callback(null, cmd);
},
function dontGetPeers(callback) {
if (config.nodiscover) {
args.push("--nodiscover");
return callback(null, "--nodiscover");
}
callback(null, "");
},
function maxPeers(callback) {
let cmd = "--maxpeers=" + config.maxpeers;
args.push(cmd);
callback(null, cmd);
},
function bootnodes(callback) {
if (config.bootnodes && config.bootnodes !== "" && config.bootnodes !== []) {
args.push("--bootnodes=" + config.bootnodes);
return callback(null, "--bootnodes=" + config.bootnodes);
}
callback("");
},
function whisper(callback) {
rpc_api.push('shh');
if (ws_api.indexOf('shh') === -1) {
ws_api.push('shh');
}
args.push("--shh");
return callback(null, "--shh ");
},
function rpcApi(callback) {
args.push('--rpcapi=' + rpc_api.join(','));
callback(null, '--rpcapi=' + rpc_api.join(','));
},
function wsApi(callback) {
args.push('--wsapi=' + ws_api.join(','));
callback(null, '--wsapi=' + ws_api.join(','));
}
], function(err) {
if (err) {
throw new Error(err.message);
}
return done(self.bin, args);
});
}
}
module.exports = WhisperGethClient;

View File

@ -46,7 +46,7 @@ __embarkWhisperNewWeb3.setProvider = function(options) {
});
};
__embarkWhisperNewWeb3.sendMessage = function(options) {
__embarkWhisperNewWeb3.sendMessage = function(options, callback) {
const data = options.data || options.payload;
if (!data) {
throw new Error("missing option: data");
@ -64,6 +64,7 @@ __embarkWhisperNewWeb3.sendMessage = function(options) {
if (err) {
throw new Error(err);
}
callback();
});
};

View File

@ -62,7 +62,8 @@
"npm-run-all": "4.1.5",
"rimraf": "3.0.0",
"tslint": "5.16.0",
"typescript": "3.4.5"
"typescript": "3.4.5",
"web3": "1.2.1"
},
"engines": {
"node": ">=8.12.0 <12.0.0",

View File

@ -1,98 +1,49 @@
import {buildUrlFromConfig, canonicalHost, defaultHost} from 'embark-utils';
const {parallel} = require('async');
const {fromEvent} = require('rxjs');
const {map, takeUntil} = require('rxjs/operators');
const Web3 = require('web3');
import whisper from 'embarkjs-whisper';
const sendMessage = whisper.real_sendMessage;
const listenTo = whisper.real_listenTo;
import EmbarkJS from 'embarkjs';
import EmbarkJSWhisper from 'embarkjs-whisper';
class API {
constructor(embark) {
this.embark = embark;
this.logger = embark.logger;
this.communicationConfig = embark.config.communicationConfig;
}
embark.events.on("blockchain:started", this.registerAPICalls.bind(this));
async initEmbarkJSWhisper() {
if (Object.keys(EmbarkJS.Messages.Providers).includes("whisper")) return;
EmbarkJS.Messages.registerProvider('whisper', EmbarkJSWhisper);
EmbarkJS.Messages.setProvider('whisper', this.communicationConfig.connection);
}
async registerAPICalls() {
const self = this;
this.initEmbarkJSWhisper();
this.registerSendMessageCall();
this.registerListenToCall();
}
const connection = this.communicationConfig.connection || {};
const config = {
host: canonicalHost(connection.host || defaultHost),
port: connection.port || '8546',
type: connection.type || 'ws'
};
this.web3 = new Web3(buildUrlFromConfig(config));
async registerSendMessageCall() {
this.embark.registerAPICall(
'post',
'/embark-api/communication/sendMessage',
(req, res) => {
EmbarkJS.Messages.sendMessage({ topic: req.body.topic, data: req.body.message }, (err, result) => {
if (err) {
return res.status(500).send({ error: err });
}
res.send(result);
});
});
}
if (self.apiCallsRegistered) {
return;
}
self.apiCallsRegistered = true;
let symKeyID, sig;
parallel([
function(paraCb) {
self.web3.shh.newSymKey((err, id) => {
symKeyID = id;
paraCb(err);
async registerListenToCall() {
this.embark.registerAPICall(
'ws',
'/embark-api/communication/listenTo/:topic',
(ws, req) => {
EmbarkJS.Messages.listenTo({ topic: req.params.topic }).subscribe(data => {
ws.send(JSON.stringify(data));
});
},
function(paraCb) {
self.web3.shh.newKeyPair((err, id) => {
sig = id;
paraCb(err);
});
}
], (err) => {
if (err) {
self.logger.error('Error getting Whisper keys:', err.message || err);
return;
}
self.embark.registerAPICall(
'post',
'/embark-api/communication/sendMessage',
(req, res) => {
sendMessage({
topic: req.body.topic,
data: req.body.message,
sig,
symKeyID,
fromAscii: self.web3.utils.asciiToHex,
toHex: self.web3.utils.toHex,
post: self.web3.shh.post
}, (err, result) => {
if (err) {
return res.status(500).send({error: err});
}
res.send(result);
});
});
self.embark.registerAPICall(
'ws',
'/embark-api/communication/listenTo/:topic',
(ws, req) => {
const obs = listenTo({
toAscii: self.web3.utils.hexToAscii,
toHex: self.web3.utils.toHex,
topic: req.params.topic,
sig,
subscribe: self.web3.shh.subscribe,
symKeyID
}).pipe(takeUntil(fromEvent(ws, 'close').pipe(map(() => (
delete self.webSocketsChannels[req.params.topic]
)))));
self.webSocketsChannels[req.params.topic] = obs;
obs.subscribe(data => {
ws.send(JSON.stringify(data));
});
});
});
});
}
}

View File

@ -15,27 +15,26 @@ class Whisper {
this.modulesPath = dappPath(embark.config.embarkConfig.generationDir, constants.dappArtifacts.symlinkDir);
this.api = new API(embark);
this.api.registerAPICalls();
this.whisperNodes = {};
this.events.request("embarkjs:plugin:register", 'messages', 'whisper', 'embarkjs-whisper');
this.events.request("embarkjs:console:register", 'messages', 'whisper', 'embarkjs-whisper');
// TODO: should launch its own whisper node
// this.events.on("communication:started", this.connectEmbarkJSProvider.bind(this));
this.events.on("blockchain:started", this.connectEmbarkJSProvider.bind(this));
this.events.setCommandHandler("whisper:node:register", (clientName, startCb) => {
this.whisperNodes[clientName] = startCb;
});
this.events.request("communication:node:register", "whisper", (readyCb) => {
// TODO: should launch its own whisper node
// this.events.request('processes:register', 'communication', {
// launchFn: (cb) => {
// this.startProcess(cb);
// },
// stopFn: (cb) => { this.stopProcess(cb); }
// });
// this.events.request("processes:launch", "communication", (err) => {
readyCb();
// });
// this.registerServiceCheck()
let clientName = this.communicationConfig.client || "geth";
let registerCb = this.whisperNodes[clientName];
if (!registerCb) return cb("whisper client " + clientName + " not found");
registerCb.apply(registerCb, [readyCb]);
});
this.events.on("communication:started", () => {
this.api = new API(embark);
this.api.registerAPICalls();
this.connectEmbarkJSProvider()
});
}