refactor(@embark/blockchain_process): swallow errors, revise streams

For reasons unknown, `ECONNRESET` errors on websocket connections to embark's
blockchain proxy are not automatically handled on Windows as they are on macOS
and Linux (or those errors aren't happening on those platforms, it's difficult
to determine). Explicitly swallow such errors so the blockchain process doesn't
crash. Prior to this PR, the crash-behavior can be reproduced on Windows by
running `embark blockchain` and `embark run` in separate terminals and quitting
`embark run` while `embark blockchain` is still running.

Consistently use the `simples` package's `WsParser` to process websocket
traffic instead of using `WsParser` for requests and the `ws` package's
`Websocket.Receiver` for responses.

Consistently use `pump` to connect parser streams instead of using `pump` in
some places and `chain` in others. Drop use of `cloneable` (and the package
dependency) since it was used previously in hopes it would fix the errors, but
it's unnecessary and didn't fix them.
This commit is contained in:
Michael Bradley, Jr 2018-12-09 16:02:32 -06:00 committed by Pascal Precht
parent 7d2ceaacb1
commit e738efe15a
3 changed files with 21 additions and 42 deletions

View File

@ -91,7 +91,6 @@
"chokidar": "2.0.4", "chokidar": "2.0.4",
"clipboardy": "1.2.3", "clipboardy": "1.2.3",
"clone-deep": "4.0.0", "clone-deep": "4.0.0",
"cloneable-readable": "2.0.0",
"colors": "1.3.2", "colors": "1.3.2",
"commander": "2.18.0", "commander": "2.18.0",
"css-loader": "1.0.0", "css-loader": "1.0.0",

View File

@ -2,15 +2,12 @@
const Asm = require('stream-json/Assembler'); const Asm = require('stream-json/Assembler');
const {canonicalHost, defaultHost} = require('../../utils/host'); const {canonicalHost, defaultHost} = require('../../utils/host');
const {chain} = require('stream-chain');
const cloneable = require('cloneable-readable');
const constants = require('../../constants.json'); const constants = require('../../constants.json');
const express = require('express'); const express = require('express');
const {parser} = require('stream-json'); const {parser: jsonParser} = require('stream-json');
const proxyMiddleware = require('http-proxy-middleware'); const proxyMiddleware = require('http-proxy-middleware');
const pump = require('pump'); const pump = require('pump');
const utils = require('../../utils/utils'); const utils = require('../../utils/utils');
const WebSocket = require('ws');
const WsParser = require('simples/lib/parsers/ws'); const WsParser = require('simples/lib/parsers/ws');
const hex = (n) => { const hex = (n) => {
@ -21,17 +18,17 @@ const hex = (n) => {
const parseJsonMaybe = (string) => { const parseJsonMaybe = (string) => {
let object; let object;
if (typeof string === 'string') { if (typeof string === 'string') {
// ignore empty strings
if (string) { if (string) {
try { try {
object = JSON.parse(string); object = JSON.parse(string);
} catch(e) { } catch(e) {
// ignore client/server byte sequences sent when connections are closing
if (Array.from(Buffer.from(string)).map(hex).join(':') !== if (Array.from(Buffer.from(string)).map(hex).join(':') !==
'03:ef:bf:bd') { '03:ef:bf:bd') {
console.error(`Proxy: Error parsing string as JSON '${string}'`); console.error(`Proxy: Error parsing string as JSON '${string}'`);
} }
} }
} else {
console.error('Proxy: Expected a non-empty string');
} }
} else { } else {
console.error(`Proxy: Expected a string but got type '${typeof string}'`); console.error(`Proxy: Expected a string but got type '${typeof string}'`);
@ -137,10 +134,9 @@ exports.serve = async (ipc, host, port, ws, origin) => {
onProxyReq(_proxyReq, req, _res) { onProxyReq(_proxyReq, req, _res) {
if (req.method === 'POST') { if (req.method === 'POST') {
// messages TO the target // messages TO the target
Asm.connectTo(chain([ Asm.connectTo(
req, pump(req, jsonParser())
parser() ).on('done', ({current: object}) => {
])).on('done', ({current: object}) => {
trackRequest(object); trackRequest(object);
}); });
} }
@ -149,10 +145,9 @@ exports.serve = async (ipc, host, port, ws, origin) => {
onProxyRes(proxyRes, req, _res) { onProxyRes(proxyRes, req, _res) {
if (req.method === 'POST') { if (req.method === 'POST') {
// messages FROM the target // messages FROM the target
Asm.connectTo(chain([ Asm.connectTo(
proxyRes, pump(proxyRes, jsonParser())
parser() ).on('done', ({current: object}) => {
])).on('done', ({current: object}) => {
trackResponse(object); trackResponse(object);
}); });
} }
@ -162,22 +157,18 @@ exports.serve = async (ipc, host, port, ws, origin) => {
if (ws) { if (ws) {
proxyOpts.onProxyReqWs = (_proxyReq, _req, socket, _options, _head) => { proxyOpts.onProxyReqWs = (_proxyReq, _req, socket, _options, _head) => {
// messages TO the target // messages TO the target
const wsp = new WsParser(0, false); pump(socket, new WsParser(0, false)).on('frame', ({data: buffer}) => {
wsp.on('frame', ({data: buffer}) => {
const object = parseJsonMaybe(buffer.toString()); const object = parseJsonMaybe(buffer.toString());
trackRequest(object); trackRequest(object);
}); });
pump(cloneable(socket), wsp);
}; };
proxyOpts.onOpen = (proxySocket) => { proxyOpts.onOpen = (proxySocket) => {
// messages FROM the target // messages FROM the target
const recv = new WebSocket.Receiver(); pump(proxySocket, new WsParser(0, true)).on('frame', ({data: buffer}) => {
recv.on('message', (data) => { const object = parseJsonMaybe(buffer.toString());
const object = parseJsonMaybe(data);
trackResponse(object); trackResponse(object);
}); });
pump(cloneable(proxySocket), recv);
}; };
} }
@ -192,7 +183,13 @@ exports.serve = async (ipc, host, port, ws, origin) => {
() => { resolve(server); } () => { resolve(server); }
); );
if (ws) { if (ws) {
server.on('upgrade', proxy.upgrade); server.on('upgrade', (msg, socket, head) => {
const swallowError = (err) => {
console.error(`Proxy: Network error '${err.message}'`);
};
socket.on('error', swallowError);
proxy.upgrade(msg, socket, head);
});
} }
}); });
}; };

View File

@ -2333,14 +2333,6 @@ clone@^1.0.2:
resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e" resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e"
integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4= integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4=
cloneable-readable@2.0.0:
version "2.0.0"
resolved "https://registry.yarnpkg.com/cloneable-readable/-/cloneable-readable-2.0.0.tgz#57b05f40f0dc95f5d6e18992c845f5fcf5b24f51"
integrity sha512-YjptvQM0hnSgrEQJPnGu3MqaK3iZ+37HaVBf5/wHyViQUQOGz5w+C44DllnInlUQjDISTyErf4Xa+RFEF1QB3Q==
dependencies:
inherits "^2.0.1"
readable-stream "^3.0.0"
co@^4.6.0: co@^4.6.0:
version "4.6.0" version "4.6.0"
resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184" resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184"
@ -8175,15 +8167,6 @@ readable-stream@^1.0.33:
isarray "0.0.1" isarray "0.0.1"
string_decoder "~0.10.x" string_decoder "~0.10.x"
readable-stream@^3.0.0:
version "3.0.6"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.0.6.tgz#351302e4c68b5abd6a2ed55376a7f9a25be3057a"
integrity sha512-9E1oLoOWfhSXHGv6QlwXJim7uNzd9EVlWK+21tCU9Ju/kR0/p2AZYPz4qSchgO8PlLIH4FpZYfzwS+rEksZjIg==
dependencies:
inherits "^2.0.3"
string_decoder "^1.1.1"
util-deprecate "^1.0.1"
readable-stream@~1.0.15: readable-stream@~1.0.15:
version "1.0.34" version "1.0.34"
resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c"
@ -9389,7 +9372,7 @@ string.prototype.padend@^3.0.0:
es-abstract "^1.4.3" es-abstract "^1.4.3"
function-bind "^1.0.2" function-bind "^1.0.2"
string_decoder@^1.0.0, string_decoder@^1.1.1: string_decoder@^1.0.0:
version "1.2.0" version "1.2.0"
resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.2.0.tgz#fe86e738b19544afe70469243b2a1ee9240eae8d" resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.2.0.tgz#fe86e738b19544afe70469243b2a1ee9240eae8d"
integrity sha512-6YqyX6ZWEYguAxgZzHGL7SsCeGx3V2TtOTqZz1xSTSWnqsbWwbptafNyvf/ACquZUXV3DANr5BDIwNYe1mN42w== integrity sha512-6YqyX6ZWEYguAxgZzHGL7SsCeGx3V2TtOTqZz1xSTSWnqsbWwbptafNyvf/ACquZUXV3DANr5BDIwNYe1mN42w==
@ -10142,7 +10125,7 @@ utf8@^2.1.1:
resolved "https://registry.yarnpkg.com/utf8/-/utf8-2.1.2.tgz#1fa0d9270e9be850d9b05027f63519bf46457d96" resolved "https://registry.yarnpkg.com/utf8/-/utf8-2.1.2.tgz#1fa0d9270e9be850d9b05027f63519bf46457d96"
integrity sha1-H6DZJw6b6FDZsFAn9jUZv0ZFfZY= integrity sha1-H6DZJw6b6FDZsFAn9jUZv0ZFfZY=
util-deprecate@^1.0.1, util-deprecate@~1.0.1: util-deprecate@~1.0.1:
version "1.0.2" version "1.0.2"
resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf" resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf"
integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8= integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=