From e738efe15a148ffb1b935220807c269df68cd2bc Mon Sep 17 00:00:00 2001 From: "Michael Bradley, Jr" Date: Sun, 9 Dec 2018 16:02:32 -0600 Subject: [PATCH] refactor(@embark/blockchain_process): swallow errors, revise streams For reasons unknown, `ECONNRESET` errors on websocket connections to embark's blockchain proxy are not automatically handled on Windows as they are on macOS and Linux (or those errors aren't happening on those platforms, it's difficult to determine). Explicitly swallow such errors so the blockchain process doesn't crash. Prior to this PR, the crash-behavior can be reproduced on Windows by running `embark blockchain` and `embark run` in separate terminals and quitting `embark run` while `embark blockchain` is still running. Consistently use the `simples` package's `WsParser` to process websocket traffic instead of using `WsParser` for requests and the `ws` package's `Websocket.Receiver` for responses. Consistently use `pump` to connect parser streams instead of using `pump` in some places and `chain` in others. Drop use of `cloneable` (and the package dependency) since it was used previously in hopes it would fix the errors, but it's unnecessary and didn't fix them. --- package.json | 1 - src/lib/modules/blockchain_process/proxy.js | 41 ++++++++++----------- yarn.lock | 21 +---------- 3 files changed, 21 insertions(+), 42 deletions(-) diff --git a/package.json b/package.json index 938d27f4c..593bc16e1 100644 --- a/package.json +++ b/package.json @@ -91,7 +91,6 @@ "chokidar": "2.0.4", "clipboardy": "1.2.3", "clone-deep": "4.0.0", - "cloneable-readable": "2.0.0", "colors": "1.3.2", "commander": "2.18.0", "css-loader": "1.0.0", diff --git a/src/lib/modules/blockchain_process/proxy.js b/src/lib/modules/blockchain_process/proxy.js index aa6e4ae23..c3ad2d5e7 100644 --- a/src/lib/modules/blockchain_process/proxy.js +++ b/src/lib/modules/blockchain_process/proxy.js @@ -2,15 +2,12 @@ const Asm = require('stream-json/Assembler'); const {canonicalHost, defaultHost} = require('../../utils/host'); -const {chain} = require('stream-chain'); -const cloneable = require('cloneable-readable'); const constants = require('../../constants.json'); const express = require('express'); -const {parser} = require('stream-json'); +const {parser: jsonParser} = require('stream-json'); const proxyMiddleware = require('http-proxy-middleware'); const pump = require('pump'); const utils = require('../../utils/utils'); -const WebSocket = require('ws'); const WsParser = require('simples/lib/parsers/ws'); const hex = (n) => { @@ -21,17 +18,17 @@ const hex = (n) => { const parseJsonMaybe = (string) => { let object; if (typeof string === 'string') { + // ignore empty strings if (string) { try { object = JSON.parse(string); } catch(e) { + // ignore client/server byte sequences sent when connections are closing if (Array.from(Buffer.from(string)).map(hex).join(':') !== '03:ef:bf:bd') { console.error(`Proxy: Error parsing string as JSON '${string}'`); } } - } else { - console.error('Proxy: Expected a non-empty string'); } } else { console.error(`Proxy: Expected a string but got type '${typeof string}'`); @@ -137,10 +134,9 @@ exports.serve = async (ipc, host, port, ws, origin) => { onProxyReq(_proxyReq, req, _res) { if (req.method === 'POST') { // messages TO the target - Asm.connectTo(chain([ - req, - parser() - ])).on('done', ({current: object}) => { + Asm.connectTo( + pump(req, jsonParser()) + ).on('done', ({current: object}) => { trackRequest(object); }); } @@ -149,10 +145,9 @@ exports.serve = async (ipc, host, port, ws, origin) => { onProxyRes(proxyRes, req, _res) { if (req.method === 'POST') { // messages FROM the target - Asm.connectTo(chain([ - proxyRes, - parser() - ])).on('done', ({current: object}) => { + Asm.connectTo( + pump(proxyRes, jsonParser()) + ).on('done', ({current: object}) => { trackResponse(object); }); } @@ -162,22 +157,18 @@ exports.serve = async (ipc, host, port, ws, origin) => { if (ws) { proxyOpts.onProxyReqWs = (_proxyReq, _req, socket, _options, _head) => { // messages TO the target - const wsp = new WsParser(0, false); - wsp.on('frame', ({data: buffer}) => { + pump(socket, new WsParser(0, false)).on('frame', ({data: buffer}) => { const object = parseJsonMaybe(buffer.toString()); trackRequest(object); }); - pump(cloneable(socket), wsp); }; proxyOpts.onOpen = (proxySocket) => { // messages FROM the target - const recv = new WebSocket.Receiver(); - recv.on('message', (data) => { - const object = parseJsonMaybe(data); + pump(proxySocket, new WsParser(0, true)).on('frame', ({data: buffer}) => { + const object = parseJsonMaybe(buffer.toString()); trackResponse(object); }); - pump(cloneable(proxySocket), recv); }; } @@ -192,7 +183,13 @@ exports.serve = async (ipc, host, port, ws, origin) => { () => { resolve(server); } ); if (ws) { - server.on('upgrade', proxy.upgrade); + server.on('upgrade', (msg, socket, head) => { + const swallowError = (err) => { + console.error(`Proxy: Network error '${err.message}'`); + }; + socket.on('error', swallowError); + proxy.upgrade(msg, socket, head); + }); } }); }; diff --git a/yarn.lock b/yarn.lock index 27c5474e1..aff33bcbe 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2333,14 +2333,6 @@ clone@^1.0.2: resolved "https://registry.yarnpkg.com/clone/-/clone-1.0.4.tgz#da309cc263df15994c688ca902179ca3c7cd7c7e" integrity sha1-2jCcwmPfFZlMaIypAheco8fNfH4= -cloneable-readable@2.0.0: - version "2.0.0" - resolved "https://registry.yarnpkg.com/cloneable-readable/-/cloneable-readable-2.0.0.tgz#57b05f40f0dc95f5d6e18992c845f5fcf5b24f51" - integrity sha512-YjptvQM0hnSgrEQJPnGu3MqaK3iZ+37HaVBf5/wHyViQUQOGz5w+C44DllnInlUQjDISTyErf4Xa+RFEF1QB3Q== - dependencies: - inherits "^2.0.1" - readable-stream "^3.0.0" - co@^4.6.0: version "4.6.0" resolved "https://registry.yarnpkg.com/co/-/co-4.6.0.tgz#6ea6bdf3d853ae54ccb8e47bfa0bf3f9031fb184" @@ -8175,15 +8167,6 @@ readable-stream@^1.0.33: isarray "0.0.1" string_decoder "~0.10.x" -readable-stream@^3.0.0: - version "3.0.6" - resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-3.0.6.tgz#351302e4c68b5abd6a2ed55376a7f9a25be3057a" - integrity sha512-9E1oLoOWfhSXHGv6QlwXJim7uNzd9EVlWK+21tCU9Ju/kR0/p2AZYPz4qSchgO8PlLIH4FpZYfzwS+rEksZjIg== - dependencies: - inherits "^2.0.3" - string_decoder "^1.1.1" - util-deprecate "^1.0.1" - readable-stream@~1.0.15: version "1.0.34" resolved "https://registry.yarnpkg.com/readable-stream/-/readable-stream-1.0.34.tgz#125820e34bc842d2f2aaafafe4c2916ee32c157c" @@ -9389,7 +9372,7 @@ string.prototype.padend@^3.0.0: es-abstract "^1.4.3" function-bind "^1.0.2" -string_decoder@^1.0.0, string_decoder@^1.1.1: +string_decoder@^1.0.0: version "1.2.0" resolved "https://registry.yarnpkg.com/string_decoder/-/string_decoder-1.2.0.tgz#fe86e738b19544afe70469243b2a1ee9240eae8d" integrity sha512-6YqyX6ZWEYguAxgZzHGL7SsCeGx3V2TtOTqZz1xSTSWnqsbWwbptafNyvf/ACquZUXV3DANr5BDIwNYe1mN42w== @@ -10142,7 +10125,7 @@ utf8@^2.1.1: resolved "https://registry.yarnpkg.com/utf8/-/utf8-2.1.2.tgz#1fa0d9270e9be850d9b05027f63519bf46457d96" integrity sha1-H6DZJw6b6FDZsFAn9jUZv0ZFfZY= -util-deprecate@^1.0.1, util-deprecate@~1.0.1: +util-deprecate@~1.0.1: version "1.0.2" resolved "https://registry.yarnpkg.com/util-deprecate/-/util-deprecate-1.0.2.tgz#450d4dc9fa70de732762fbd2d4a28981419a0ccf" integrity sha1-RQ1Nyfpw3nMnYvvS1KKJgUGaDM8=