first stab at refactor blockchain launcher

This commit is contained in:
Iuri Matias 2018-07-19 23:31:28 +03:00 committed by Pascal Precht
parent d8effac4cd
commit db7f3d39d3
No known key found for this signature in database
GPG Key ID: 0EE28D8D6FD85D7D
6 changed files with 512 additions and 49 deletions

416
lib/contracts/blockchain.js Normal file
View File

@ -0,0 +1,416 @@
const Web3 = require('web3');
const async = require('async');
const Provider = require('./provider.js');
const BlockchainProcessLauncher = require('../processes/blockchainProcessLauncher');
const utils = require('../utils/utils');
const constants = require('../constants');
const WEB3_READY = 'web3Ready';
class Blockchain {
constructor(options) {
this.plugins = options.plugins;
this.logger = options.logger;
this.events = options.events;
this.contractsConfig = options.contractsConfig;
this.blockchainConfig = options.blockchainConfig;
this.web3 = options.web3;
this.locale = options.locale;
this.isDev = options.isDev;
this.web3Endpoint = '';
this.isWeb3Ready = false;
this.web3StartedInProcess = false;
this.registerProcessLaunch();
if (!this.web3) {
this.initWeb3();
} else {
this.isWeb3Ready = true;
}
//this.registerServiceCheck();
this.registerRequests();
this.registerWeb3Object();
}
registerProcessLaunch() {
const self = this;
this.events.request('processes:register', 'blockchain', (cb) => {
self.startBlockchainNode(cb);
});
}
initWeb3() {
const self = this;
this.web3 = new Web3();
if (this.contractsConfig.deployment.type !== "rpc" && this.contractsConfig.deployment.type !== "ws") {
const message = __("contracts config error: unknown deployment type %s", this.contractsConfig.deployment.type);
this.logger.error(message);
//return cb(message);
}
const protocol = (this.contractsConfig.deployment.type === "rpc") ? this.contractsConfig.deployment.protocol : 'ws';
this.web3Endpoint = utils.buildUrl(protocol, this.contractsConfig.deployment.host, this.contractsConfig.deployment.port);//`${protocol}://${this.contractsConfig.deployment.host}:${this.contractsConfig.deployment.port}`;
const providerOptions = {
web3: this.web3,
accountsConfig: this.contractsConfig.deployment.accounts,
blockchainConfig: this.blockchainConfig,
logger: this.logger,
isDev: this.isDev,
type: this.contractsConfig.deployment.type,
web3Endpoint: self.web3Endpoint
};
this.provider = new Provider(providerOptions);
self.assertNodeConnection(true, (err) => {
if (!err) {
self.provider.startWeb3Provider(() => {
self.provider.fundAccounts(() => {
self.registerWeb3Object();
self.events.emit("check:backOnline:Ethereum");
});
});
return;
}
self.events.request("processes:launch", "blockchain", () => {
console.dir("======> blockchain launched!");
self.provider.startWeb3Provider(() => {
self.provider.fundAccounts(() => {
self.registerWeb3Object();
self.events.emit("check:backOnline:Ethereum");
});
});
});
});
}
initWeb3_old() {
const self = this;
this.web3 = new Web3();
if (this.contractsConfig.deployment.type !== "rpc" && this.contractsConfig.deployment.type !== "ws") {
const message = __("contracts config error: unknown deployment type %s", this.contractsConfig.deployment.type);
this.logger.error(message);
//return cb(message);
}
const protocol = (this.contractsConfig.deployment.type === "rpc") ? this.contractsConfig.deployment.protocol : 'ws';
this.web3Endpoint = utils.buildUrl(protocol, this.contractsConfig.deployment.host, this.contractsConfig.deployment.port);//`${protocol}://${this.contractsConfig.deployment.host}:${this.contractsConfig.deployment.port}`;
const providerOptions = {
web3: this.web3,
accountsConfig: this.contractsConfig.deployment.accounts,
blockchainConfig: this.blockchainConfig,
logger: this.logger,
isDev: this.isDev,
type: this.contractsConfig.deployment.type,
web3Endpoint: self.web3Endpoint
};
this.provider = new Provider(providerOptions);
async.waterfall([
function checkNode(next) {
self.assertNodeConnection(true, (err) => {
if (err && self.web3StartedInProcess) {
// Already started blockchain in another node, we really have a node problem
self.logger.error(__('Unable to start the blockchain process. Is Geth installed?').red);
return next(err);
}
if (!err) {
self.isWeb3Ready = true;
self.events.emit(WEB3_READY);
// if the ethereum node goes offline, we need a check to ensure
// the provider is also stopped
self.events.on('check:wentOffline:Ethereum', () => {
self.logger.trace('Ethereum went offline: stopping web3 provider...');
self.provider.stop();
// once the node goes back online, we can restart the provider
self.events.once('check:backOnline:Ethereum', () => {
self.logger.trace('Ethereum back online: starting web3 provider...');
self.provider.startWeb3Provider(() => {
self.logger.trace('web3 provider restarted after ethereum node came back online');
});
});
});
return next();
}
self.web3StartedInProcess = true;
//self.startBlockchainNode(() => {
self.events.request("processes:launch", "blockchain", () => {
// Need to re-initialize web3 to connect to the new blockchain node
self.provider.stop();
//self.initWeb3(cb);
});
});
},
function startProvider(next) {
self.provider.startWeb3Provider(next);
},
function fundAccountsIfNeeded(next) {
self.provider.fundAccounts(next);
}
], (err) => {
self.registerWeb3Object();
cb(err);
});
}
onReady(callback) {
if (this.isWeb3Ready) {
return callback();
}
this.events.once(WEB3_READY, () => {
callback();
});
}
startBlockchainNode(callback) {
console.dir("==> startBlockchainNode");
const self = this;
let blockchainProcess = new BlockchainProcessLauncher({
events: self.events,
logger: self.logger,
normalizeInput: utils.normalizeInput,
blockchainConfig: self.blockchainConfig,
locale: self.locale,
isDev: self.isDev
});
blockchainProcess.startBlockchainNode();
self.events.once(constants.blockchain.blockchainReady, () => {
console.dir("==> blockchainReady");
callback();
});
self.events.once(constants.blockchain.blockchainExit, () => {
console.dir("==> blockchainExit");
self.provider.stop();
callback();
});
}
registerServiceCheck() {
const self = this;
const NO_NODE = 'noNode';
this.events.request("services:register", 'Ethereum', function (cb) {
async.waterfall([
function checkNodeConnection(next) {
self.assertNodeConnection(true, (err) => {
if (err) {
return next(NO_NODE, {name: "No Blockchain node found", status: 'off'});
}
next();
});
},
function checkVersion(next) {
// TODO: web3_clientVersion method is currently not implemented in web3.js 1.0
self.web3._requestManager.send({method: 'web3_clientVersion', params: []}, (err, version) => {
if (err) {
return next(null, {name: "Ethereum node (version unknown)", status: 'on'});
}
if (version.indexOf("/") < 0) {
return next(null, {name: version, status: 'on'});
}
let nodeName = version.split("/")[0];
let versionNumber = version.split("/")[1].split("-")[0];
let name = nodeName + " " + versionNumber + " (Ethereum)";
return next(null, {name: name, status: 'on'});
});
}
], (err, statusObj) => {
if (err && err !== NO_NODE) {
return cb(err);
}
cb(statusObj);
});
}, 5000, 'off');
}
registerRequests() {
const self = this;
this.events.setCommandHandler("blockchain:defaultAccount:get", function(cb) {
cb(self.defaultAccount());
});
this.events.setCommandHandler("blockchain:defaultAccount:set", function(account, cb) {
self.setDefaultAccount(account);
cb();
});
this.events.setCommandHandler("blockchain:block:byNumber", function(blockNumber, cb) {
self.getBlock(blockNumber, cb);
});
this.events.setCommandHandler("blockchain:gasPrice", function(cb) {
self.getGasPrice(cb);
});
this.events.setCommandHandler("blockchain:contract:create", function(params, cb) {
cb(self.ContractObject(params));
});
}
defaultAccount() {
return this.web3.eth.defaultAccount;
}
setDefaultAccount(account) {
this.web3.eth.defaultAccount = account;
}
getAccounts(cb) {
this.web3.eth.getAccounts(cb);
}
getCode(address, cb) {
this.web3.eth.getCode(address, cb);
}
getBlock(blockNumber, cb) {
this.web3.eth.getBlock(blockNumber, cb);
}
getGasPrice(cb) {
this.web3.eth.getGasPrice(cb);
}
ContractObject(params) {
return new this.web3.eth.Contract(params.abi, params.address);
}
deployContractObject(contractObject, params) {
return contractObject.deploy({arguments: params.arguments, data: params.data});
}
estimateDeployContractGas(deployObject, cb) {
return deployObject.estimateGas().then((gasValue) => {
cb(null, gasValue);
}).catch(cb);
}
deployContractFromObject(deployContractObject, params, cb) {
const self = this;
let hash;
let calledBacked = false;
function callback(err, receipt) {
if (calledBacked) {
return;
}
if (!err && !receipt.contractAddress) {
return; // Not deployed yet. Need to wait
}
if (interval) {
clearInterval(interval);
}
calledBacked = true;
cb(err, receipt);
}
// This interval is there to compensate for the event that sometimes doesn't get triggered when using WebSocket
// FIXME The issue somehow only happens when the blockchain node is started in the same terminal
const interval = setInterval(() => {
if (!hash) {
return; // Wait until we receive the hash
}
self.web3.eth.getTransactionReceipt(hash, (err, receipt) => {
if (!err && !receipt) {
return; // Transaction is not yet complete
}
callback(err, receipt);
});
}, 500);
deployContractObject.send({
from: params.from, gas: params.gas, gasPrice: params.gasPrice
}, function (err, transactionHash) {
if (err) {
return callback(err);
}
hash = transactionHash;
})
.on('receipt', function (receipt) {
if (receipt.contractAddress !== undefined) {
callback(null, receipt);
}
})
.then(function (_contract) {
if (!hash) {
return; // Somehow we didn't get the receipt yet... Interval will catch it
}
self.web3.eth.getTransactionReceipt(hash, callback);
})
.catch(callback);
}
assertNodeConnection(noLogs, cb) {
if (typeof noLogs === 'function') {
cb = noLogs;
noLogs = false;
}
const NO_NODE_ERROR = Error("error connecting to blockchain node");
const self = this;
async.waterfall([
function checkInstance(next) {
if (!self.web3) {
return next(Error("no web3 instance found"));
}
next();
},
function checkProvider(next) {
if (self.web3.currentProvider === undefined) {
return next(NO_NODE_ERROR);
}
next();
},
function pingEndpoint(next) {
if (!self.contractsConfig || !self.contractsConfig.deployment || !self.contractsConfig.deployment.host) {
return next();
}
const {host, port, type, protocol} = self.contractsConfig.deployment;
utils.pingEndpoint(host, port, type, protocol, self.blockchainConfig.wsOrigins.split(',')[0], next);
}
], function (err) {
if (!noLogs && err === NO_NODE_ERROR) {
self.logger.error(("Couldn't connect to an Ethereum node are you sure it's on?").red);
self.logger.info("make sure you have an Ethereum node or simulator running. e.g 'embark blockchain'".magenta);
}
cb(err);
});
}
determineDefaultAccount(cb) {
const self = this;
self.getAccounts(function (err, accounts) {
if (err) {
self.logger.error(err);
return cb(new Error(err));
}
let accountConfig = self.blockchainConfig.account;
let selectedAccount = accountConfig && accountConfig.address;
self.setDefaultAccount(selectedAccount || accounts[0]);
cb();
});
}
registerWeb3Object() {
// doesn't feel quite right, should be a cmd or plugin method
// can just be a command without a callback
this.events.emit("runcode:register", "web3", this.web3);
}
}
module.exports = Blockchain;

View File

@ -36,7 +36,7 @@ class Engine {
this.isDev = this.config && this.config.blockchainConfig && (this.config.blockchainConfig.isDev || this.config.blockchainConfig.default); this.isDev = this.config && this.config.blockchainConfig && (this.config.blockchainConfig.isDev || this.config.blockchainConfig.default);
if (this.interceptLogs || this.interceptLogs === undefined) { if (this.interceptLogs || this.interceptLogs === undefined) {
utils.interceptLogs(console, this.logger); this.doInterceptLogs();
} }
this.ipc = new IPC({logger: this.logger, ipcRole: this.ipcRole}); this.ipc = new IPC({logger: this.logger, ipcRole: this.ipcRole});
@ -91,18 +91,14 @@ class Engine {
} }
processManagerService(_options) { processManagerService(_options) {
const ProcessManager = require('./processes/processManager.js'); const ProcessManager = require('../processes/process_manager.js');
this.processManager = new ProcessManager({ const processManager = new ProcessManager({
events: this.events, events: this.events,
logger: this.logger, logger: this.logger,
plugins: this.plugins plugins: this.plugins
}); });
} }
graphService(_options) {
this.registerModule('graph');
}
pipelineService(_options) { pipelineService(_options) {
const self = this; const self = this;
this.registerModule('pipeline', { this.registerModule('pipeline', {
@ -147,20 +143,30 @@ class Engine {
} }
codeRunnerService(_options) { codeRunnerService(_options) {
const CodeRunner = require('./modules/coderunner/codeRunner.js'); const CodeRunner = require('../coderunner/codeRunner.js');
this.codeRunner = new CodeRunner({ this.codeRunner = new CodeRunner({
config: this.config, config: this.config,
plugins: this.plugins, plugins: this.plugins,
events: this.events, events: this.events,
logger: this.logger, logger: this.logger
ipc: this.ipc
}); });
} }
codeGeneratorService(_options) { codeGeneratorService(_options) {
let self = this; let self = this;
this.registerModule('code_generator', {plugins: self.plugins, env: self.env}); const CodeGenerator = require('../contracts/code_generator.js');
this.codeGenerator = new CodeGenerator({
blockchainConfig: self.config.blockchainConfig,
contractsConfig: self.config.contractsConfig,
plugins: self.plugins,
storageConfig: self.config.storageConfig,
namesystemConfig: self.config.namesystemConfig,
communicationConfig: self.config.communicationConfig,
events: self.events,
env: self.env
});
this.codeGenerator.listenToCommands();
const generateCode = function (modifiedAssets) { const generateCode = function (modifiedAssets) {
self.events.request("code-generator:embarkjs:build", () => { self.events.request("code-generator:embarkjs:build", () => {
@ -204,7 +210,6 @@ class Engine {
self.fileTimeout = setTimeout(() => { self.fileTimeout = setTimeout(() => {
// TODO: still need to redeploy contracts because the original contracts // TODO: still need to redeploy contracts because the original contracts
// config is being corrupted // config is being corrupted
self.config.reloadConfig();
if (fileType === 'asset') { if (fileType === 'asset') {
// Throttle file changes so we re-write only once for all files // Throttle file changes so we re-write only once for all files
self.events.emit('asset-changed', path); self.events.emit('asset-changed', path);
@ -212,6 +217,8 @@ class Engine {
// TODO: for now need to deploy on asset changes as well // TODO: for now need to deploy on asset changes as well
// because the contractsManager config is corrupted after a deploy // because the contractsManager config is corrupted after a deploy
if (fileType === 'contract' || fileType === 'config') { if (fileType === 'contract' || fileType === 'config') {
self.config.reloadConfig();
self.events.request('deploy:contracts', () => {}); self.events.request('deploy:contracts', () => {});
} }
}, 50); }, 50);
@ -248,11 +255,19 @@ class Engine {
wait: options.wait wait: options.wait
}); });
this.registerModule('whisper'); this.registerModule('whisper', {
// TODO: this should not be needed and should be deducted from the config instead
// the eth provider is not necessary the same as the whisper one
web3: this.blockchain.web3
});
} }
libraryManagerService(_options) { libraryManagerService(_options) {
this.registerModule('library_manager'); const LibraryManager = require('../versions/library_manager.js');
this.libraryManager = new LibraryManager({
plugins: this.plugins,
config: this.config
});
} }
testRunnerService(options) { testRunnerService(options) {

View File

@ -11,7 +11,6 @@ function log(eventType, eventName) {
if (['end', 'prefinish', 'error', 'new', 'demo', 'block', 'version'].indexOf(eventName) >= 0) { if (['end', 'prefinish', 'error', 'new', 'demo', 'block', 'version'].indexOf(eventName) >= 0) {
return; return;
} }
// prevents infinite loop when printing events
if (eventType.indexOf("log") >= 0) { if (eventType.indexOf("log") >= 0) {
return; return;
} }

View File

@ -1,3 +1,17 @@
let async = require('async');
const constants = require('./constants');
require('colors');
// Override process.chdir so that we have a partial-implementation PWD for Windows
const realChdir = process.chdir;
process.chdir = (...args) => {
if (!process.env.PWD) {
process.env.PWD = process.cwd();
}
realChdir(...args);
};
let version = require('../package.json').version; let version = require('../package.json').version;
class Embark { class Embark {

View File

@ -1,16 +1,16 @@
let async = require('async'); let async = require('async');
const ContractDeployer = require('./contract_deployer.js'); const utils = require('../utils/utils.js');
const cloneDeep = require('clone-deep'); //require("../utils/debug_util.js")(__filename, async);
class DeployManager { class DeployManager {
constructor(embark, options) { constructor(options) {
const self = this; const self = this;
this.config = embark.config; this.config = options.config;
this.logger = embark.logger; this.logger = options.logger;
this.blockchainConfig = this.config.blockchainConfig; this.blockchainConfig = this.config.blockchainConfig;
this.events = embark.events; this.events = options.events;
this.plugins = options.plugins; this.plugins = options.plugins;
this.blockchain = options.blockchain; this.blockchain = options.blockchain;
this.gasLimit = false; this.gasLimit = false;
@ -18,25 +18,9 @@ class DeployManager {
this.deployOnlyOnConfig = false; this.deployOnlyOnConfig = false;
this.onlyCompile = options.onlyCompile !== undefined ? options.onlyCompile : false; this.onlyCompile = options.onlyCompile !== undefined ? options.onlyCompile : false;
this.contractDeployer = new ContractDeployer({
logger: this.logger,
events: this.events,
plugins: this.plugins
});
this.events.setCommandHandler('deploy:setGasLimit', (gasLimit) => {
self.gasLimit = gasLimit;
});
this.events.setCommandHandler('deploy:contracts', (cb) => { this.events.setCommandHandler('deploy:contracts', (cb) => {
self.deployContracts(cb); self.deployContracts(cb);
}); });
this.events.setCommandHandler('deploy:contracts:test', (cb) => {
self.fatalErrors = true;
self.deployOnlyOnConfig = true;
self.deployContracts(cb);
});
} }
deployAll(done) { deployAll(done) {
@ -116,13 +100,6 @@ class DeployManager {
} }
async.waterfall([ async.waterfall([
function requestBlockchainConnector(callback) {
self.events.request("blockchain:object", (blockchain) => {
self.blockchain = blockchain;
callback();
});
},
function buildContracts(callback) { function buildContracts(callback) {
self.events.request("contracts:build", self.deployOnlyOnConfig, (err) => { self.events.request("contracts:build", self.deployOnlyOnConfig, (err) => {
callback(err); callback(err);
@ -140,9 +117,12 @@ class DeployManager {
// TODO: could be implemented as an event (beforeDeployAll) // TODO: could be implemented as an event (beforeDeployAll)
function checkIsConnectedToBlockchain(callback) { function checkIsConnectedToBlockchain(callback) {
self.blockchain.onReady((err) => { callback();
callback(err); //self.blockchain.onReady(() => {
}); // self.blockchain.assertNodeConnection((err) => {
// callback(err);
// });
//});
}, },
// TODO: this can be done on the fly or as part of the initialization // TODO: this can be done on the fly or as part of the initialization

View File

@ -0,0 +1,39 @@
class ProcessManager {
constructor(options) {
const self = this;
this.logger = options.logger;
this.events = options.events;
this.plugins = options.plugins;
this.processes = {};
self.events.setCommandHandler('processes:register', (name, cb) => {
console.dir("=====> registering " + name);
this.processes[name] = {
state: 'unstarted',
cb: cb
}
});
self.events.setCommandHandler('processes:launch', (name, cb) => {
let process = self.processes[name];
// TODO: should make distinction between starting and running
if (process.state != 'unstarted') {
console.dir("=====> already started " + name);
return cb();
}
console.dir("=====> launching " + name);
process.state = 'starting';
//let pry = require('pryjs');
//eval(pry.it);
process.cb.apply(process.cb, [() => {
process.state = 'running';
console.dir("=====> launched " + name);
cb();
}]);
});
}
}
module.exports = ProcessManager;