refactor(@embark/blockchain_process): improve the blockchain proxy

Use proper stream parsing to consistently track JSON-RPC messages.

For HTTP POST requests use the `stream-json` package to assemble request and
response message objects.

For WebSocket requests continue to use `simples/lib/parsers/ws` to process
stream frames into messages. For Websocket responses use the Receiver class of
the `ws` package to process stream data into messages. In both cases, make use
of the `cloneable-readable` and `stream-chain` packages to avoid leaks.

This mishmash of stream parsing approaches is the result of much
experimentation to find a working solution. For example,
`simples/lib/parsers/ws` does't work for processing WebSocket responses and
`ws.Receiver` doesn't work for processing requests. Additional revisions may be
necessary.

Revise `blockchain_process/dev_funds.js` to use web3's HTTP provider if a DApp
disables the WebSocket proxy.
This commit is contained in:
Michael Bradley, Jr 2018-11-25 22:03:44 -06:00 committed by Michael Bradley
parent 160015311e
commit 801932b726
4 changed files with 226 additions and 134 deletions

View File

@ -84,6 +84,7 @@
"chokidar": "2.0.4",
"clipboardy": "1.2.3",
"clone-deep": "4.0.0",
"cloneable-readable": "2.0.0",
"colors": "1.3.2",
"commander": "2.18.0",
"compression-webpack-plugin": "2.0.0",
@ -114,7 +115,7 @@
"hard-source-webpack-plugin": "0.12.0",
"helmet": "3.13.0",
"hosted-git-info": "2.7.1",
"http-proxy": "1.17.0",
"http-proxy-middleware": "0.19.0",
"http-shutdown": "1.2.0",
"i18n": "0.8.3",
"ipfs-api": "17.2.4",
@ -139,6 +140,7 @@
"pkg-up": "2.0.0",
"promptly": "2.2.0",
"propose": "0.0.5",
"pump": "3.0.0",
"react-scripts": "1.1.5",
"remix-debug-debugtest": "0.2.14",
"remix-tests": "0.0.21",
@ -150,6 +152,7 @@
"simples": "0.8.8",
"solc": "0.5.0",
"source-map-support": "0.5.9",
"stream-json": "1.1.3",
"string-replace-async": "1.2.1",
"style-loader": "0.23.1",
"subdir": "0.0.3",
@ -164,7 +167,8 @@
"webpack": "4.19.0",
"webpack-bundle-analyzer": "2.13.1",
"websocket": "1.0.28",
"window-size": "1.1.1"
"window-size": "1.1.1",
"ws": "6.1.2"
},
"devDependencies": {
"@babel/cli": "7.1.2",

View File

@ -12,7 +12,25 @@ class DevFunds {
this.password = this.blockchainConfig.account.password ? readFileSync(dappPath(this.blockchainConfig.account.password), 'utf8').replace('\n', '') : 'dev_password';
this.networkId = null;
this.balance = Web3.utils.toWei("1", "ether");
this.provider = options.provider || new Web3.providers.WebsocketProvider(buildUrl('ws', this.blockchainConfig.wsHost, this.blockchainConfig.wsPort), {headers: {Origin: constants.embarkResourceOrigin}});
if (options.provider) {
this.provider = options.provider;
} else if (this.blockchainConfig.wsRPC) {
this.provider = new Web3.providers.WebsocketProvider(
buildUrl(
'ws',
this.blockchainConfig.wsHost,
this.blockchainConfig.wsPort
),
{headers: {Origin: constants.embarkResourceOrigin}});
} else {
this.provider = new Web3.providers.HttpProvider(
buildUrl(
'http',
this.blockchainConfig.rpcHost,
this.blockchainConfig.rpcPort
)
);
}
this.web3 = new Web3(this.provider);
if (this.blockchainConfig.account.balance) {
this.balance = this.blockchainConfig.account.balance;

View File

@ -1,156 +1,180 @@
const httpProxy = require('http-proxy');
const http = require('http');
const constants = require('../../constants.json');
const utils = require('../../utils/utils');
let commList = {};
let transactions = {};
let receipts = {};
/* global __ exports require */
const Asm = require('stream-json/Assembler');
const {canonicalHost, defaultHost} = require('../../utils/host');
const {chain} = require('stream-chain');
const cloneable = require('cloneable-readable');
const constants = require('../../constants.json');
const express = require('express');
const {parser} = require('stream-json');
const proxyMiddleware = require('http-proxy-middleware');
const pump = require('pump');
const utils = require('../../utils/utils');
const WebSocket = require('ws');
const WsParser = require('simples/lib/parsers/ws');
const parseRequest = function (reqBody) {
let jsonO;
const parseJsonMaybe = (string) => {
let object;
try {
jsonO = JSON.parse(reqBody);
if (string && typeof string === 'string') object = JSON.parse(string);
} catch (e) {
return; // Request is not a json. Do nothing
console.error('Error parsing string as JSON', string);
} finally {
// eslint-disable-next-line no-unsafe-finally
return object;
}
if (jsonO.method === "eth_sendTransaction") {
commList[jsonO.id] = {
};
exports.serve = async (ipc, host, port, ws, origin) => {
const commList = {};
const receipts = {};
const transactions = {};
const trackRequest = (req) => {
if (!req) return;
try {
if (req.method === 'eth_sendTransaction') {
commList[req.id] = {
type: 'contract-log',
address: jsonO.params[0].to,
data: jsonO.params[0].data
address: req.params[0].to,
data: req.params[0].data
};
} else if (jsonO.method === "eth_getTransactionReceipt") {
if (transactions[jsonO.params[0]]) {
transactions[jsonO.params[0]].receiptId = jsonO.id;
receipts[jsonO.id] = transactions[jsonO.params[0]].commListId;
} else if (req.method === 'eth_getTransactionReceipt') {
if (transactions[req.params[0]]) {
transactions[req.params[0]].receiptId = req.id;
receipts[req.id] = transactions[req.params[0]].commListId;
}
}
} catch (e) {
console.error('Error tracking request message', JSON.stringify(req));
}
};
const parseResponse = function (ipc, resBody) {
let jsonO;
const trackResponse = (res) => {
if (!res) return;
try {
jsonO = JSON.parse(resBody);
} catch (e) {
return; // Response is not a json. Do nothing
if (commList[res.id]) {
commList[res.id].transactionHash = res.result;
transactions[res.result] = {commListId: res.id};
} else if (receipts[res.id] && res.result && res.result.blockNumber) {
// TODO find out why commList[receipts[res.id]] is sometimes not defined
if (!commList[receipts[res.id]]) {
commList[receipts[res.id]] = {};
}
if (commList[jsonO.id]) {
commList[jsonO.id].transactionHash = jsonO.result;
transactions[jsonO.result] = {commListId: jsonO.id};
} else if (receipts[jsonO.id] && jsonO.result && jsonO.result.blockNumber) {
// TODO find out why commList[receipts[jsonO.id]] is sometimes not defined
if (!commList[receipts[jsonO.id]]) {
commList[receipts[jsonO.id]] = {};
}
commList[receipts[jsonO.id]].blockNumber = jsonO.result.blockNumber;
commList[receipts[jsonO.id]].gasUsed = jsonO.result.gasUsed;
commList[receipts[jsonO.id]].status = jsonO.result.status;
commList[receipts[res.id]].blockNumber = res.result.blockNumber;
commList[receipts[res.id]].gasUsed = res.result.gasUsed;
commList[receipts[res.id]].status = res.result.status;
if (ipc.connected && !ipc.connecting) {
ipc.request('log', commList[receipts[jsonO.id]]);
ipc.request('log', commList[receipts[res.id]]);
} else {
const message = commList[receipts[jsonO.id]];
const message = commList[receipts[res.id]];
ipc.connecting = true;
ipc.connect(() => {
ipc.connecting = false;
ipc.request('log', message);
});
}
delete transactions[commList[receipts[jsonO.id]].transactionHash];
delete commList[receipts[jsonO.id]];
delete receipts[jsonO.id];
delete transactions[commList[receipts[res.id]].transactionHash];
delete commList[receipts[res.id]];
delete receipts[res.id];
}
} catch (e) {
console.error('Error tracking response message', JSON.stringify(res));
}
};
exports.serve = async function (ipc, host, port, ws, origin) {
const _origin = origin ? origin.split(',')[0] : undefined;
const start = Date.now();
function awaitTarget() {
await (function waitOnTarget() {
return new Promise(resolve => {
utils.pingEndpoint(
canonicalHost(host), port, ws ? 'ws': false, 'http', _origin, async (err) => {
canonicalHost(host),
port,
ws ? 'ws': false,
'http',
origin ? origin.split(',')[0] : undefined,
(err) => {
if (!err || (Date.now() - start > 10000)) {
return resolve();
resolve();
} else {
utils.timer(250).then(waitOnTarget).then(resolve);
}
await utils.timer(250).then(awaitTarget).then(resolve);
}
);
});
}
}());
await awaitTarget();
const proxyOpts = {
logLevel: 'warn',
target: `http://${canonicalHost(host)}:${port}`,
ws: ws,
let proxy = httpProxy.createProxyServer({
target: {
host: canonicalHost(host),
port: port
onError(err, _req, _res) {
console.error(
__('Error forwarding requests to blockchain/simulator'),
err.message
);
},
ws: ws
});
proxy.on('error', function (e) {
console.error(__("Error forwarding requests to blockchain/simulator"), e.message);
onProxyReq(_proxyReq, req, _res) {
if (req.method === 'POST') {
// messages TO the target
Asm.connectTo(chain([
req,
parser()
])).on('done', ({current: object}) => {
trackRequest(object);
});
proxy.on('proxyRes', (proxyRes) => {
let resBody = [];
proxyRes.on('data', (b) => resBody.push(b));
proxyRes.on('end', function () {
resBody = Buffer.concat(resBody).toString();
if (resBody) {
parseResponse(ipc, resBody);
}
});
});
},
let server = http.createServer((req, res) => {
let reqBody = [];
req.on('data', (b) => {
reqBody.push(b);
})
.on('end', () => {
reqBody = Buffer.concat(reqBody).toString();
if (reqBody) {
parseRequest(reqBody);
}
onProxyRes(proxyRes, req, _res) {
if (req.method === 'POST') {
// messages FROM the target
Asm.connectTo(chain([
proxyRes,
parser()
])).on('done', ({current: object}) => {
trackResponse(object);
});
if (!ws) {
proxy.web(req, res);
}
});
}
};
if (ws) {
const WsParser = require('simples/lib/parsers/ws'); // npm install simples
server.on('upgrade', function (req, socket, head) {
proxy.ws(req, socket, head);
});
proxy.on('open', (proxySocket) => {
proxySocket.on('data', (data) => {
parseResponse(ipc, data.toString().substr(data.indexOf("{")));
});
});
proxy.on('proxyReqWs', (proxyReq, req, socket) => {
var parser = new WsParser(0, false);
socket.pipe(parser);
parser.on('frame', function (frame) {
parseRequest(frame.data);
});
proxyOpts.onProxyReqWs = (_proxyReq, _req, socket, _options, _head) => {
// messages TO the target
const wsp = new WsParser(0, false);
wsp.on('frame', ({data: buffer}) => {
const object = parseJsonMaybe(buffer.toString());
trackRequest(object);
});
pump(cloneable(socket), wsp);
};
proxyOpts.onOpen = (proxySocket) => {
// messages FROM the target
const recv = new WebSocket.Receiver();
recv.on('message', (data) => {
const object = parseJsonMaybe(data);
trackResponse(object);
});
pump(cloneable(proxySocket), recv);
};
}
const listenPort = port - constants.blockchain.servicePortOnProxy;
const proxy = proxyMiddleware(proxyOpts);
const app = express();
app.use('*', proxy);
return new Promise(resolve => {
server.listen(listenPort, defaultHost, () => {
resolve(server);
});
const server = app.listen(
port - constants.blockchain.servicePortOnProxy,
defaultHost,
() => { resolve(server); }
);
if (ws) {
server.on('upgrade', proxy.upgrade);
}
});
};

View File

@ -3661,6 +3661,14 @@ clone@^1.0.2:
resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e"
integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4=
cloneable-readable@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/cloneable-readable/-/cloneable-readable-2.0.0.tgz#57b05f40f0dc95f5d6e18992c845f5fcf5b24f51"
integrity sha512-YjptvQM0hnSgrEQJPnGu3MqaK3iZ+37HaVBf5/wHyViQUQOGz5w+C44DllnInlUQjDISTyErf4Xa+RFEF1QB3Q==
dependencies:
inherits "^2.0.1"
readable-stream "^3.0.0"
co@^4.6.0:
version "4.6.0"
resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184"
@ -7496,6 +7504,16 @@ http-parser-js@>=0.4.0:
resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.0.tgz#d65edbede84349d0dc30320815a15d39cc3cbbd8"
integrity sha512-cZdEF7r4gfRIq7ezX9J0T+kQmJNOub71dWbgAXVHDct80TKP4MCETtZQ31xyv38UwgzkWPYF/Xc0ge55dW9Z9w==
http-proxy-middleware@0.19.0:
version "0.19.0"
resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-0.19.0.tgz#40992b5901dc44bc7bc3795da81b0b248eca02d8"
integrity sha512-Ab/zKDy2B0404mz83bgki0HHv/xqpYKAyFXhopAiJaVAUSJfLYrpBYynTl4ZSUJ7TqrAgjarTsxdX5yBb4unRQ==
dependencies:
http-proxy "^1.17.0"
is-glob "^4.0.0"
lodash "^4.17.10"
micromatch "^3.1.10"
http-proxy-middleware@~0.17.4:
version "0.17.4"
resolved "https://registry.yarnpkg.com/http-proxy-middleware/-/http-proxy-middleware-0.17.4.tgz#642e8848851d66f09d4f124912846dbaeb41b833"
@ -7506,7 +7524,7 @@ http-proxy-middleware@~0.17.4:
lodash "^4.17.2"
micromatch "^2.3.11"
http-proxy@1.17.0, http-proxy@^1.16.2:
http-proxy@^1.16.2, http-proxy@^1.17.0:
version "1.17.0"
resolved "https://registry.yarnpkg.com/http-proxy/-/http-proxy-1.17.0.tgz#7ad38494658f84605e2f6db4436df410f4e5be9a"
integrity sha512-Taqn+3nNvYRfJ3bGvKfBSRwy1v6eePlm3oc/aWVxZp57DQr5Eq3xhKJi7Z4hZpS8PC3H4qI+Yly5EmFacGuA/g==
@ -11707,6 +11725,14 @@ pull-traverse@^1.0.3:
resolved "https://registry.yarnpkg.com/pull-traverse/-/pull-traverse-1.0.3.tgz#74fb5d7be7fa6bd7a78e97933e199b7945866938"
integrity sha1-dPtde+f6a9enjpeTPhmbeUWGaTg=
pump@3.0.0, pump@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64"
integrity sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==
dependencies:
end-of-stream "^1.1.0"
once "^1.3.1"
pump@^1.0.3:
version "1.0.3"
resolved "https://registry.yarnpkg.com/pump/-/pump-1.0.3.tgz#5dfe8311c33bbf6fc18261f9f34702c47c08a954"
@ -11723,14 +11749,6 @@ pump@^2.0.0, pump@^2.0.1:
end-of-stream "^1.1.0"
once "^1.3.1"
pump@^3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64"
integrity sha512-LwZy+p3SFs1Pytd/jYct4wpv49HiYCqd9Rlc5ZVdk0V+8Yzv6jR5Blk3TRmPL1ft69TxP0IMZGJ+WPFU2BFhww==
dependencies:
end-of-stream "^1.1.0"
once "^1.3.1"
pumpify@^1.3.3:
version "1.5.1"
resolved "https://registry.yarnpkg.com/pumpify/-/pumpify-1.5.1.tgz#36513be246ab27570b1a374a5ce278bfd74370ce"
@ -12054,6 +12072,15 @@ readable-stream@^1.0.33:
isarray "0.0.1"
string_decoder "~0.10.x"
readable-stream@^3.0.0:
version "3.0.6"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.0.6.tgz#351302e4c68b5abd6a2ed55376a7f9a25be3057a"
integrity sha512-9E1oLoOWfhSXHGv6QlwXJim7uNzd9EVlWK+21tCU9Ju/kR0/p2AZYPz4qSchgO8PlLIH4FpZYfzwS+rEksZjIg==
dependencies:
inherits "^2.0.3"
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readdirp@^2.0.0:
version "2.2.1"
resolved "https://registry.yarnpkg.com/readdirp/-/readdirp-2.2.1.tgz#0e87622a3325aa33e892285caf8b4e846529a525"
@ -13554,6 +13581,11 @@ stream-browserify@^2.0.1:
inherits "~2.0.1"
readable-stream "^2.0.2"
stream-chain@^2.0.3:
version "2.0.3"
resolved "https://registry.yarnpkg.com/stream-chain/-/stream-chain-2.0.3.tgz#9155237a719c8bb2de883c2d1af66d1546f910c9"
integrity sha512-uCjJTDTAOgBrM2dLkS3mJGICUV3fL0eiTsGQ0bXiKU6et6/7dkTzZCp0eP1d8ZIwiWjthvQZlSx7NxMt7t1aFQ==
stream-each@^1.1.0:
version "1.2.3"
resolved "https://registry.yarnpkg.com/stream-each/-/stream-each-1.2.3.tgz#ebe27a0c389b04fbcc233642952e10731afa9bae"
@ -13573,6 +13605,13 @@ stream-http@^2.7.2:
to-arraybuffer "^1.0.0"
xtend "^4.0.0"
stream-json@1.1.3:
version "1.1.3"
resolved "https://registry.yarnpkg.com/stream-json/-/stream-json-1.1.3.tgz#4ebef75590f3af2a7291726a8e2bb2ce06d2c166"
integrity sha512-y+ChhCov2A5nDqC2aZ6HKXs3OvDlvAp0Ps3BF1P/Iv8tUZJQQsMVaSzk0WryVTVoGITKv01UYahCXMpAs7I0lQ==
dependencies:
stream-chain "^2.0.3"
stream-shift@^1.0.0:
version "1.0.0"
resolved "https://registry.yarnpkg.com/stream-shift/-/stream-shift-1.0.0.tgz#d5c752825e5367e786f78e18e445ea223a155952"
@ -13646,7 +13685,7 @@ string.prototype.trim@~1.1.2:
es-abstract "^1.5.0"
function-bind "^1.0.2"
string_decoder@^1.0.0:
string_decoder@^1.0.0, string_decoder@^1.1.1:
version "1.2.0"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.2.0.tgz#fe86e738b19544afe70469243b2a1ee9240eae8d"
integrity sha512-6YqyX6ZWEYguAxgZzHGL7SsCeGx3V2TtOTqZz1xSTSWnqsbWwbptafNyvf/ACquZUXV3DANr5BDIwNYe1mN42w==
@ -14665,7 +14704,7 @@ utf8@^2.1.1:
resolved "https://registry.yarnpkg.com/utf8/-/utf8-2.1.2.tgz#1fa0d9270e9be850d9b05027f63519bf46457d96"
integrity sha1-H6DZJw6b6FDZsFAn9jUZv0ZFfZY=
util-deprecate@~1.0.1:
util-deprecate@^1.0.1, util-deprecate@~1.0.1:
version "1.0.2"
resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf"
integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=
@ -15838,6 +15877,13 @@ write@^0.2.1:
dependencies:
mkdirp "^0.5.1"
ws@6.1.2:
version "6.1.2"
resolved "https://registry.yarnpkg.com/ws/-/ws-6.1.2.tgz#3cc7462e98792f0ac679424148903ded3b9c3ad8"
integrity sha512-rfUqzvz0WxmSXtJpPMX2EeASXabOrSMk1ruMOV3JBTBjo4ac2lDjGGsbQSyxj8Odhw5fBib8ZKEjDNvgouNKYw==
dependencies:
async-limiter "~1.0.0"
ws@^3.0.0:
version "3.3.3"
resolved "https://registry.yarnpkg.com/ws/-/ws-3.3.3.tgz#f1cf84fe2d5e901ebce94efaece785f187a228f2"