From 50124435d82371d1f5a787a36f3e785258fbacb8 Mon Sep 17 00:00:00 2001 From: Iuri Matias Date: Fri, 27 Jul 2018 16:54:33 -0400 Subject: [PATCH] move blockchain connection code to its own module --- lib/contracts/provider.js | 144 ------------------ lib/modules/blockchain_connector/index.js | 103 +++++++------ .../blockchain_connector/index.js~HEAD} | 115 +++++++------- lib/modules/blockchain_connector/provider.js | 75 +++++++-- .../blockchain_connector/provider.js~HEAD | 82 ++++++++++ 5 files changed, 262 insertions(+), 257 deletions(-) delete mode 100644 lib/contracts/provider.js rename lib/{contracts/blockchain.js => modules/blockchain_connector/index.js~HEAD} (76%) create mode 100644 lib/modules/blockchain_connector/provider.js~HEAD diff --git a/lib/contracts/provider.js b/lib/contracts/provider.js deleted file mode 100644 index 4d8e3fdc..00000000 --- a/lib/contracts/provider.js +++ /dev/null @@ -1,144 +0,0 @@ -const async = require('async'); -const AccountParser = require('../utils/accountParser'); -const fundAccount = require('./fundAccount'); - -class Provider { - constructor(options) { - this.web3 = options.web3; - this.accountsConfig = options.accountsConfig; - this.blockchainConfig = options.blockchainConfig; - this.type = options.type; - this.web3Endpoint = options.web3Endpoint; - this.logger = options.logger; - this.isDev = options.isDev; - this.engine = new ProviderEngine(); - this.asyncMethods = {}; - } - - startWeb3Provider(callback) { - const self = this; - - if (this.type === 'rpc') { - self.engine.addProvider(new RpcSubprovider({ - rpcUrl: self.web3Endpoint - })); - } else if (this.type === 'ws') { - //self.engine.addProvider(new WsSubprovider({ - console.log('USing ws'); - self.addProvider(new SubscriptionSubprovider()); - self.addProvider(new WsSubprovider({ - rpcUrl: self.web3Endpoint, - origin: this.blockchainConfig.wsOrigins.split(',')[0] - })); - //self.provider = new this.web3.providers.WebsocketProvider(self.web3Endpoint, {headers: {Origin: "embark"}}); - } else { - return callback(__("contracts config error: unknown deployment type %s", this.type)); - } - - - // network connectivity error - self.engine.on('error', (err) => { - // report connectivity errors - self.logger.error(err); - }); - - self.engine.start(); - //self.on('error', (err) => { - // console.log('ERR', JSON.stringify(err)); - // // report connectivity errors as trace due to polling - // self.logger.trace('web3 provider error: ', err); - // self.logger.trace('stopping web3 provider due to error'); - - // // prevent continuous polling errors - // self.stop(); - //}); - - //self.web3.setProvider(self); - //self.start(); - - self.accounts = AccountParser.parseAccountsConfig(self.accountsConfig, self.web3, self.logger); - self.addresses = []; - - if (!self.accounts.length) { - return callback(); - } - self.accounts.forEach(account => { - self.addresses.push(account.address); - self.web3.eth.accounts.wallet.add(account); - }); - - self.realAccountFunction = self.web3.eth.getAccounts; - self.web3.eth.getAccounts = function (cb) { - if (!cb) { - cb = function () { - }; - } - return new Promise((resolve, reject) => { - self.realAccountFunction((err, accounts) => { - if (err) { - return cb(err); - } - accounts = accounts.concat(self.addresses); - // accounts = self.addresses.concat(accounts); - cb(null, accounts); - resolve(accounts); - }); - } - realSend(payload, cb); - }; - - callback(); - } - - stop() { - if (this.provider && this.provider.removeAllListeners) { - this.provider.removeAllListeners('connect'); - this.provider.removeAllListeners('error'); - this.provider.removeAllListeners('end'); - this.provider.removeAllListeners('data'); - this.provider.responseCallbacks = {}; - this.provider = null; - } - } - - fundAccounts(callback) { - const self = this; - if (!self.accounts.length) { - return callback(); - } - if (!self.isDev) { - return callback(); - } - async.each(self.accounts, (account, eachCb) => { - fundAccount(self.web3, account.address, account.hexBalance, eachCb); - }, callback); - } - - stop() { - this.engine.stop(); - } - - eth_accounts(payload, cb) { - return cb(null, this.addresses); - } - - sendAsync(payload, callback) { - let method = this.asyncMethods[payload.method]; - if (method) { - return method.call(method, payload, (err, result) => { - if (err) { - return callback(err); - } - let response = {'id': payload.id, 'jsonrpc': '2.0', 'result': result}; - callback(null, response); - }); - } - this.engine.sendAsync.apply(this.engine, arguments); - } - - send() { - return this.engine.send.apply(this.engine, arguments); - } -} - -module.exports = Provider; diff --git a/lib/modules/blockchain_connector/index.js b/lib/modules/blockchain_connector/index.js index c1e96b24..12edb7a8 100644 --- a/lib/modules/blockchain_connector/index.js +++ b/lib/modules/blockchain_connector/index.js @@ -1,34 +1,28 @@ const Web3 = require('web3'); const async = require('async'); const Provider = require('./provider.js'); -const utils = require('../../utils/utils'); -const constants = require('../../constants'); -const embarkJsUtils = require('embarkjs').Utils; +const utils = require('../utils/utils'); const WEB3_READY = 'web3Ready'; -// TODO: consider another name, this is the blockchain connector -class BlockchainConnector { - constructor(embark, options) { +class Blockchain { + constructor(options) { const self = this; this.plugins = options.plugins; - this.logger = embark.logger; - this.events = embark.events; - this.contractsConfig = embark.config.contractsConfig; - this.blockchainConfig = embark.config.blockchainConfig; + this.logger = options.logger; + this.events = options.events; + this.contractsConfig = options.contractsConfig; + this.blockchainConfig = options.blockchainConfig; this.web3 = options.web3; this.isDev = options.isDev; this.web3Endpoint = ''; this.isWeb3Ready = false; + this.web3StartedInProcess = false; self.events.setCommandHandler("blockchain:web3:isReady", (cb) => { cb(self.isWeb3Ready); }); - self.events.setCommandHandler("blockchain:object", (cb) => { - cb(self); - }); - if (!this.web3) { this.initWeb3(); } else { @@ -38,19 +32,9 @@ class BlockchainConnector { this.registerRequests(); this.registerWeb3Object(); this.registerEvents(); - this.subscribeToPendingTransactions(); } - //initWeb3() { - initWeb3(cb) { - if (!cb) { - cb = function(){}; - } - if (this.isWeb3Ready) { - this.events.emit(WEB3_READY); - return cb(); - } - + initWeb3() { const self = this; this.web3 = new Web3(); @@ -232,14 +216,60 @@ class BlockchainConnector { } deployContractFromObject(deployContractObject, params, cb) { - embarkJsUtils.secureSend(this.web3, deployContractObject, { + const self = this; + let hash; + let calledBacked = false; + + function callback(err, receipt) { + if (calledBacked) { + return; + } + if (!err && !receipt.contractAddress) { + return; // Not deployed yet. Need to wait + } + if (interval) { + clearInterval(interval); + } + calledBacked = true; + cb(err, receipt); + } + + // This interval is there to compensate for the event that sometimes doesn't get triggered when using WebSocket + // FIXME The issue somehow only happens when the blockchain node is started in the same terminal + const interval = setInterval(() => { + if (!hash) { + return; // Wait until we receive the hash + } + self.web3.eth.getTransactionReceipt(hash, (err, receipt) => { + if (!err && !receipt) { + return; // Transaction is not yet complete + } + callback(err, receipt); + }); + }, 500); + + deployContractObject.send({ from: params.from, gas: params.gas, gasPrice: params.gasPrice - }, true, cb); + }, function (err, transactionHash) { + if (err) { + return callback(err); + } + hash = transactionHash; + }).on('receipt', function (receipt) { + if (receipt.contractAddress !== undefined) { + callback(null, receipt); + } + }).then(function (_contract) { + if (!hash) { + return; // Somehow we didn't get the receipt yet... Interval will catch it + } + self.web3.eth.getTransactionReceipt(hash, callback); + }).catch(callback); } determineDefaultAccount(cb) { const self = this; - self.getAccounts(function(err, accounts) { + self.getAccounts(function (err, accounts) { if (err) { self.logger.error(err); return cb(new Error(err)); @@ -256,20 +286,7 @@ class BlockchainConnector { // can just be a command without a callback this.events.emit("runcode:register", "web3", this.web3, false); } - - subscribeToPendingTransactions() { - const self = this; - this.onReady(() => { - if (self.logsSubscription) { - self.logsSubscription.unsubscribe(); - } - self.logsSubscription = self.web3.eth - .subscribe('newBlockHeaders', () => {}) - .on("data", function (blockHeader) { - self.events.emit('block:header', blockHeader); - }); - }); - } } -module.exports = BlockchainConnector; +module.exports = Blockchain; + diff --git a/lib/contracts/blockchain.js b/lib/modules/blockchain_connector/index.js~HEAD similarity index 76% rename from lib/contracts/blockchain.js rename to lib/modules/blockchain_connector/index.js~HEAD index e6eb06b7..9720ec12 100644 --- a/lib/contracts/blockchain.js +++ b/lib/modules/blockchain_connector/index.js~HEAD @@ -1,28 +1,34 @@ const Web3 = require('web3'); const async = require('async'); const Provider = require('./provider.js'); -const utils = require('../utils/utils'); +const utils = require('../../utils/utils'); +const constants = require('../../constants'); +const embarkJsUtils = require('embarkjs').Utils; const WEB3_READY = 'web3Ready'; -class Blockchain { - constructor(options) { +// TODO: consider another name, this is the blockchain connector +class BlockchainConnector { + constructor(embark, options) { const self = this; this.plugins = options.plugins; - this.logger = options.logger; - this.events = options.events; - this.contractsConfig = options.contractsConfig; - this.blockchainConfig = options.blockchainConfig; + this.logger = embark.logger; + this.events = embark.events; + this.contractsConfig = embark.config.contractsConfig; + this.blockchainConfig = embark.config.blockchainConfig; this.web3 = options.web3; this.isDev = options.isDev; this.web3Endpoint = ''; this.isWeb3Ready = false; - this.web3StartedInProcess = false; self.events.setCommandHandler("blockchain:web3:isReady", (cb) => { cb(self.isWeb3Ready); }); + self.events.setCommandHandler("blockchain:object", (cb) => { + cb(self); + }); + if (!this.web3) { this.initWeb3(); } else { @@ -32,9 +38,19 @@ class Blockchain { this.registerRequests(); this.registerWeb3Object(); this.registerEvents(); + this.subscribeToPendingTransactions(); } - initWeb3() { + //initWeb3() { + initWeb3(cb) { + if (!cb) { + cb = function(){}; + } + if (this.isWeb3Ready) { + this.events.emit(WEB3_READY); + return cb(); + } + const self = this; this.web3 = new Web3(); @@ -60,6 +76,18 @@ class Blockchain { self.events.request("processes:launch", "blockchain", () => { self.provider.startWeb3Provider(() => { + this.web3.eth.net.getId() + .then(id => { + let networkId = self.blockchainConfig.networkId; + if (!networkId && constants.blockchain.networkIds[self.blockchainConfig.networkType]) { + networkId = constants.blockchain.networkIds[self.blockchainConfig.networkType]; + } + if (id.toString() !== networkId.toString()) { + self.logger.warn(__('Connected to a blockchain node on network {{realId}} while your config specifies {{configId}}', {realId: id, configId: networkId})); + self.logger.warn(__('Make sure you started the right blockchain node')); + } + }) + .catch(console.error); self.provider.fundAccounts(() => { self.isWeb3Ready = true; self.events.emit(WEB3_READY); @@ -204,60 +232,14 @@ class Blockchain { } deployContractFromObject(deployContractObject, params, cb) { - const self = this; - let hash; - let calledBacked = false; - - function callback(err, receipt) { - if (calledBacked) { - return; - } - if (!err && !receipt.contractAddress) { - return; // Not deployed yet. Need to wait - } - if (interval) { - clearInterval(interval); - } - calledBacked = true; - cb(err, receipt); - } - - // This interval is there to compensate for the event that sometimes doesn't get triggered when using WebSocket - // FIXME The issue somehow only happens when the blockchain node is started in the same terminal - const interval = setInterval(() => { - if (!hash) { - return; // Wait until we receive the hash - } - self.web3.eth.getTransactionReceipt(hash, (err, receipt) => { - if (!err && !receipt) { - return; // Transaction is not yet complete - } - callback(err, receipt); - }); - }, 500); - - deployContractObject.send({ + embarkJsUtils.secureSend(this.web3, deployContractObject, { from: params.from, gas: params.gas, gasPrice: params.gasPrice - }, function (err, transactionHash) { - if (err) { - return callback(err); - } - hash = transactionHash; - }).on('receipt', function (receipt) { - if (receipt.contractAddress !== undefined) { - callback(null, receipt); - } - }).then(function (_contract) { - if (!hash) { - return; // Somehow we didn't get the receipt yet... Interval will catch it - } - self.web3.eth.getTransactionReceipt(hash, callback); - }).catch(callback); + }, true, cb); } determineDefaultAccount(cb) { const self = this; - self.getAccounts(function (err, accounts) { + self.getAccounts(function(err, accounts) { if (err) { self.logger.error(err); return cb(new Error(err)); @@ -274,7 +256,20 @@ class Blockchain { // can just be a command without a callback this.events.emit("runcode:register", "web3", this.web3); } + + subscribeToPendingTransactions() { + const self = this; + this.onReady(() => { + if (self.logsSubscription) { + self.logsSubscription.unsubscribe(); + } + self.logsSubscription = self.web3.eth + .subscribe('newBlockHeaders', () => {}) + .on("data", function (blockHeader) { + self.events.emit('block:header', blockHeader); + }); + }); + } } -module.exports = Blockchain; - +module.exports = BlockchainConnector; diff --git a/lib/modules/blockchain_connector/provider.js b/lib/modules/blockchain_connector/provider.js index 27576c46..478246d6 100644 --- a/lib/modules/blockchain_connector/provider.js +++ b/lib/modules/blockchain_connector/provider.js @@ -11,13 +11,17 @@ class Provider { this.web3Endpoint = options.web3Endpoint; this.logger = options.logger; this.isDev = options.isDev; + this.engine = new ProviderEngine(); + this.asyncMethods = {}; } startWeb3Provider(callback) { const self = this; if (this.type === 'rpc') { - self.provider = new this.web3.providers.HttpProvider(self.web3Endpoint); + self.engine.addProvider(new RpcSubprovider({ + rpcUrl: self.web3Endpoint + })); } else if (this.type === 'ws') { self.provider = new this.web3.providers.WebsocketProvider(self.web3Endpoint, {headers: {Origin: "embark"}}); self.provider.on('error', e => self.logger.error('Websocket Error', e)); @@ -26,7 +30,26 @@ class Provider { return callback(__("contracts config error: unknown deployment type %s", this.type)); } - self.web3.setProvider(self.provider); + + // network connectivity error + self.engine.on('error', (err) => { + // report connectivity errors + self.logger.error(err); + }); + + self.engine.start(); + //self.on('error', (err) => { + // console.log('ERR', JSON.stringify(err)); + // // report connectivity errors as trace due to polling + // self.logger.trace('web3 provider error: ', err); + // self.logger.trace('stopping web3 provider due to error'); + + // // prevent continuous polling errors + // self.stop(); + //}); + + //self.web3.setProvider(self); + //self.start(); self.accounts = AccountParser.parseAccountsConfig(self.accountsConfig, self.web3, self.logger); self.addresses = []; @@ -38,16 +61,22 @@ class Provider { self.addresses.push(account.address); self.web3.eth.accounts.wallet.add(account); }); - self.web3.eth.defaultAccount = self.addresses[0]; - const realSend = self.provider.send.bind(self.provider); - self.provider.send = function (payload, cb) { - if (payload.method === 'eth_accounts') { - return realSend(payload, function (err, result) { + + self.realAccountFunction = self.web3.eth.getAccounts; + self.web3.eth.getAccounts = function (cb) { + if (!cb) { + cb = function () { + }; + } + return new Promise((resolve, reject) => { + self.realAccountFunction((err, accounts) => { if (err) { return cb(err); } - result.result = result.result.concat(self.addresses); - cb(null, result); + accounts = accounts.concat(self.addresses); + // accounts = self.addresses.concat(accounts); + cb(null, accounts); + resolve(accounts); }); } realSend(payload, cb); @@ -75,10 +104,36 @@ class Provider { if (!self.isDev) { return callback(); } - async.eachLimit(self.accounts, 1, (account, eachCb) => { + async.each(self.accounts, (account, eachCb) => { fundAccount(self.web3, account.address, account.hexBalance, eachCb); }, callback); } + + stop() { + this.engine.stop(); + } + + eth_accounts(payload, cb) { + return cb(null, this.addresses); + } + + sendAsync(payload, callback) { + let method = this.asyncMethods[payload.method]; + if (method) { + return method.call(method, payload, (err, result) => { + if (err) { + return callback(err); + } + let response = {'id': payload.id, 'jsonrpc': '2.0', 'result': result}; + callback(null, response); + }); + } + this.engine.sendAsync.apply(this.engine, arguments); + } + + send() { + return this.engine.send.apply(this.engine, arguments); + } } module.exports = Provider; diff --git a/lib/modules/blockchain_connector/provider.js~HEAD b/lib/modules/blockchain_connector/provider.js~HEAD new file mode 100644 index 00000000..59b1bdb9 --- /dev/null +++ b/lib/modules/blockchain_connector/provider.js~HEAD @@ -0,0 +1,82 @@ +const async = require('async'); +const AccountParser = require('../../utils/accountParser'); +const fundAccount = require('./fundAccount'); + +class Provider { + constructor(options) { + this.web3 = options.web3; + this.accountsConfig = options.accountsConfig; + this.blockchainConfig = options.blockchainConfig; + this.type = options.type; + this.web3Endpoint = options.web3Endpoint; + this.logger = options.logger; + this.isDev = options.isDev; + } + + startWeb3Provider(callback) { + const self = this; + + if (this.type === 'rpc') { + self.provider = new this.web3.providers.HttpProvider(self.web3Endpoint); + } else if (this.type === 'ws') { + self.provider = new this.web3.providers.WebsocketProvider(self.web3Endpoint, {headers: {Origin: "embark"}}); + } else { + return callback(__("contracts config error: unknown deployment type %s", this.type)); + } + + self.web3.setProvider(self.provider); + + self.accounts = AccountParser.parseAccountsConfig(self.accountsConfig, self.web3, self.logger); + self.addresses = []; + + if (!self.accounts.length) { + return callback(); + } + self.accounts.forEach(account => { + self.addresses.push(account.address); + self.web3.eth.accounts.wallet.add(account); + }); + self.web3.eth.defaultAccount = self.addresses[0]; + const realSend = self.provider.send.bind(self.provider); + self.provider.send = function (payload, cb) { + if (payload.method === 'eth_accounts') { + return realSend(payload, function (err, result) { + if (err) { + return cb(err); + } + result.result = result.result.concat(self.addresses); + cb(null, result); + }); + } + realSend(payload, cb); + }; + + callback(); + } + + stop() { + if (this.provider && this.provider.removeAllListeners) { + this.provider.removeAllListeners('connect'); + this.provider.removeAllListeners('error'); + this.provider.removeAllListeners('end'); + this.provider.removeAllListeners('data'); + this.provider.responseCallbacks = {}; + this.provider = null; + } + } + + fundAccounts(callback) { + const self = this; + if (!self.accounts.length) { + return callback(); + } + if (!self.isDev) { + return callback(); + } + async.eachLimit(self.accounts, 1, (account, eachCb) => { + fundAccount(self.web3, account.address, account.hexBalance, eachCb); + }, callback); + } +} + +module.exports = Provider;