mirror of https://github.com/embarklabs/embark.git
fix(@embark/proxy): Fix contract event subscriptions
Fix the proxy’s handling of WebSocket connections when subscribing to contract events and node data using the `eth_subscribe` RPC request. Previously, the client connection that the subscription data was sent to was often in a closed state. It was determined that this connection was the wrong connection to forward the data in the first place. The connection was in fact generally the connection created for the Ethereum service check which was then (correctly and subsequently) closed after it had finished its operation. The flow of a proxy request handling a WebSocket “eth_subscribe” RPC request is now as follows: 1. A WebSocket RPC request `”eth_subscribe”` is sent from a client to the proxy. 2. Proxy forwards the request to the node by way of a new instance of `RequestManager`. 3. When the node receives an event matching the subscription, it sends the event data back to same socket connection it received the request on (ie the specific instance of `RequestManager`). 4. The `RequestManager` fires the `”data”` event containing the subscription data, and this event is picked up in the proxy. 5. The proxy then forwards the subscription data on to the originating WS client connection. All other requests (ie non-WS or WS RPC requests that are not `eth_subscribe`) will be serviced to/from the node using a single `RequestManager` instance. Co-authored-by: Pascal Precht <pascal.precht@gmail.com>
This commit is contained in:
parent
a4b3ef454a
commit
173d53de2f
|
@ -1,6 +1,7 @@
|
||||||
export interface Logger {
|
export interface Logger {
|
||||||
info(text: string): void;
|
info(text: string): void;
|
||||||
warn(text: string): void;
|
warn(text: string): void;
|
||||||
|
debug(text: string): void;
|
||||||
trace(text: string): void;
|
trace(text: string): void;
|
||||||
error(text: string, ...args: Array<string | Error>): void;
|
error(text: string, ...args: Array<string | Error>): void;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
import {Embark, Events, Logger} /* supplied by @types/embark in packages/embark-typings */ from "embark";
|
import { Embark, Events, Logger } /* supplied by @types/embark in packages/embark-typings */ from "embark";
|
||||||
import {__} from "embark-i18n";
|
import { __ } from "embark-i18n";
|
||||||
import {buildUrl, findNextPort} from "embark-utils";
|
import { buildUrl, findNextPort } from "embark-utils";
|
||||||
import {Proxy} from "./proxy";
|
import { Proxy } from "./proxy";
|
||||||
|
|
||||||
const constants = require("embark-core/constants");
|
const constants = require("embark-core/constants");
|
||||||
|
|
||||||
|
@ -26,9 +26,15 @@ export default class ProxyManager {
|
||||||
this.host = "localhost";
|
this.host = "localhost";
|
||||||
|
|
||||||
this.events.on("blockchain:started", async (clientName: string) => {
|
this.events.on("blockchain:started", async (clientName: string) => {
|
||||||
await this.setupProxy(clientName);
|
try {
|
||||||
this.ready = true;
|
await this.setupProxy(clientName);
|
||||||
this.events.emit("proxy:ready");
|
|
||||||
|
this.ready = true;
|
||||||
|
this.events.emit("proxy:ready");
|
||||||
|
} catch (error) {
|
||||||
|
this.logger.error(`Error during proxy setup: ${error.message}. Use '--loglevel debug' for more detailed information.`);
|
||||||
|
this.logger.debug(`Error during proxy setup:\n${error.stack}`);
|
||||||
|
}
|
||||||
});
|
});
|
||||||
this.events.on("blockchain:stopped", async (clientName: string, node?: string) => {
|
this.events.on("blockchain:stopped", async (clientName: string, node?: string) => {
|
||||||
this.ready = false;
|
this.ready = false;
|
||||||
|
@ -81,13 +87,18 @@ export default class ProxyManager {
|
||||||
this.wsPort = port + 1;
|
this.wsPort = port + 1;
|
||||||
this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
|
this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
|
||||||
|
|
||||||
this.proxy = await new Proxy({events: this.events, plugins: this.plugins, logger: this.logger, vms: this.vms});
|
this.proxy = await new Proxy({
|
||||||
|
endpoint: clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint,
|
||||||
|
events: this.events,
|
||||||
|
isWs: this.isWs,
|
||||||
|
logger: this.logger,
|
||||||
|
plugins: this.plugins,
|
||||||
|
vms: this.vms,
|
||||||
|
});
|
||||||
|
|
||||||
await this.proxy.serve(
|
await this.proxy.serve(
|
||||||
clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint,
|
|
||||||
this.host,
|
this.host,
|
||||||
this.isWs ? this.wsPort : this.rpcPort,
|
this.isWs ? this.wsPort : this.rpcPort,
|
||||||
this.isWs,
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,46 +18,51 @@ export class Proxy {
|
||||||
this.logger = options.logger;
|
this.logger = options.logger;
|
||||||
this.vms = options.vms;
|
this.vms = options.vms;
|
||||||
this.app = null;
|
this.app = null;
|
||||||
this.requestManager;
|
this.endpoint = options.endpoint;
|
||||||
|
if (options.endpoint === constants.blockchain.vm) {
|
||||||
|
this.endpoint = this.vms[this.vms.length - 1]();
|
||||||
|
}
|
||||||
|
this.isWs = options.isWs;
|
||||||
|
// used to service all non-long-living WS connections, including any
|
||||||
|
// request that is not WS and any WS request that is not an `eth_subscribe`
|
||||||
|
// RPC request
|
||||||
|
this.requestManager = new Web3RequestManager.Manager(this.endpoint);
|
||||||
|
this.nodeSubscriptions = {};
|
||||||
}
|
}
|
||||||
|
|
||||||
async serve(endpoint, localHost, localPort, ws) {
|
async nodeReady() {
|
||||||
if (endpoint === constants.blockchain.vm) {
|
|
||||||
endpoint = this.vms[this.vms.length - 1]();
|
|
||||||
}
|
|
||||||
this.requestManager = new Web3RequestManager.Manager(endpoint);
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await this.requestManager.send({ method: 'eth_accounts' });
|
await this.requestManager.send({ method: 'eth_accounts' });
|
||||||
} catch (e) {
|
} catch (e) {
|
||||||
throw new Error(__('Unable to connect to the blockchain endpoint'));
|
throw new Error(__(`Unable to connect to the blockchain endpoint on ${this.endpoint}`));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async serve(localHost, localPort) {
|
||||||
|
|
||||||
|
await this.nodeReady();
|
||||||
|
|
||||||
this.app = express();
|
this.app = express();
|
||||||
if (ws) {
|
if (this.isWs) {
|
||||||
expressWs(this.app);
|
expressWs(this.app);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.app.use(cors());
|
this.app.use(cors());
|
||||||
this.app.use(express.json());
|
this.app.use(express.json());
|
||||||
this.app.use(express.urlencoded({extended: true}));
|
this.app.use(express.urlencoded({ extended: true }));
|
||||||
|
|
||||||
if (ws) {
|
if (this.isWs) {
|
||||||
this.app.ws('/', async (ws, _wsReq) => {
|
this.app.ws('/', async (conn, wsReq) => {
|
||||||
// Watch from subscription data for events
|
|
||||||
this.requestManager.provider.on('data', (result, deprecatedResult) => {
|
|
||||||
this.respondWs(ws, result || deprecatedResult);
|
|
||||||
});
|
|
||||||
|
|
||||||
ws.on('message', async (msg) => {
|
conn.on('message', async (msg) => {
|
||||||
try {
|
try {
|
||||||
const jsonMsg = JSON.parse(msg);
|
const jsonMsg = JSON.parse(msg);
|
||||||
await this.processRequest(jsonMsg, ws, true);
|
await this.processRequest(jsonMsg, conn);
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
const error = __('Error processing request: %s', err.message);
|
const error = __('Error processing request: %s', err.message);
|
||||||
this.logger.error(error);
|
this.logger.error(error);
|
||||||
this.respondWs(ws, error);
|
this.respondWs(conn, error);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
@ -66,7 +71,7 @@ export class Proxy {
|
||||||
this.app.use(async (req, res) => {
|
this.app.use(async (req, res) => {
|
||||||
// Modify request
|
// Modify request
|
||||||
try {
|
try {
|
||||||
await this.processRequest(req, res, false);
|
await this.processRequest(req, res);
|
||||||
}
|
}
|
||||||
catch (err) {
|
catch (err) {
|
||||||
const error = __('Error processing request: %s', err.message);
|
const error = __('Error processing request: %s', err.message);
|
||||||
|
@ -84,7 +89,7 @@ export class Proxy {
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
async processRequest(request, transport, isWs) {
|
async processRequest(request, transport) {
|
||||||
// Modify request
|
// Modify request
|
||||||
let modifiedRequest;
|
let modifiedRequest;
|
||||||
const rpcRequest = request.method === "POST" ? request.body : request;
|
const rpcRequest = request.method === "POST" ? request.body : request;
|
||||||
|
@ -96,42 +101,88 @@ export class Proxy {
|
||||||
this.logger.error(__(`Error executing request actions: ${error}`));
|
this.logger.error(__(`Error executing request actions: ${error}`));
|
||||||
// TODO: Change error code to be more specific. Codes in section 5.1 of the JSON-RPC spec: https://www.jsonrpc.org/specification
|
// TODO: Change error code to be more specific. Codes in section 5.1 of the JSON-RPC spec: https://www.jsonrpc.org/specification
|
||||||
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": request.id };
|
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": request.id };
|
||||||
return this.respondError(transport, rpcErrorObj, isWs);
|
return this.respondError(transport, rpcErrorObj);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Send the possibly modified request to the Node
|
// Send the possibly modified request to the Node
|
||||||
const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id };
|
const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id };
|
||||||
if (modifiedRequest.sendToNode !== false) {
|
if (modifiedRequest.sendToNode !== false) {
|
||||||
|
|
||||||
|
// kill our manually created long-living connection for eth_subscribe if we have one
|
||||||
|
if (this.isWs && modifiedRequest.reqData.method === 'eth_unsubscribe') {
|
||||||
|
const id = modifiedRequest.reqData.params[0];
|
||||||
|
this.nodeSubscriptions[id] && this.nodeSubscriptions[id].provider.disconnect();
|
||||||
|
}
|
||||||
|
// create a long-living WS connection to the node
|
||||||
|
if (this.isWs && modifiedRequest.reqData.method === 'eth_subscribe') {
|
||||||
|
|
||||||
|
// creates a new long-living connection to the node
|
||||||
|
const currentReqManager = new Web3RequestManager.Manager(this.endpoint);
|
||||||
|
|
||||||
|
// kill WS connetion to the node when the client connection closes
|
||||||
|
transport.on('close', () => currentReqManager.provider.disconnect());
|
||||||
|
|
||||||
|
// do the actual forward request to the node
|
||||||
|
currentReqManager.send(modifiedRequest.reqData, (error, result) => {
|
||||||
|
if (error) {
|
||||||
|
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
|
||||||
|
return this.respondError(transport, rpcErrorObj);
|
||||||
|
}
|
||||||
|
// `result` contains our initial response from the node, ie
|
||||||
|
// subscription id. Any FUTURE data from the node that needs
|
||||||
|
// to be forwarded to the client connection will be handled
|
||||||
|
// in the `.on('data')` event below.
|
||||||
|
this.logger.debug(`Created subscription: ${result}`);
|
||||||
|
this.nodeSubscriptions[result] = currentReqManager;
|
||||||
|
respData.result = result;
|
||||||
|
// TODO: Kick off emitAcitonsForResponse here
|
||||||
|
this.respondWs(transport, respData);
|
||||||
|
});
|
||||||
|
|
||||||
|
// Watch for `eth_subscribe` subscription data coming from the node.
|
||||||
|
// Send the subscription data back across the originating client
|
||||||
|
// connection.
|
||||||
|
currentReqManager.provider.on('data', (result, deprecatedResult) => {
|
||||||
|
result = result || deprecatedResult;
|
||||||
|
this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(result)}`);
|
||||||
|
// TODO: Kick off emitAcitonsForResponse here
|
||||||
|
this.respondWs(transport, result);
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
const result = await this.forwardRequestToNode(modifiedRequest.reqData);
|
const result = await this.forwardRequestToNode(modifiedRequest.reqData);
|
||||||
respData.result = result;
|
respData.result = result;
|
||||||
}
|
} catch (fwdReqErr) {
|
||||||
catch (fwdReqErr) {
|
// The node responded with an error. Set up the error so that it can be
|
||||||
// the node responded with an error. Set up the error so that it can be
|
|
||||||
// stripped out by modifying the response (via actions for blockchain:proxy:response)
|
// stripped out by modifying the response (via actions for blockchain:proxy:response)
|
||||||
respData.error = fwdReqErr.message || fwdReqErr;
|
respData.error = fwdReqErr.message || fwdReqErr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData);
|
||||||
|
// Send back to the client
|
||||||
|
if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) {
|
||||||
|
// error returned from the node and it wasn't stripped by our response actions
|
||||||
|
const error = modifiedResp.respData.error.message || modifiedResp.respData.error;
|
||||||
|
this.logger.error(__(`Error returned from the node: ${error}`));
|
||||||
|
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id };
|
||||||
|
return this.respondError(transport, rpcErrorObj);
|
||||||
|
}
|
||||||
|
this.respondOK(transport, modifiedResp.respData);
|
||||||
|
}
|
||||||
|
catch (resError) {
|
||||||
|
// if was an error in response actions (resError), send the error in the response
|
||||||
|
const error = resError.message || resError;
|
||||||
|
this.logger.error(__(`Error executing response actions: ${error}`));
|
||||||
|
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
|
||||||
|
return this.respondError(transport, rpcErrorObj);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData);
|
|
||||||
// Send back to the caller (web3)
|
|
||||||
if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) {
|
|
||||||
// error returned from the node and it wasn't stripped by our response actions
|
|
||||||
const error = modifiedResp.respData.error.message || modifiedResp.respData.error;
|
|
||||||
this.logger.error(__(`Error returned from the node: ${error}`));
|
|
||||||
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id };
|
|
||||||
return this.respondError(transport, rpcErrorObj, isWs);
|
|
||||||
}
|
|
||||||
this.respondOK(transport, modifiedResp.respData, isWs);
|
|
||||||
}
|
|
||||||
catch (resError) {
|
|
||||||
// if was an error in response actions (resError), send the error in the response
|
|
||||||
const error = resError.message || resError;
|
|
||||||
this.logger.error(__(`Error executing response actions: ${error}`));
|
|
||||||
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
|
|
||||||
return this.respondError(transport, rpcErrorObj, isWs);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
forwardRequestToNode(reqData) {
|
forwardRequestToNode(reqData) {
|
||||||
|
@ -164,12 +215,12 @@ export class Proxy {
|
||||||
res.status(statusCode).send(response);
|
res.status(statusCode).send(response);
|
||||||
}
|
}
|
||||||
|
|
||||||
respondError(transport, error, isWs) {
|
respondError(transport, error) {
|
||||||
return isWs ? this.respondWs(transport, error) : this.respondHttp(transport, 500, error)
|
return this.isWs ? this.respondWs(transport, error) : this.respondHttp(transport, 500, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
respondOK(transport, response, isWs) {
|
respondOK(transport, response) {
|
||||||
return isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response)
|
return this.isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response)
|
||||||
}
|
}
|
||||||
|
|
||||||
emitActionsForRequest(body) {
|
emitActionsForRequest(body) {
|
||||||
|
|
Loading…
Reference in New Issue