From 801932b7267efa37ab20fd5769203deded887c03 Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Sun, 25 Nov 2018 22:03:44 -0600 Subject: [PATCH] refactor(@embark/blockchain_process): improve the blockchain proxy Use proper stream parsing to consistently track JSON-RPC messages. For HTTP POST requests use the `stream-json` package to assemble request and response message objects. For WebSocket requests continue to use `simples/lib/parsers/ws` to process stream frames into messages. For Websocket responses use the Receiver class of the `ws` package to process stream data into messages. In both cases, make use of the `cloneable-readable` and `stream-chain` packages to avoid leaks. This mishmash of stream parsing approaches is the result of much experimentation to find a working solution. For example, `simples/lib/parsers/ws` does't work for processing WebSocket responses and `ws.Receiver` doesn't work for processing requests. Additional revisions may be necessary. Revise `blockchain_process/dev_funds.js` to use web3's HTTP provider if a DApp disables the WebSocket proxy. --- package.json | 8 +- .../modules/blockchain_process/dev_funds.js | 20 +- src/lib/modules/blockchain_process/proxy.js | 264 ++++++++++-------- yarn.lock | 68 ++++- 4 files changed, 226 insertions(+), 134 deletions(-) diff --git a/package.json b/package.json index 7f3ff18ee..bf268bee6 100644 --- a/package.json +++ b/package.json @@ -84,6 +84,7 @@ "chokidar": "2.0.4", "clipboardy": "1.2.3", "clone-deep": "4.0.0", + "cloneable-readable": "2.0.0", "colors": "1.3.2", "commander": "2.18.0", "compression-webpack-plugin": "2.0.0", @@ -114,7 +115,7 @@ "hard-source-webpack-plugin": "0.12.0", "helmet": "3.13.0", "hosted-git-info": "2.7.1", - "http-proxy": "1.17.0", + "http-proxy-middleware": "0.19.0", "http-shutdown": "1.2.0", "i18n": "0.8.3", "ipfs-api": "17.2.4", @@ -139,6 +140,7 @@ "pkg-up": "2.0.0", "promptly": "2.2.0", "propose": "0.0.5", + "pump": "3.0.0", "react-scripts": "1.1.5", "remix-debug-debugtest": "0.2.14", "remix-tests": "0.0.21", @@ -150,6 +152,7 @@ "simples": "0.8.8", "solc": "0.5.0", "source-map-support": "0.5.9", + "stream-json": "1.1.3", "string-replace-async": "1.2.1", "style-loader": "0.23.1", "subdir": "0.0.3", @@ -164,7 +167,8 @@ "webpack": "4.19.0", "webpack-bundle-analyzer": "2.13.1", "websocket": "1.0.28", - "window-size": "1.1.1" + "window-size": "1.1.1", + "ws": "6.1.2" }, "devDependencies": { "@babel/cli": "7.1.2", diff --git a/src/lib/modules/blockchain_process/dev_funds.js b/src/lib/modules/blockchain_process/dev_funds.js index fb1cb1d7f..c13cca00d 100644 --- a/src/lib/modules/blockchain_process/dev_funds.js +++ b/src/lib/modules/blockchain_process/dev_funds.js @@ -12,7 +12,25 @@ class DevFunds { this.password = this.blockchainConfig.account.password ? readFileSync(dappPath(this.blockchainConfig.account.password), 'utf8').replace('\n', '') : 'dev_password'; this.networkId = null; this.balance = Web3.utils.toWei("1", "ether"); - this.provider = options.provider || new Web3.providers.WebsocketProvider(buildUrl('ws', this.blockchainConfig.wsHost, this.blockchainConfig.wsPort), {headers: {Origin: constants.embarkResourceOrigin}}); + if (options.provider) { + this.provider = options.provider; + } else if (this.blockchainConfig.wsRPC) { + this.provider = new Web3.providers.WebsocketProvider( + buildUrl( + 'ws', + this.blockchainConfig.wsHost, + this.blockchainConfig.wsPort + ), + {headers: {Origin: constants.embarkResourceOrigin}}); + } else { + this.provider = new Web3.providers.HttpProvider( + buildUrl( + 'http', + this.blockchainConfig.rpcHost, + this.blockchainConfig.rpcPort + ) + ); + } this.web3 = new Web3(this.provider); if (this.blockchainConfig.account.balance) { this.balance = this.blockchainConfig.account.balance; diff --git a/src/lib/modules/blockchain_process/proxy.js b/src/lib/modules/blockchain_process/proxy.js index afdfbc1ec..359fc4ef7 100644 --- a/src/lib/modules/blockchain_process/proxy.js +++ b/src/lib/modules/blockchain_process/proxy.js @@ -1,156 +1,180 @@ -const httpProxy = require('http-proxy'); -const http = require('http'); -const constants = require('../../constants.json'); -const utils = require('../../utils/utils'); - -let commList = {}; -let transactions = {}; -let receipts = {}; +/* global __ exports require */ +const Asm = require('stream-json/Assembler'); const {canonicalHost, defaultHost} = require('../../utils/host'); +const {chain} = require('stream-chain'); +const cloneable = require('cloneable-readable'); +const constants = require('../../constants.json'); +const express = require('express'); +const {parser} = require('stream-json'); +const proxyMiddleware = require('http-proxy-middleware'); +const pump = require('pump'); +const utils = require('../../utils/utils'); +const WebSocket = require('ws'); +const WsParser = require('simples/lib/parsers/ws'); -const parseRequest = function (reqBody) { - let jsonO; +const parseJsonMaybe = (string) => { + let object; try { - jsonO = JSON.parse(reqBody); + if (string && typeof string === 'string') object = JSON.parse(string); } catch (e) { - return; // Request is not a json. Do nothing - } - if (jsonO.method === "eth_sendTransaction") { - commList[jsonO.id] = { - type: 'contract-log', - address: jsonO.params[0].to, - data: jsonO.params[0].data - }; - } else if (jsonO.method === "eth_getTransactionReceipt") { - if (transactions[jsonO.params[0]]) { - transactions[jsonO.params[0]].receiptId = jsonO.id; - receipts[jsonO.id] = transactions[jsonO.params[0]].commListId; - } + console.error('Error parsing string as JSON', string); + } finally { + // eslint-disable-next-line no-unsafe-finally + return object; } }; -const parseResponse = function (ipc, resBody) { - let jsonO; - try { - jsonO = JSON.parse(resBody); - } catch (e) { - return; // Response is not a json. Do nothing - } - if (commList[jsonO.id]) { - commList[jsonO.id].transactionHash = jsonO.result; - transactions[jsonO.result] = {commListId: jsonO.id}; - } else if (receipts[jsonO.id] && jsonO.result && jsonO.result.blockNumber) { - // TODO find out why commList[receipts[jsonO.id]] is sometimes not defined - if (!commList[receipts[jsonO.id]]) { - commList[receipts[jsonO.id]] = {}; +exports.serve = async (ipc, host, port, ws, origin) => { + const commList = {}; + const receipts = {}; + const transactions = {}; + + const trackRequest = (req) => { + if (!req) return; + try { + if (req.method === 'eth_sendTransaction') { + commList[req.id] = { + type: 'contract-log', + address: req.params[0].to, + data: req.params[0].data + }; + } else if (req.method === 'eth_getTransactionReceipt') { + if (transactions[req.params[0]]) { + transactions[req.params[0]].receiptId = req.id; + receipts[req.id] = transactions[req.params[0]].commListId; + } + } + } catch (e) { + console.error('Error tracking request message', JSON.stringify(req)); } - commList[receipts[jsonO.id]].blockNumber = jsonO.result.blockNumber; - commList[receipts[jsonO.id]].gasUsed = jsonO.result.gasUsed; - commList[receipts[jsonO.id]].status = jsonO.result.status; + }; - if (ipc.connected && !ipc.connecting) { - ipc.request('log', commList[receipts[jsonO.id]]); - } else { - const message = commList[receipts[jsonO.id]]; - ipc.connecting = true; - ipc.connect(() => { - ipc.connecting = false; - ipc.request('log', message); - }); + const trackResponse = (res) => { + if (!res) return; + try { + if (commList[res.id]) { + commList[res.id].transactionHash = res.result; + transactions[res.result] = {commListId: res.id}; + } else if (receipts[res.id] && res.result && res.result.blockNumber) { + // TODO find out why commList[receipts[res.id]] is sometimes not defined + if (!commList[receipts[res.id]]) { + commList[receipts[res.id]] = {}; + } + commList[receipts[res.id]].blockNumber = res.result.blockNumber; + commList[receipts[res.id]].gasUsed = res.result.gasUsed; + commList[receipts[res.id]].status = res.result.status; + + if (ipc.connected && !ipc.connecting) { + ipc.request('log', commList[receipts[res.id]]); + } else { + const message = commList[receipts[res.id]]; + ipc.connecting = true; + ipc.connect(() => { + ipc.connecting = false; + ipc.request('log', message); + }); + } + delete transactions[commList[receipts[res.id]].transactionHash]; + delete commList[receipts[res.id]]; + delete receipts[res.id]; + } + } catch (e) { + console.error('Error tracking response message', JSON.stringify(res)); } + }; - delete transactions[commList[receipts[jsonO.id]].transactionHash]; - delete commList[receipts[jsonO.id]]; - delete receipts[jsonO.id]; - } -}; - -exports.serve = async function (ipc, host, port, ws, origin) { - const _origin = origin ? origin.split(',')[0] : undefined; const start = Date.now(); - - function awaitTarget() { + await (function waitOnTarget() { return new Promise(resolve => { utils.pingEndpoint( - canonicalHost(host), port, ws ? 'ws': false, 'http', _origin, async (err) => { + canonicalHost(host), + port, + ws ? 'ws': false, + 'http', + origin ? origin.split(',')[0] : undefined, + (err) => { if (!err || (Date.now() - start > 10000)) { - return resolve(); + resolve(); + } else { + utils.timer(250).then(waitOnTarget).then(resolve); } - await utils.timer(250).then(awaitTarget).then(resolve); } ); }); - } + }()); - await awaitTarget(); + const proxyOpts = { + logLevel: 'warn', + target: `http://${canonicalHost(host)}:${port}`, + ws: ws, - let proxy = httpProxy.createProxyServer({ - target: { - host: canonicalHost(host), - port: port + onError(err, _req, _res) { + console.error( + __('Error forwarding requests to blockchain/simulator'), + err.message + ); }, - ws: ws - }); - proxy.on('error', function (e) { - console.error(__("Error forwarding requests to blockchain/simulator"), e.message); - }); - - proxy.on('proxyRes', (proxyRes) => { - let resBody = []; - proxyRes.on('data', (b) => resBody.push(b)); - proxyRes.on('end', function () { - resBody = Buffer.concat(resBody).toString(); - if (resBody) { - parseResponse(ipc, resBody); + onProxyReq(_proxyReq, req, _res) { + if (req.method === 'POST') { + // messages TO the target + Asm.connectTo(chain([ + req, + parser() + ])).on('done', ({current: object}) => { + trackRequest(object); + }); } - }); - }); + }, - let server = http.createServer((req, res) => { - let reqBody = []; - req.on('data', (b) => { - reqBody.push(b); - }) - .on('end', () => { - reqBody = Buffer.concat(reqBody).toString(); - if (reqBody) { - parseRequest(reqBody); - } - }); - - if (!ws) { - proxy.web(req, res); + onProxyRes(proxyRes, req, _res) { + if (req.method === 'POST') { + // messages FROM the target + Asm.connectTo(chain([ + proxyRes, + parser() + ])).on('done', ({current: object}) => { + trackResponse(object); + }); + } } - }); + }; if (ws) { - const WsParser = require('simples/lib/parsers/ws'); // npm install simples - - server.on('upgrade', function (req, socket, head) { - proxy.ws(req, socket, head); - }); - - proxy.on('open', (proxySocket) => { - proxySocket.on('data', (data) => { - parseResponse(ipc, data.toString().substr(data.indexOf("{"))); + proxyOpts.onProxyReqWs = (_proxyReq, _req, socket, _options, _head) => { + // messages TO the target + const wsp = new WsParser(0, false); + wsp.on('frame', ({data: buffer}) => { + const object = parseJsonMaybe(buffer.toString()); + trackRequest(object); }); - }); + pump(cloneable(socket), wsp); + }; - proxy.on('proxyReqWs', (proxyReq, req, socket) => { - var parser = new WsParser(0, false); - socket.pipe(parser); - parser.on('frame', function (frame) { - parseRequest(frame.data); + proxyOpts.onOpen = (proxySocket) => { + // messages FROM the target + const recv = new WebSocket.Receiver(); + recv.on('message', (data) => { + const object = parseJsonMaybe(data); + trackResponse(object); }); - - }); + pump(cloneable(proxySocket), recv); + }; } - const listenPort = port - constants.blockchain.servicePortOnProxy; + + const proxy = proxyMiddleware(proxyOpts); + const app = express(); + app.use('*', proxy); + return new Promise(resolve => { - server.listen(listenPort, defaultHost, () => { - resolve(server); - }); + const server = app.listen( + port - constants.blockchain.servicePortOnProxy, + defaultHost, + () => { resolve(server); } + ); + if (ws) { + server.on('upgrade', proxy.upgrade); + } }); }; diff --git a/yarn.lock b/yarn.lock index 000a7e18b..e814e1bc1 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3661,6 +3661,14 @@ clone@^1.0.2: resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e" integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4= +cloneable-readable@2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/cloneable-readable/-/cloneable-readable-2.0.0.tgz#57b05f40f0dc95f5d6e18992c845f5fcf5b24f51" + integrity sha512-YjptvQM0hnSgrEQJPnGu3MqaK3iZ+37HaVBf5/wHyViQUQOGz5w+C44DllnInlUQjDISTyErf4Xa+RFEF1QB3Q== + dependencies: + inherits "^2.0.1" + readable-stream "^3.0.0" + co@^4.6.0: version "4.6.0" resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184" @@ -7496,6 +7504,16 @@ http-parser-js@>=0.4.0: resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.0.tgz#d65edbede84349d0dc30320815a15d39cc3cbbd8" integrity sha512-cZdEF7r4gfRIq7ezX9J0T+kQmJNOub71dWbgAXVHDct80TKP4MCETtZQ31xyv38UwgzkWPYF/Xc0ge55dW9Z9w== +http-proxy-middleware@0.19.0: + version "0.19.0" + resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-0.19.0.tgz#40992b5901dc44bc7bc3795da81b0b248eca02d8" + integrity sha512-Ab/zKDy2B0404mz83bgki0HHv/xqpYKAyFXhopAiJaVAUSJfLYrpBYynTl4ZSUJ7TqrAgjarTsxdX5yBb4unRQ== + dependencies: + http-proxy "^1.17.0" + is-glob "^4.0.0" + lodash "^4.17.10" + micromatch "^3.1.10" + http-proxy-middleware@~0.17.4: version "0.17.4" resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-0.17.4.tgz#642e8848851d66f09d4f124912846dbaeb41b833" @@ -7506,7 +7524,7 @@ http-proxy-middleware@~0.17.4: lodash "^4.17.2" micromatch "^2.3.11" -http-proxy@1.17.0, http-proxy@^1.16.2: +http-proxy@^1.16.2, http-proxy@^1.17.0: version "1.17.0" resolved "https://registry.yarnpkg.com/http-proxy/-/http-proxy-1.17.0.tgz#7ad38494658f84605e2f6db4436df410f4e5be9a" integrity sha512-Taqn+3nNvYRfJ3bGvKfBSRwy1v6eePlm3oc/aWVxZp57DQr5Eq3xhKJi7Z4hZpS8PC3H4qI+Yly5EmFacGuA/g== @@ -11707,6 +11725,14 @@ pull-traverse@^1.0.3: resolved "https://registry.yarnpkg.com/pull-traverse/-/pull-traverse-1.0.3.tgz#74fb5d7be7fa6bd7a78e97933e199b7945866938" integrity sha1-dPtde+f6a9enjpeTPhmbeUWGaTg= +pump@3.0.0, pump@^3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64" + integrity sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww== + dependencies: + end-of-stream "^1.1.0" + once "^1.3.1" + pump@^1.0.3: version "1.0.3" resolved "https://registry.yarnpkg.com/pump/-/pump-1.0.3.tgz#5dfe8311c33bbf6fc18261f9f34702c47c08a954" @@ -11723,14 +11749,6 @@ pump@^2.0.0, pump@^2.0.1: end-of-stream "^1.1.0" once "^1.3.1" -pump@^3.0.0: - version "3.0.0" - resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64" - integrity sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww== - dependencies: - end-of-stream "^1.1.0" - once "^1.3.1" - pumpify@^1.3.3: version "1.5.1" resolved "https://registry.yarnpkg.com/pumpify/-/pumpify-1.5.1.tgz#36513be246ab27570b1a374a5ce278bfd74370ce" @@ -12054,6 +12072,15 @@ readable-stream@^1.0.33: isarray "0.0.1" string_decoder "~0.10.x" +readable-stream@^3.0.0: + version "3.0.6" + resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.0.6.tgz#351302e4c68b5abd6a2ed55376a7f9a25be3057a" + integrity sha512-9E1oLoOWfhSXHGv6QlwXJim7uNzd9EVlWK+21tCU9Ju/kR0/p2AZYPz4qSchgO8PlLIH4FpZYfzwS+rEksZjIg== + dependencies: + inherits "^2.0.3" + string_decoder "^1.1.1" + util-deprecate "^1.0.1" + readdirp@^2.0.0: version "2.2.1" resolved "https://registry.yarnpkg.com/readdirp/-/readdirp-2.2.1.tgz#0e87622a3325aa33e892285caf8b4e846529a525" @@ -13554,6 +13581,11 @@ stream-browserify@^2.0.1: inherits "~2.0.1" readable-stream "^2.0.2" +stream-chain@^2.0.3: + version "2.0.3" + resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.0.3.tgz#9155237a719c8bb2de883c2d1af66d1546f910c9" + integrity sha512-uCjJTDTAOgBrM2dLkS3mJGICUV3fL0eiTsGQ0bXiKU6et6/7dkTzZCp0eP1d8ZIwiWjthvQZlSx7NxMt7t1aFQ== + stream-each@^1.1.0: version "1.2.3" resolved "https://registry.yarnpkg.com/stream-each/-/stream-each-1.2.3.tgz#ebe27a0c389b04fbcc233642952e10731afa9bae" @@ -13573,6 +13605,13 @@ stream-http@^2.7.2: to-arraybuffer "^1.0.0" xtend "^4.0.0" +stream-json@1.1.3: + version "1.1.3" + resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.1.3.tgz#4ebef75590f3af2a7291726a8e2bb2ce06d2c166" + integrity sha512-y+ChhCov2A5nDqC2aZ6HKXs3OvDlvAp0Ps3BF1P/Iv8tUZJQQsMVaSzk0WryVTVoGITKv01UYahCXMpAs7I0lQ== + dependencies: + stream-chain "^2.0.3" + stream-shift@^1.0.0: version "1.0.0" resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.0.tgz#d5c752825e5367e786f78e18e445ea223a155952" @@ -13646,7 +13685,7 @@ string.prototype.trim@~1.1.2: es-abstract "^1.5.0" function-bind "^1.0.2" -string_decoder@^1.0.0: +string_decoder@^1.0.0, string_decoder@^1.1.1: version "1.2.0" resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.2.0.tgz#fe86e738b19544afe70469243b2a1ee9240eae8d" integrity sha512-6YqyX6ZWEYguAxgZzHGL7SsCeGx3V2TtOTqZz1xSTSWnqsbWwbptafNyvf/ACquZUXV3DANr5BDIwNYe1mN42w== @@ -14665,7 +14704,7 @@ utf8@^2.1.1: resolved "https://registry.yarnpkg.com/utf8/-/utf8-2.1.2.tgz#1fa0d9270e9be850d9b05027f63519bf46457d96" integrity sha1-H6DZJw6b6FDZsFAn9jUZv0ZFfZY= -util-deprecate@~1.0.1: +util-deprecate@^1.0.1, util-deprecate@~1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf" integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8= @@ -15838,6 +15877,13 @@ write@^0.2.1: dependencies: mkdirp "^0.5.1" +ws@6.1.2: + version "6.1.2" + resolved "https://registry.yarnpkg.com/ws/-/ws-6.1.2.tgz#3cc7462e98792f0ac679424148903ded3b9c3ad8" + integrity sha512-rfUqzvz0WxmSXtJpPMX2EeASXabOrSMk1ruMOV3JBTBjo4ac2lDjGGsbQSyxj8Odhw5fBib8ZKEjDNvgouNKYw== + dependencies: + async-limiter "~1.0.0" + ws@^3.0.0: version "3.3.3" resolved "https://registry.yarnpkg.com/ws/-/ws-3.3.3.tgz#f1cf84fe2d5e901ebce94efaece785f187a228f2"