Revert "fix(@embark/proxy): Fix contract event subscriptions" (#2005)

* Revert "fix(@embark/core): set loglevel back to info"

This reverts commit a03ffd56e5.

* Revert "fix(@embark/proxy): Fix contract event subscriptions"

This reverts commit 173d53de2f.
This commit is contained in:
Iuri Matias 2019-10-28 16:20:14 -04:00 committed by GitHub
parent 395ae83f9c
commit 867a55c98a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 61 additions and 124 deletions

View File

@ -1,7 +1,6 @@
export interface Logger { export interface Logger {
info(text: string): void; info(text: string): void;
warn(text: string): void; warn(text: string): void;
debug(text: string): void;
trace(text: string): void; trace(text: string): void;
error(text: string, ...args: Array<string | Error>): void; error(text: string, ...args: Array<string | Error>): void;
} }

View File

@ -119,7 +119,7 @@ class Cmd {
.option('--contracts', 'only compile contracts into Embark wrappers') .option('--contracts', 'only compile contracts into Embark wrappers')
.option('--logfile [logfile]', __('filename to output logs (default: none)')) .option('--logfile [logfile]', __('filename to output logs (default: none)'))
.option('-c, --client [client]', __('Use a specific ethereum client [%s] (default: %s)', 'geth, parity', 'geth')) .option('-c, --client [client]', __('Use a specific ethereum client [%s] (default: %s)', 'geth, parity', 'geth'))
.option('--loglevel [loglevel]', __('level of logging to display') + ' ["error", "warn", "info", "debug", "trace"]', /^(error|warn|info|debug|trace)$/i, 'info') .option('--loglevel [loglevel]', __('level of logging to display') + ' ["error", "warn", "info", "debug", "trace"]', /^(error|warn|info|debug|trace)$/i, 'debug')
.option('--locale [locale]', __('language to use (default: en)')) .option('--locale [locale]', __('language to use (default: en)'))
.option('--pipeline [pipeline]', __('webpack config to use (default: production)')) .option('--pipeline [pipeline]', __('webpack config to use (default: production)'))
.description(__('deploy and build dapp at ') + 'dist/ (default: development)') .description(__('deploy and build dapp at ') + 'dist/ (default: development)')
@ -145,7 +145,7 @@ class Cmd {
.option('--nobrowser', __('prevent the development webserver from automatically opening a web browser')) .option('--nobrowser', __('prevent the development webserver from automatically opening a web browser'))
.option('--no-color', __('no colors in case it\'s needed for compatbility purposes')) .option('--no-color', __('no colors in case it\'s needed for compatbility purposes'))
.option('--logfile [logfile]', __('filename to output logs (default: %s)', 'none')) .option('--logfile [logfile]', __('filename to output logs (default: %s)', 'none'))
.option('--loglevel [loglevel]', __('level of logging to display') + ' ["error", "warn", "info", "debug", "trace"]', /^(error|warn|info|debug|trace)$/i, 'info') .option('--loglevel [loglevel]', __('level of logging to display') + ' ["error", "warn", "info", "debug", "trace"]', /^(error|warn|info|debug|trace)$/i, 'debug')
.option('--locale [locale]', __('language to use (default: en)')) .option('--locale [locale]', __('language to use (default: en)'))
.option('--pipeline [pipeline]', __('webpack config to use (default: development)')) .option('--pipeline [pipeline]', __('webpack config to use (default: development)'))
.option('--no-single-use-auth-token', __('disable the single use of token in cockpit')) .option('--no-single-use-auth-token', __('disable the single use of token in cockpit'))

View File

@ -31,7 +31,7 @@ class Engine {
let options = _options || {}; let options = _options || {};
this.events = options.events || this.events || new Events(); this.events = options.events || this.events || new Events();
this.logger = options.logger || new Logger({context: this.context, logLevel: options.logLevel || this.logLevel || 'info', events: this.events, logFile: this.logFile}); this.logger = options.logger || new Logger({context: this.context, logLevel: options.logLevel || this.logLevel || 'debug', events: this.events, logFile: this.logFile});
this.config = new Config({env: this.env, logger: this.logger, events: this.events, context: this.context, webServerConfig: this.webServerConfig, version: this.version}); this.config = new Config({env: this.env, logger: this.logger, events: this.events, context: this.context, webServerConfig: this.webServerConfig, version: this.version});
this.config.loadConfigFiles({embarkConfig: this.embarkConfig, interceptLogs: this.interceptLogs}); this.config.loadConfigFiles({embarkConfig: this.embarkConfig, interceptLogs: this.interceptLogs});
this.plugins = this.config.plugins; this.plugins = this.config.plugins;

View File

@ -1,7 +1,7 @@
import { Embark, Events, Logger } /* supplied by @types/embark in packages/embark-typings */ from "embark"; import {Embark, Events, Logger} /* supplied by @types/embark in packages/embark-typings */ from "embark";
import { __ } from "embark-i18n"; import {__} from "embark-i18n";
import { buildUrl, findNextPort } from "embark-utils"; import {buildUrl, findNextPort} from "embark-utils";
import { Proxy } from "./proxy"; import {Proxy} from "./proxy";
const constants = require("embark-core/constants"); const constants = require("embark-core/constants");
@ -26,15 +26,9 @@ export default class ProxyManager {
this.host = "localhost"; this.host = "localhost";
this.events.on("blockchain:started", async (clientName: string) => { this.events.on("blockchain:started", async (clientName: string) => {
try { await this.setupProxy(clientName);
await this.setupProxy(clientName); this.ready = true;
this.events.emit("proxy:ready");
this.ready = true;
this.events.emit("proxy:ready");
} catch (error) {
this.logger.error(`Error during proxy setup: ${error.message}. Use '--loglevel debug' for more detailed information.`);
this.logger.debug(`Error during proxy setup:\n${error.stack}`);
}
}); });
this.events.on("blockchain:stopped", async (clientName: string, node?: string) => { this.events.on("blockchain:stopped", async (clientName: string, node?: string) => {
this.ready = false; this.ready = false;
@ -87,18 +81,13 @@ export default class ProxyManager {
this.wsPort = port + 1; this.wsPort = port + 1;
this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint); this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
this.proxy = await new Proxy({ this.proxy = await new Proxy({events: this.events, plugins: this.plugins, logger: this.logger, vms: this.vms});
endpoint: clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint,
events: this.events,
isWs: this.isWs,
logger: this.logger,
plugins: this.plugins,
vms: this.vms,
});
await this.proxy.serve( await this.proxy.serve(
clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint,
this.host, this.host,
this.isWs ? this.wsPort : this.rpcPort, this.isWs ? this.wsPort : this.rpcPort,
this.isWs,
); );
} }

View File

@ -18,51 +18,46 @@ export class Proxy {
this.logger = options.logger; this.logger = options.logger;
this.vms = options.vms; this.vms = options.vms;
this.app = null; this.app = null;
this.endpoint = options.endpoint; this.requestManager;
if (options.endpoint === constants.blockchain.vm) {
this.endpoint = this.vms[this.vms.length - 1]();
}
this.isWs = options.isWs;
// used to service all non-long-living WS connections, including any
// request that is not WS and any WS request that is not an `eth_subscribe`
// RPC request
this.requestManager = new Web3RequestManager.Manager(this.endpoint);
this.nodeSubscriptions = {};
} }
async nodeReady() { async serve(endpoint, localHost, localPort, ws) {
if (endpoint === constants.blockchain.vm) {
endpoint = this.vms[this.vms.length - 1]();
}
this.requestManager = new Web3RequestManager.Manager(endpoint);
try { try {
await this.requestManager.send({ method: 'eth_accounts' }); await this.requestManager.send({ method: 'eth_accounts' });
} catch (e) { } catch (e) {
throw new Error(__(`Unable to connect to the blockchain endpoint on ${this.endpoint}`)); throw new Error(__('Unable to connect to the blockchain endpoint'));
} }
}
async serve(localHost, localPort) {
await this.nodeReady();
this.app = express(); this.app = express();
if (this.isWs) { if (ws) {
expressWs(this.app); expressWs(this.app);
} }
this.app.use(cors()); this.app.use(cors());
this.app.use(express.json()); this.app.use(express.json());
this.app.use(express.urlencoded({ extended: true })); this.app.use(express.urlencoded({extended: true}));
if (this.isWs) { if (ws) {
this.app.ws('/', async (conn, wsReq) => { this.app.ws('/', async (ws, _wsReq) => {
// Watch from subscription data for events
this.requestManager.provider.on('data', (result, deprecatedResult) => {
this.respondWs(ws, result || deprecatedResult);
});
conn.on('message', async (msg) => { ws.on('message', async (msg) => {
try { try {
const jsonMsg = JSON.parse(msg); const jsonMsg = JSON.parse(msg);
await this.processRequest(jsonMsg, conn); await this.processRequest(jsonMsg, ws, true);
} }
catch (err) { catch (err) {
const error = __('Error processing request: %s', err.message); const error = __('Error processing request: %s', err.message);
this.logger.error(error); this.logger.error(error);
this.respondWs(conn, error); this.respondWs(ws, error);
} }
}); });
}); });
@ -71,7 +66,7 @@ export class Proxy {
this.app.use(async (req, res) => { this.app.use(async (req, res) => {
// Modify request // Modify request
try { try {
await this.processRequest(req, res); await this.processRequest(req, res, false);
} }
catch (err) { catch (err) {
const error = __('Error processing request: %s', err.message); const error = __('Error processing request: %s', err.message);
@ -89,7 +84,7 @@ export class Proxy {
}); });
} }
async processRequest(request, transport) { async processRequest(request, transport, isWs) {
// Modify request // Modify request
let modifiedRequest; let modifiedRequest;
const rpcRequest = request.method === "POST" ? request.body : request; const rpcRequest = request.method === "POST" ? request.body : request;
@ -101,88 +96,42 @@ export class Proxy {
this.logger.error(__(`Error executing request actions: ${error}`)); this.logger.error(__(`Error executing request actions: ${error}`));
// TODO: Change error code to be more specific. Codes in section 5.1 of the JSON-RPC spec: https://www.jsonrpc.org/specification // TODO: Change error code to be more specific. Codes in section 5.1 of the JSON-RPC spec: https://www.jsonrpc.org/specification
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": request.id }; const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": request.id };
return this.respondError(transport, rpcErrorObj); return this.respondError(transport, rpcErrorObj, isWs);
} }
// Send the possibly modified request to the Node // Send the possibly modified request to the Node
const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id }; const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id };
if (modifiedRequest.sendToNode !== false) { if (modifiedRequest.sendToNode !== false) {
// kill our manually created long-living connection for eth_subscribe if we have one
if (this.isWs && modifiedRequest.reqData.method === 'eth_unsubscribe') {
const id = modifiedRequest.reqData.params[0];
this.nodeSubscriptions[id] && this.nodeSubscriptions[id].provider.disconnect();
}
// create a long-living WS connection to the node
if (this.isWs && modifiedRequest.reqData.method === 'eth_subscribe') {
// creates a new long-living connection to the node
const currentReqManager = new Web3RequestManager.Manager(this.endpoint);
// kill WS connetion to the node when the client connection closes
transport.on('close', () => currentReqManager.provider.disconnect());
// do the actual forward request to the node
currentReqManager.send(modifiedRequest.reqData, (error, result) => {
if (error) {
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
return this.respondError(transport, rpcErrorObj);
}
// `result` contains our initial response from the node, ie
// subscription id. Any FUTURE data from the node that needs
// to be forwarded to the client connection will be handled
// in the `.on('data')` event below.
this.logger.debug(`Created subscription: ${result}`);
this.nodeSubscriptions[result] = currentReqManager;
respData.result = result;
// TODO: Kick off emitAcitonsForResponse here
this.respondWs(transport, respData);
});
// Watch for `eth_subscribe` subscription data coming from the node.
// Send the subscription data back across the originating client
// connection.
currentReqManager.provider.on('data', (result, deprecatedResult) => {
result = result || deprecatedResult;
this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(result)}`);
// TODO: Kick off emitAcitonsForResponse here
this.respondWs(transport, result);
});
return;
}
try { try {
const result = await this.forwardRequestToNode(modifiedRequest.reqData); const result = await this.forwardRequestToNode(modifiedRequest.reqData);
respData.result = result; respData.result = result;
} catch (fwdReqErr) { }
// The node responded with an error. Set up the error so that it can be catch (fwdReqErr) {
// the node responded with an error. Set up the error so that it can be
// stripped out by modifying the response (via actions for blockchain:proxy:response) // stripped out by modifying the response (via actions for blockchain:proxy:response)
respData.error = fwdReqErr.message || fwdReqErr; respData.error = fwdReqErr.message || fwdReqErr;
} }
try {
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData);
// Send back to the client
if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) {
// error returned from the node and it wasn't stripped by our response actions
const error = modifiedResp.respData.error.message || modifiedResp.respData.error;
this.logger.error(__(`Error returned from the node: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id };
return this.respondError(transport, rpcErrorObj);
}
this.respondOK(transport, modifiedResp.respData);
}
catch (resError) {
// if was an error in response actions (resError), send the error in the response
const error = resError.message || resError;
this.logger.error(__(`Error executing response actions: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
return this.respondError(transport, rpcErrorObj);
}
} }
try {
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData);
// Send back to the caller (web3)
if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) {
// error returned from the node and it wasn't stripped by our response actions
const error = modifiedResp.respData.error.message || modifiedResp.respData.error;
this.logger.error(__(`Error returned from the node: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id };
return this.respondError(transport, rpcErrorObj, isWs);
}
this.respondOK(transport, modifiedResp.respData, isWs);
}
catch (resError) {
// if was an error in response actions (resError), send the error in the response
const error = resError.message || resError;
this.logger.error(__(`Error executing response actions: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
return this.respondError(transport, rpcErrorObj, isWs);
}
} }
forwardRequestToNode(reqData) { forwardRequestToNode(reqData) {
@ -215,12 +164,12 @@ export class Proxy {
res.status(statusCode).send(response); res.status(statusCode).send(response);
} }
respondError(transport, error) { respondError(transport, error, isWs) {
return this.isWs ? this.respondWs(transport, error) : this.respondHttp(transport, 500, error) return isWs ? this.respondWs(transport, error) : this.respondHttp(transport, 500, error)
} }
respondOK(transport, response) { respondOK(transport, response, isWs) {
return this.isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response) return isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response)
} }
emitActionsForRequest(body) { emitActionsForRequest(body) {