From e330b338ea2a45acb14eebd93af93bc2aba62516 Mon Sep 17 00:00:00 2001 From: Iuri Matias Date: Wed, 11 Sep 2019 14:21:53 -0400 Subject: [PATCH] Fix/whisper communication (#1895) * fix(@cockpit/utils): make communications tab work again * fix whisper node start; fix whisper console api registration; fix/refactor whisper web api --- dapps/templates/demo/config/communication.js | 2 +- dapps/tests/app/config/communication.json | 5 +- .../src/blockchain.js | 446 ++++++++++++++++++ packages/embark/src/cmd/cmd_controller.js | 9 +- .../embark/src/lib/modules/geth/blockchain.js | 26 +- .../src/lib/modules/geth/blockchainProcess.js | 4 +- .../modules/geth/blockchainProcessLauncher.js | 2 + packages/embark/src/lib/modules/geth/index.js | 45 +- .../src/lib/modules/geth/whisperClient.js | 261 ++++++++++ .../embarkjs/embarkjs-whisper/src/index.js | 3 +- packages/plugins/whisper/package.json | 3 +- packages/plugins/whisper/src/api.js | 115 ++--- packages/plugins/whisper/src/index.js | 29 +- 13 files changed, 829 insertions(+), 121 deletions(-) create mode 100644 packages/embark-blockchain-process/src/blockchain.js create mode 100644 packages/embark/src/lib/modules/geth/whisperClient.js diff --git a/dapps/templates/demo/config/communication.js b/dapps/templates/demo/config/communication.js index 818248258..637b85c33 100644 --- a/dapps/templates/demo/config/communication.js +++ b/dapps/templates/demo/config/communication.js @@ -11,7 +11,7 @@ module.exports = { development: { connection: { host: "localhost", // Host of the blockchain node - port: 8546, // Port of the blockchain node + port: 8557, // Port of the blockchain node type: "ws" // Type of connection (ws or rpc) } }, diff --git a/dapps/tests/app/config/communication.json b/dapps/tests/app/config/communication.json index a6b7f7617..96a2b498a 100644 --- a/dapps/tests/app/config/communication.json +++ b/dapps/tests/app/config/communication.json @@ -1,11 +1,12 @@ { "default": { - "enabled": true, + "client": "geth", "provider": "whisper", + "enabled": true, "available_providers": ["whisper"], "connection": { "host": "localhost", - "port": 8546, + "port": 3777, "type": "ws" } } diff --git a/packages/embark-blockchain-process/src/blockchain.js b/packages/embark-blockchain-process/src/blockchain.js new file mode 100644 index 000000000..e84dd14b3 --- /dev/null +++ b/packages/embark-blockchain-process/src/blockchain.js @@ -0,0 +1,446 @@ +import { __ } from 'embark-i18n'; +const fs = require('fs-extra'); +const async = require('async'); +const {spawn, exec} = require('child_process'); +const path = require('path'); +const constants = require('embark-core/constants'); +const GethClient = require('./gethClient.js'); +const ParityClient = require('./parityClient.js'); +import { IPC } from 'embark-core'; + +import { compact, dappPath, defaultHost, dockerHostSwap, embarkPath} from 'embark-utils'; +const Logger = require('embark-logger'); + +// time between IPC connection attempts (in ms) +const IPC_CONNECT_INTERVAL = 2000; + +/*eslint complexity: ["error", 50]*/ +var Blockchain = function(userConfig, clientClass) { + this.userConfig = userConfig; + this.env = userConfig.env || 'development'; + this.isDev = userConfig.isDev; + this.onReadyCallback = userConfig.onReadyCallback || (() => {}); + this.onExitCallback = userConfig.onExitCallback; + this.logger = userConfig.logger || new Logger({logLevel: 'debug', context: constants.contexts.blockchain}); // do not pass in events as we don't want any log events emitted + this.events = userConfig.events; + this.isStandalone = userConfig.isStandalone; + this.certOptions = userConfig.certOptions; + + + let defaultWsApi = clientClass.DEFAULTS.WS_API; + if (this.isDev) defaultWsApi = clientClass.DEFAULTS.DEV_WS_API; + + this.config = { + silent: this.userConfig.silent, + client: this.userConfig.client, + ethereumClientBin: this.userConfig.ethereumClientBin || this.userConfig.client, + networkType: this.userConfig.networkType || clientClass.DEFAULTS.NETWORK_TYPE, + networkId: this.userConfig.networkId || clientClass.DEFAULTS.NETWORK_ID, + genesisBlock: this.userConfig.genesisBlock || false, + datadir: this.userConfig.datadir, + mineWhenNeeded: this.userConfig.mineWhenNeeded || false, + rpcHost: dockerHostSwap(this.userConfig.rpcHost) || defaultHost, + rpcPort: this.userConfig.rpcPort || 8545, + rpcCorsDomain: this.userConfig.rpcCorsDomain || false, + rpcApi: this.userConfig.rpcApi || clientClass.DEFAULTS.RPC_API, + port: this.userConfig.port || 30303, + nodiscover: this.userConfig.nodiscover || false, + mine: this.userConfig.mine || false, + account: {}, + whisper: (this.userConfig.whisper !== false), + maxpeers: ((this.userConfig.maxpeers === 0) ? 0 : (this.userConfig.maxpeers || 25)), + bootnodes: this.userConfig.bootnodes || "", + wsRPC: (this.userConfig.wsRPC !== false), + wsHost: dockerHostSwap(this.userConfig.wsHost) || defaultHost, + wsPort: this.userConfig.wsPort || 8546, + wsOrigins: this.userConfig.wsOrigins || false, + wsApi: this.userConfig.wsApi || defaultWsApi, + vmdebug: this.userConfig.vmdebug || false, + targetGasLimit: this.userConfig.targetGasLimit || false, + syncMode: this.userConfig.syncMode || this.userConfig.syncmode, + verbosity: this.userConfig.verbosity + }; + + this.devFunds = null; + + if (this.userConfig.accounts) { + const nodeAccounts = this.userConfig.accounts.find(account => account.nodeAccounts); + if (nodeAccounts) { + this.config.account = { + numAccounts: nodeAccounts.numAddresses || 1, + password: nodeAccounts.password, + balance: nodeAccounts.balance + }; + } + } + + if (this.userConfig === {} || this.userConfig.default || JSON.stringify(this.userConfig) === '{"client":"geth"}') { + if (this.env === 'development') { + this.isDev = true; + } else { + this.config.genesisBlock = embarkPath("templates/boilerplate/config/privatenet/genesis.json"); + } + this.config.datadir = dappPath(".embark/development/datadir"); + this.config.wsOrigins = this.config.wsOrigins || "http://localhost:8000"; + this.config.rpcCorsDomain = this.config.rpcCorsDomain || "http://localhost:8000"; + this.config.targetGasLimit = 8000000; + } + this.config.account.devPassword = path.join(this.config.datadir, "devPassword"); + + const spaceMessage = 'The path for %s in blockchain config contains spaces, please remove them'; + if (this.config.datadir && this.config.datadir.indexOf(' ') > 0) { + this.logger.error(__(spaceMessage, 'datadir')); + process.exit(1); + } + if (this.config.account.password && this.config.account.password.indexOf(' ') > 0) { + this.logger.error(__(spaceMessage, 'accounts.password')); + process.exit(1); + } + if (this.config.genesisBlock && this.config.genesisBlock.indexOf(' ') > 0) { + this.logger.error(__(spaceMessage, 'genesisBlock')); + process.exit(1); + } + this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev}); + + this.initStandaloneProcess(); +}; + +/** + * Polls for a connection to an IPC server (generally this is set up + * in the Embark process). Once connected, any logs logged to the + * Logger will be shipped off to the IPC server. In the case of `embark + * run`, the BlockchainListener module is listening for these logs. + * + * @returns {void} + */ +Blockchain.prototype.initStandaloneProcess = function () { + if (this.isStandalone) { + let logQueue = []; + + // on every log logged in logger (say that 3x fast), send the log + // to the IPC serve listening (only if we're connected of course) + this.logger.events.on('log', (logLevel, message) => { + if (this.ipc.connected) { + this.ipc.request('blockchain:log', {logLevel, message}); + } else { + logQueue.push({logLevel, message}); + } + }); + + this.ipc = new IPC({ipcRole: 'client'}); + + // Wait for an IPC server to start (ie `embark run`) by polling `.connect()`. + // Do not kill this interval as the IPC server may restart (ie restart + // `embark run` without restarting `embark blockchain`) + setInterval(() => { + if (!this.ipc.connected) { + this.ipc.connect(() => { + if (this.ipc.connected) { + logQueue.forEach(message => { this.ipc.request('blockchain:log', message); }); + logQueue = []; + this.ipc.client.on('process:blockchain:stop', () => { + this.kill(); + process.exit(0); + }); + } + }); + } + }, IPC_CONNECT_INTERVAL); + } +}; + +Blockchain.prototype.runCommand = function (cmd, options, callback) { + this.logger.info(__("running: %s", cmd.underline).green); + if (this.config.silent) { + options.silent = true; + } + return exec(cmd, options, callback); +}; + +Blockchain.prototype.run = function () { + var self = this; + this.logger.info("===============================================================================".magenta); + this.logger.info("===============================================================================".magenta); + this.logger.info(__("Embark Blockchain using %s", self.client.prettyName.underline).magenta); + this.logger.info("===============================================================================".magenta); + this.logger.info("===============================================================================".magenta); + + if (self.client.name === constants.blockchain.clients.geth) this.checkPathLength(); + + let address = ''; + async.waterfall([ + function checkInstallation(next) { + self.isClientInstalled((err) => { + if (err) { + return next({message: err}); + } + next(); + }); + }, + function init(next) { + if (self.isDev) { + return self.initDevChain((err) => { + next(err); + }); + } + return self.initChainAndGetAddress((err, addr) => { + address = addr; + next(err); + }); + }, + function getMainCommand(next) { + self.client.mainCommand(address, function (cmd, args) { + next(null, cmd, args); + }, true); + } + ], function(err, cmd, args) { + if (err) { + self.logger.error(err.message); + return; + } + args = compact(args); + + let full_cmd = cmd + " " + args.join(' '); + self.logger.info(__(">>>>>>>>>>>>>>>>> running: %s", full_cmd.underline).green); + self.child = spawn(cmd, args, {cwd: process.cwd()}); + + self.child.on('error', (err) => { + err = err.toString(); + self.logger.error('Blockchain error: ', err); + if (self.env === 'development' && err.indexOf('Failed to unlock') > 0) { + self.logger.error('\n' + __('Development blockchain has changed to use the --dev option.').yellow); + self.logger.error(__('You can reset your workspace to fix the problem with').yellow + ' embark reset'.cyan); + self.logger.error(__('Otherwise, you can change your data directory in blockchain.json (datadir)').yellow); + } + }); + + // TOCHECK I don't understand why stderr and stdout are reverted. + // This happens with Geth and Parity, so it does not seems a client problem + self.child.stdout.on('data', (data) => { + self.logger.info(`${self.client.name} error: ${data}`); + }); + + self.child.stderr.on('data', async (data) => { + data = data.toString(); + if (!self.readyCalled && self.client.isReady(data)) { + self.readyCalled = true; + self.readyCallback(); + } + self.logger.info(`${self.client.name}: ${data}`); + }); + + self.child.on('exit', (code) => { + let strCode; + if (code) { + strCode = 'with error code ' + code; + } else { + strCode = 'with no error code (manually killed?)'; + } + self.logger.error(self.client.name + ' exited ' + strCode); + if (self.onExitCallback) { + self.onExitCallback(); + } + }); + + self.child.on('uncaughtException', (err) => { + self.logger.error('Uncaught ' + self.client.name + ' exception', err); + if (self.onExitCallback) { + self.onExitCallback(); + } + }); + }); +}; + +Blockchain.prototype.readyCallback = function () { + if (this.onReadyCallback) { + this.onReadyCallback(); + } + if (this.config.mineWhenNeeded && !this.isDev) { + this.miner = this.client.getMiner(); + } +}; + +Blockchain.prototype.kill = function () { + if (this.child) { + this.child.kill(); + } +}; + +Blockchain.prototype.checkPathLength = function () { + let _dappPath = dappPath(''); + if (_dappPath.length > 66) { + // this.logger.error is captured and sent to the console output regardless of silent setting + this.logger.error("===============================================================================".yellow); + this.logger.error("===========> ".yellow + __('WARNING! ÐApp path length is too long: ').yellow + _dappPath.yellow); + this.logger.error("===========> ".yellow + __('This is known to cause issues with starting geth, please consider reducing your ÐApp path\'s length to 66 characters or less.').yellow); + this.logger.error("===============================================================================".yellow); + } +}; + +Blockchain.prototype.isClientInstalled = function (callback) { + let versionCmd = this.client.determineVersionCommand(); + this.runCommand(versionCmd, {}, (err, stdout, stderr) => { + if (err || !stdout || stderr.indexOf("not found") >= 0 || stdout.indexOf("not found") >= 0) { + return callback(__('Ethereum client bin not found:') + ' ' + this.client.getBinaryPath()); + } + const parsedVersion = this.client.parseVersion(stdout); + const supported = this.client.isSupportedVersion(parsedVersion); + if (supported === undefined) { + this.logger.error((__('WARNING! Ethereum client version could not be determined or compared with version range') + ' ' + this.client.versSupported + __(', for best results please use a supported version')).yellow); + } else if (!supported) { + this.logger.error((__('WARNING! Ethereum client version unsupported, for best results please use a version in range') + ' ' + this.client.versSupported).yellow); + } + callback(); + }); +}; + +Blockchain.prototype.initDevChain = function(callback) { + const self = this; + const ACCOUNTS_ALREADY_PRESENT = 'accounts_already_present'; + // Init the dev chain + self.client.initDevChain(self.config.datadir, (err) => { + if (err) { + return callback(err); + } + + const accountsToCreate = self.config.account && self.config.account.numAccounts; + if (!accountsToCreate) return callback(); + + // Create other accounts + async.waterfall([ + function listAccounts(next) { + self.runCommand(self.client.listAccountsCommand(), {}, (err, stdout, _stderr) => { + if (err || stdout === undefined || stdout.indexOf("Fatal") >= 0) { + console.log(__("no accounts found").green); + return next(); + } + // List current addresses + self.config.unlockAddressList = self.client.parseListAccountsCommandResultToAddressList(stdout); + // Count current addresses and remove the default account from the count (because password can be different) + let addressCount = self.config.unlockAddressList.length; + if (addressCount < accountsToCreate) { + next(null, accountsToCreate - addressCount); + } else { + next(ACCOUNTS_ALREADY_PRESENT); + } + }); + }, + function newAccounts(accountsToCreate, next) { + var accountNumber = 0; + async.whilst( + function() { + return accountNumber < accountsToCreate; + }, + function(callback) { + accountNumber++; + self.runCommand(self.client.newAccountCommand(), {}, (err, stdout, _stderr) => { + if (err) { + return callback(err, accountNumber); + } + self.config.unlockAddressList.push(self.client.parseNewAccountCommandResultToAddress(stdout)); + callback(null, accountNumber); + }); + }, + function(err) { + next(err); + } + ); + } + ], (err) => { + if (err && err !== ACCOUNTS_ALREADY_PRESENT) { + console.log(err); + return callback(err); + } + callback(); + }); + }); +}; + +Blockchain.prototype.initChainAndGetAddress = function (callback) { + const self = this; + let address = null; + const ALREADY_INITIALIZED = 'already'; + + // ensure datadir exists, bypassing the interactive liabilities prompt. + self.datadir = self.config.datadir; + + async.waterfall([ + function makeDir(next) { + fs.mkdirp(self.datadir, (err, _result) => { + next(err); + }); + }, + function listAccounts(next) { + self.runCommand(self.client.listAccountsCommand(), {}, (err, stdout, _stderr) => { + if (err || stdout === undefined || stdout.indexOf("Fatal") >= 0) { + self.logger.info(__("no accounts found").green); + return next(); + } + let firstAccountFound = self.client.parseListAccountsCommandResultToAddress(stdout); + if (firstAccountFound === undefined || firstAccountFound === "") { + console.log(__("no accounts found").green); + return next(); + } + self.logger.info(__("already initialized").green); + address = firstAccountFound; + next(ALREADY_INITIALIZED); + }); + }, + function genesisBlock(next) { + //There's no genesis init with Parity. Custom network are set in the chain property at startup + if (!self.config.genesisBlock || self.client.name === constants.blockchain.clients.parity) { + return next(); + } + self.logger.info(__("initializing genesis block").green); + self.runCommand(self.client.initGenesisCommmand(), {}, (err, _stdout, _stderr) => { + next(err); + }); + }, + function newAccount(next) { + self.runCommand(self.client.newAccountCommand(), {}, (err, stdout, _stderr) => { + if (err) { + return next(err); + } + address = self.client.parseNewAccountCommandResultToAddress(stdout); + next(); + }); + } + ], (err) => { + if (err === ALREADY_INITIALIZED) { + err = null; + } + callback(err, address); + }); +}; + +export function BlockchainClient(userConfig, options) { + if ((userConfig === {} || JSON.stringify(userConfig) === '{"enabled":true}') && options.env !== 'development') { + options.logger.info("===> " + __("warning: running default config on a non-development environment")); + } + // if client is not set in preferences, default is geth + if (!userConfig.client) userConfig.client = constants.blockchain.clients.geth; + // if clientName is set, it overrides preferences + if (options.clientName) userConfig.client = options.clientName; + // Choose correct client instance based on clientName + let clientClass; + switch (userConfig.client) { + case constants.blockchain.clients.geth: + clientClass = GethClient; + break; + + case constants.blockchain.clients.parity: + clientClass = ParityClient; + break; + default: + console.error(__('Unknown client "%s". Please use one of the following: %s', userConfig.client, Object.keys(constants.blockchain.clients).join(', '))); + process.exit(1); + } + userConfig.isDev = (userConfig.isDev || userConfig.default); + userConfig.env = options.env; + userConfig.onReadyCallback = options.onReadyCallback; + userConfig.onExitCallback = options.onExitCallback; + userConfig.logger = options.logger; + userConfig.certOptions = options.certOptions; + userConfig.isStandalone = options.isStandalone; + return new Blockchain(userConfig, clientClass); +} diff --git a/packages/embark/src/cmd/cmd_controller.js b/packages/embark/src/cmd/cmd_controller.js index dde27ae05..7f935eb8c 100644 --- a/packages/embark/src/cmd/cmd_controller.js +++ b/packages/embark/src/cmd/cmd_controller.js @@ -154,6 +154,7 @@ simulator(_options) { // TODO: replace with individual plugins engine.registerModuleGroup("namesystem"); + engine.registerModuleGroup("communication"); engine.registerModuleGroup("blockchain"); engine.registerModuleGroup("compiler"); engine.registerModuleGroup("contracts"); @@ -161,7 +162,6 @@ simulator(_options) { engine.registerModuleGroup("webserver"); engine.registerModuleGroup("filewatcher"); engine.registerModuleGroup("storage"); - engine.registerModuleGroup("communication"); engine.registerModuleGroup("cockpit"); engine.registerModulePackage('embark-deploy-tracker', {plugins: engine.plugins}); @@ -177,7 +177,6 @@ simulator(_options) { } catch (e) { return cb(e); } - cb(); }); @@ -748,6 +747,12 @@ simulator(_options) { plugin.registerActionForEvent("embark:engine:started", async (_params, cb) => { try { await engine.events.request2("blockchain:node:start", engine.config.blockchainConfig); + + await Promise.all([ + engine.events.request2("storage:node:start", engine.config.storageConfig), + engine.events.request2("communication:node:start", engine.config.communicationConfig), + engine.events.request2("namesystem:node:start", engine.config.namesystemConfig) + ]); } catch (e) { return cb(e); } diff --git a/packages/embark/src/lib/modules/geth/blockchain.js b/packages/embark/src/lib/modules/geth/blockchain.js index 813255414..4226bb09d 100644 --- a/packages/embark/src/lib/modules/geth/blockchain.js +++ b/packages/embark/src/lib/modules/geth/blockchain.js @@ -5,6 +5,7 @@ const {spawn, exec} = require('child_process'); const path = require('path'); const constants = require('embark-core/constants'); const GethClient = require('./gethClient.js'); +const WhisperGethClient = require('./whisperClient.js'); // const ParityClient = require('./parityClient.js'); import { IPC } from 'embark-core'; @@ -15,7 +16,7 @@ const Logger = require('embark-logger'); const IPC_CONNECT_INTERVAL = 2000; /*eslint complexity: ["error", 50]*/ -var Blockchain = function(userConfig, clientClass) { +var Blockchain = function(userConfig, clientClass, communicationConfig) { this.userConfig = userConfig; this.env = userConfig.env || 'development'; this.isDev = userConfig.isDev; @@ -100,7 +101,7 @@ var Blockchain = function(userConfig, clientClass) { this.logger.error(__(spaceMessage, 'genesisBlock')); process.exit(1); } - this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev}); + this.client = new clientClass({config: this.config, env: this.env, isDev: this.isDev, communicationConfig: communicationConfig}); if (this.isStandalone) { this.initStandaloneProcess(); @@ -203,7 +204,7 @@ Blockchain.prototype.run = function () { args = compact(args); let full_cmd = cmd + " " + args.join(' '); - self.logger.info(__("running: %s", full_cmd.underline).green); + self.logger.info(__(">>>>>>>>>>>>>>>> running: %s", full_cmd.underline).green); self.child = spawn(cmd, args, {cwd: process.cwd()}); self.child.on('error', (err) => { @@ -415,7 +416,7 @@ Blockchain.prototype.initChainAndGetAddress = function (callback) { }); }; -export function BlockchainClient(userConfig, options) { +export function BlockchainClient(userConfig, options, communicationConfig) { if ((userConfig === {} || JSON.stringify(userConfig) === '{"enabled":true}') && options.env !== 'development') { options.logger.info("===> " + __("warning: running default config on a non-development environment")); } @@ -425,18 +426,13 @@ export function BlockchainClient(userConfig, options) { if (options.clientName) userConfig.client = options.clientName; // Choose correct client instance based on clientName let clientClass; - switch (userConfig.client) { - case constants.blockchain.clients.geth: - clientClass = GethClient; - break; - // case constants.blockchain.clients.parity: - // clientClass = ParityClient; - // break; - default: - console.error(__('Unknown client "%s". Please use one of the following: %s', userConfig.client, Object.keys(constants.blockchain.clients).join(', '))); - process.exit(1); + if (communicationConfig) { + clientClass = WhisperGethClient + } else { + clientClass = GethClient; } + userConfig.isDev = (userConfig.isDev || userConfig.default); userConfig.env = options.env; userConfig.onReadyCallback = options.onReadyCallback; @@ -444,5 +440,5 @@ export function BlockchainClient(userConfig, options) { userConfig.logger = options.logger; userConfig.certOptions = options.certOptions; userConfig.isStandalone = options.isStandalone; - return new Blockchain(userConfig, clientClass); + return new Blockchain(userConfig, clientClass, communicationConfig); } diff --git a/packages/embark/src/lib/modules/geth/blockchainProcess.js b/packages/embark/src/lib/modules/geth/blockchainProcess.js index 75b82623c..968bb5eb6 100644 --- a/packages/embark/src/lib/modules/geth/blockchainProcess.js +++ b/packages/embark/src/lib/modules/geth/blockchainProcess.js @@ -9,6 +9,7 @@ class BlockchainProcess extends ProcessWrapper { constructor(options) { super(); this.blockchainConfig = options.blockchainConfig; + this.communicationConfig = options.communicationConfig; this.client = options.client; this.env = options.env; this.isDev = options.isDev; @@ -26,7 +27,8 @@ class BlockchainProcess extends ProcessWrapper { onReadyCallback: this.blockchainReady.bind(this), onExitCallback: this.blockchainExit.bind(this), logger: console - } + }, + this.communicationConfig ); this.blockchain.run(); diff --git a/packages/embark/src/lib/modules/geth/blockchainProcessLauncher.js b/packages/embark/src/lib/modules/geth/blockchainProcessLauncher.js index 390128655..8d1c3f6e1 100644 --- a/packages/embark/src/lib/modules/geth/blockchainProcessLauncher.js +++ b/packages/embark/src/lib/modules/geth/blockchainProcessLauncher.js @@ -10,6 +10,7 @@ export class BlockchainProcessLauncher { this.logger = options.logger; this.normalizeInput = options.normalizeInput; this.blockchainConfig = options.blockchainConfig; + this.communicationConfig = options.communicationConfig; this.locale = options.locale; this.isDev = options.isDev; this.client = options.client; @@ -35,6 +36,7 @@ export class BlockchainProcessLauncher { this.blockchainProcess.send({ action: constants.blockchain.init, options: { blockchainConfig: this.blockchainConfig, + communicationConfig: this.communicationConfig, client: this.client, env: this.env, isDev: this.isDev, diff --git a/packages/embark/src/lib/modules/geth/index.js b/packages/embark/src/lib/modules/geth/index.js index ead50546a..4d7db631f 100644 --- a/packages/embark/src/lib/modules/geth/index.js +++ b/packages/embark/src/lib/modules/geth/index.js @@ -11,6 +11,7 @@ class Geth { this.embark = embark; this.embarkConfig = embark.config.embarkConfig; this.blockchainConfig = embark.config.blockchainConfig; + this.communicationConfig = embark.config.communicationConfig; this.locale = options.locale; this.logger = embark.logger; this.client = options.client; @@ -25,7 +26,6 @@ class Geth { this.events.request("blockchain:node:register", constants.blockchain.clients.geth, (readyCb) => { this.events.request('processes:register', 'blockchain', { launchFn: (cb) => { - // this.startBlockchainNode(readyCb); this.startBlockchainNode(cb); }, stopFn: (cb) => { @@ -40,6 +40,24 @@ class Geth { }); this.registerServiceCheck(); }); + + this.events.request("whisper:node:register", constants.blockchain.clients.geth, readyCb => { + this.events.request('processes:register', 'communication', { + launchFn: cb => { + this.startWhisperNode(cb); + }, + stopFn: cb => { + this.stopWhisperNode(cb); + } + }); + + this.events.request("processes:launch", "communication", (err) => { + if (err) { + this.logger.error(`Error launching whisper process: ${err.message || err}`); + } + readyCb(); + }); + }); } shouldInit() { @@ -96,6 +114,31 @@ class Geth { this.blockchainProcess.startBlockchainNode(callback); } + startWhisperNode(callback) { + this.whisperProcess = new BlockchainProcessLauncher({ + events: this.events, + logger: this.logger, + normalizeInput, + blockchainConfig: this.blockchainConfig, + communicationConfig: this.communicationConfig, + locale: this.locale, + client: this.client, + isDev: this.isDev, + embark: this.embark + }); + this.whisperProcess.startBlockchainNode(callback); + } + + stopWhisperNode(cb) { + if (!this.whisperProcess) { + return cb(); + } + this.whisperProcess.stopBlockchainNode(() => { + this.logger.info(`The whisper process has been stopped.`); + cb(); + }); + } + stopBlockchainNode(cb) { const message = __(`The blockchain process has been stopped. It can be restarted by running ${"service blockchain on".bold} in the Embark console.`); diff --git a/packages/embark/src/lib/modules/geth/whisperClient.js b/packages/embark/src/lib/modules/geth/whisperClient.js new file mode 100644 index 000000000..3ec4f9e58 --- /dev/null +++ b/packages/embark/src/lib/modules/geth/whisperClient.js @@ -0,0 +1,261 @@ +import { __ } from 'embark-i18n'; +import { dappPath, ipcPath } from 'embark-utils'; +const async = require('async'); +const {exec, spawn} = require('child_process'); +const path = require('path'); +const GethMiner = require('./miner'); +const semver = require('semver'); +const constants = require('embark-core/constants'); + +const DEFAULTS = { + "BIN": "geth", + "VERSIONS_SUPPORTED": ">=1.8.14", + "NETWORK_TYPE": "custom", + "NETWORK_ID": 1337, + "RPC_API": ['eth', 'web3', 'net', 'debug', 'personal'], + "WS_API": ['eth', 'web3', 'net', 'shh', 'debug', 'pubsub', 'personal'], + "DEV_WS_API": ['eth', 'web3', 'net', 'shh', 'debug', 'pubsub', 'personal'], + "TARGET_GAS_LIMIT": 8000000 +}; + +class WhisperGethClient { + + static get DEFAULTS() { + return DEFAULTS; + } + + constructor(options) { + this.config = options && options.hasOwnProperty('config') ? options.config : {}; + this.communicationConfig = options.communicationConfig; + this.env = options && options.hasOwnProperty('env') ? options.env : 'development'; + this.isDev = options && options.hasOwnProperty('isDev') ? options.isDev : (this.env === 'development'); + this.name = constants.blockchain.clients.geth; + this.prettyName = "Go-Ethereum (https://github.com/ethereum/go-ethereum)"; + this.bin = this.config.ethereumClientBin || DEFAULTS.BIN; + this.versSupported = DEFAULTS.VERSIONS_SUPPORTED; + this.httpReady = false; + this.wsReady = !this.config.wsRPC; + } + + isReady(data) { + if (data.indexOf('WebSocket endpoint opened') > -1) { + this.wsReady = true; + } + return this.wsReady; + } + + /** + * Check if the client needs some sort of 'keep alive' transactions to avoid freezing by inactivity + * @returns {boolean} if keep alive is needed + */ + needKeepAlive() { + return false; + } + + commonOptions() { + return []; + } + + getMiner() { + return new GethMiner({datadir: this.config.datadir}); + } + + getBinaryPath() { + return this.bin; + } + + determineVersionCommand() { + return this.bin + " version"; + } + + parseVersion(rawVersionOutput) { + let parsed; + const match = rawVersionOutput.match(/Version: (.*)/); + if (match) { + parsed = match[1].trim(); + } + return parsed; + } + + isSupportedVersion(parsedVersion) { + let test; + try { + let v = semver(parsedVersion); + v = `${v.major}.${v.minor}.${v.patch}`; + test = semver.Range(this.versSupported).test(semver(v)); + if (typeof test !== 'boolean') { + test = undefined; + } + } finally { + // eslint-disable-next-line no-unsafe-finally + return test; + } + } + + determineNetworkType(config) { + let cmd; + if (config.networkType === 'testnet') { + cmd = "--testnet"; + } else if (config.networkType === 'rinkeby') { + cmd = "--rinkeby"; + } else if (config.networkType === 'custom') { + cmd = "--networkid=" + config.networkId; + } + return cmd; + } + + runAsArchival(config) { + return config.networkId === 1337 || config.archivalMode; + } + + initGenesisCommmand() { + return ""; + } + + newAccountCommand() { + return ""; + } + + parseNewAccountCommandResultToAddress(data = "") { + if (data.match(/{(\w+)}/)) return "0x" + data.match(/{(\w+)}/)[1]; + return ""; + } + + listAccountsCommand() { + return ""; + } + + parseListAccountsCommandResultToAddress(data = "") { + if (data.match(/{(\w+)}/)) return "0x" + data.match(/{(\w+)}/)[1]; + return ""; + } + + parseListAccountsCommandResultToAddressList(data = "") { + const regex = RegExp(/{(\w+)}/g); + let match; + const accounts = []; + while ((match = regex.exec(data)) !== null) { + accounts.push('0x' + match[1]); + } + return accounts; + } + + parseListAccountsCommandResultToAddressCount(data = "") { + return 0; + } + + determineRpcOptions(config) { + let cmd = []; + cmd.push("--port=30304"); + cmd.push("--rpc"); + cmd.push("--rpcport=9998"); + cmd.push("--rpcaddr=" + config.rpcHost); + + if (config.rpcCorsDomain) { + if (config.rpcCorsDomain === '*') { + console.warn('=================================='); + console.warn(__('rpcCorsDomain set to *')); + console.warn(__('make sure you know what you are doing')); + console.warn('=================================='); + } + cmd.push("--rpccorsdomain=" + config.rpcCorsDomain); + } else { + console.warn('=================================='); + console.warn(__('warning: cors is not set')); + console.warn('=================================='); + } + return cmd; + } + + determineWsOptions(config, communicationConfig) { + let cmd = []; + if (config.wsRPC) { + cmd.push("--ws"); + + cmd.push(`--wsport=${communicationConfig.connection.port || config.wsPost++}`); + cmd.push(`--wsaddr=${communicationConfig.connection.host || config.wsHost++}`); + + if (config.wsOrigins) { + if (config.wsOrigins === '*') { + console.warn('=================================='); + console.warn(__('wsOrigins set to *')); + console.warn(__('make sure you know what you are doing')); + console.warn('=================================='); + } + cmd.push("--wsorigins=" + config.wsOrigins); + } else { + console.warn('=================================='); + console.warn(__('warning: wsOrigins is not set')); + console.warn('=================================='); + } + } + return cmd; + } + + initDevChain(datadir, callback) { + callback(); + } + + mainCommand(address, done) { + let self = this; + let config = this.config; + let rpc_api = this.config.rpcApi; + let ws_api = this.config.wsApi; + let args = []; + async.series([ + function commonOptions(callback) { + let cmd = self.commonOptions(); + args = args.concat(cmd); + callback(null, cmd); + }, + function wsOptions(callback) { + let cmd = self.determineWsOptions(self.config, self.communicationConfig); + args = args.concat(cmd); + callback(null, cmd); + }, + function dontGetPeers(callback) { + if (config.nodiscover) { + args.push("--nodiscover"); + return callback(null, "--nodiscover"); + } + callback(null, ""); + }, + function maxPeers(callback) { + let cmd = "--maxpeers=" + config.maxpeers; + args.push(cmd); + callback(null, cmd); + }, + function bootnodes(callback) { + if (config.bootnodes && config.bootnodes !== "" && config.bootnodes !== []) { + args.push("--bootnodes=" + config.bootnodes); + return callback(null, "--bootnodes=" + config.bootnodes); + } + callback(""); + }, + function whisper(callback) { + rpc_api.push('shh'); + if (ws_api.indexOf('shh') === -1) { + ws_api.push('shh'); + } + args.push("--shh"); + return callback(null, "--shh "); + }, + function rpcApi(callback) { + args.push('--rpcapi=' + rpc_api.join(',')); + callback(null, '--rpcapi=' + rpc_api.join(',')); + }, + function wsApi(callback) { + args.push('--wsapi=' + ws_api.join(',')); + callback(null, '--wsapi=' + ws_api.join(',')); + } + ], function(err) { + if (err) { + throw new Error(err.message); + } + return done(self.bin, args); + }); + } +} + +module.exports = WhisperGethClient; + diff --git a/packages/embarkjs/embarkjs-whisper/src/index.js b/packages/embarkjs/embarkjs-whisper/src/index.js index e7e4bf7ac..6f7f35158 100644 --- a/packages/embarkjs/embarkjs-whisper/src/index.js +++ b/packages/embarkjs/embarkjs-whisper/src/index.js @@ -46,7 +46,7 @@ __embarkWhisperNewWeb3.setProvider = function(options) { }); }; -__embarkWhisperNewWeb3.sendMessage = function(options) { +__embarkWhisperNewWeb3.sendMessage = function(options, callback) { const data = options.data || options.payload; if (!data) { throw new Error("missing option: data"); @@ -64,6 +64,7 @@ __embarkWhisperNewWeb3.sendMessage = function(options) { if (err) { throw new Error(err); } + callback(); }); }; diff --git a/packages/plugins/whisper/package.json b/packages/plugins/whisper/package.json index aedfb998f..41d2e398f 100644 --- a/packages/plugins/whisper/package.json +++ b/packages/plugins/whisper/package.json @@ -62,7 +62,8 @@ "npm-run-all": "4.1.5", "rimraf": "3.0.0", "tslint": "5.16.0", - "typescript": "3.4.5" + "typescript": "3.4.5", + "web3": "1.2.1" }, "engines": { "node": ">=8.12.0 <12.0.0", diff --git a/packages/plugins/whisper/src/api.js b/packages/plugins/whisper/src/api.js index f4299bef2..5c498089d 100644 --- a/packages/plugins/whisper/src/api.js +++ b/packages/plugins/whisper/src/api.js @@ -1,98 +1,49 @@ -import {buildUrlFromConfig, canonicalHost, defaultHost} from 'embark-utils'; -const {parallel} = require('async'); -const {fromEvent} = require('rxjs'); -const {map, takeUntil} = require('rxjs/operators'); -const Web3 = require('web3'); - -import whisper from 'embarkjs-whisper'; - -const sendMessage = whisper.real_sendMessage; -const listenTo = whisper.real_listenTo; +import EmbarkJS from 'embarkjs'; +import EmbarkJSWhisper from 'embarkjs-whisper'; class API { constructor(embark) { this.embark = embark; - this.logger = embark.logger; this.communicationConfig = embark.config.communicationConfig; + } - embark.events.on("blockchain:started", this.registerAPICalls.bind(this)); + async initEmbarkJSWhisper() { + if (Object.keys(EmbarkJS.Messages.Providers).includes("whisper")) return; + + EmbarkJS.Messages.registerProvider('whisper', EmbarkJSWhisper); + EmbarkJS.Messages.setProvider('whisper', this.communicationConfig.connection); } async registerAPICalls() { - const self = this; + this.initEmbarkJSWhisper(); + this.registerSendMessageCall(); + this.registerListenToCall(); + } - const connection = this.communicationConfig.connection || {}; - const config = { - host: canonicalHost(connection.host || defaultHost), - port: connection.port || '8546', - type: connection.type || 'ws' - }; - this.web3 = new Web3(buildUrlFromConfig(config)); + async registerSendMessageCall() { + this.embark.registerAPICall( + 'post', + '/embark-api/communication/sendMessage', + (req, res) => { + EmbarkJS.Messages.sendMessage({ topic: req.body.topic, data: req.body.message }, (err, result) => { + if (err) { + return res.status(500).send({ error: err }); + } + res.send(result); + }); + }); + } - if (self.apiCallsRegistered) { - return; - } - self.apiCallsRegistered = true; - let symKeyID, sig; - parallel([ - function(paraCb) { - self.web3.shh.newSymKey((err, id) => { - symKeyID = id; - paraCb(err); + async registerListenToCall() { + this.embark.registerAPICall( + 'ws', + '/embark-api/communication/listenTo/:topic', + (ws, req) => { + EmbarkJS.Messages.listenTo({ topic: req.params.topic }).subscribe(data => { + ws.send(JSON.stringify(data)); }); - }, - function(paraCb) { - self.web3.shh.newKeyPair((err, id) => { - sig = id; - paraCb(err); - }); - } - ], (err) => { - if (err) { - self.logger.error('Error getting Whisper keys:', err.message || err); - return; - } - self.embark.registerAPICall( - 'post', - '/embark-api/communication/sendMessage', - (req, res) => { - sendMessage({ - topic: req.body.topic, - data: req.body.message, - sig, - symKeyID, - fromAscii: self.web3.utils.asciiToHex, - toHex: self.web3.utils.toHex, - post: self.web3.shh.post - }, (err, result) => { - if (err) { - return res.status(500).send({error: err}); - } - res.send(result); - }); - }); - - self.embark.registerAPICall( - 'ws', - '/embark-api/communication/listenTo/:topic', - (ws, req) => { - const obs = listenTo({ - toAscii: self.web3.utils.hexToAscii, - toHex: self.web3.utils.toHex, - topic: req.params.topic, - sig, - subscribe: self.web3.shh.subscribe, - symKeyID - }).pipe(takeUntil(fromEvent(ws, 'close').pipe(map(() => ( - delete self.webSocketsChannels[req.params.topic] - ))))); - self.webSocketsChannels[req.params.topic] = obs; - obs.subscribe(data => { - ws.send(JSON.stringify(data)); - }); - }); - }); + }); } } diff --git a/packages/plugins/whisper/src/index.js b/packages/plugins/whisper/src/index.js index a8cb34048..41b22d47c 100644 --- a/packages/plugins/whisper/src/index.js +++ b/packages/plugins/whisper/src/index.js @@ -15,27 +15,26 @@ class Whisper { this.modulesPath = dappPath(embark.config.embarkConfig.generationDir, constants.dappArtifacts.symlinkDir); this.api = new API(embark); - this.api.registerAPICalls(); + this.whisperNodes = {}; this.events.request("embarkjs:plugin:register", 'messages', 'whisper', 'embarkjs-whisper'); this.events.request("embarkjs:console:register", 'messages', 'whisper', 'embarkjs-whisper'); - // TODO: should launch its own whisper node - // this.events.on("communication:started", this.connectEmbarkJSProvider.bind(this)); - this.events.on("blockchain:started", this.connectEmbarkJSProvider.bind(this)); + this.events.setCommandHandler("whisper:node:register", (clientName, startCb) => { + this.whisperNodes[clientName] = startCb; + }); this.events.request("communication:node:register", "whisper", (readyCb) => { - // TODO: should launch its own whisper node - // this.events.request('processes:register', 'communication', { - // launchFn: (cb) => { - // this.startProcess(cb); - // }, - // stopFn: (cb) => { this.stopProcess(cb); } - // }); - // this.events.request("processes:launch", "communication", (err) => { - readyCb(); - // }); - // this.registerServiceCheck() + let clientName = this.communicationConfig.client || "geth"; + let registerCb = this.whisperNodes[clientName]; + if (!registerCb) return cb("whisper client " + clientName + " not found"); + registerCb.apply(registerCb, [readyCb]); + }); + + this.events.on("communication:started", () => { + this.api = new API(embark); + this.api.registerAPICalls(); + this.connectEmbarkJSProvider() }); }