fix(@embark/proxy): Fix unsubsribe handling and add new provider

When `eth_unsubscribe` is received in the proxy, ensure this request is forwarded through on the correct socket (the same socket that was used for the corresponding `eth_subscribe`).

Move subscription handling for `eth_subscribe` and `eth_unsubscribe` to RpcModifiers (in `rpc-manager` package).

For each `eth_subscribe` request, a new `RequestManager` is created. Since the endpoint property on the proxy class was updated to be a provider, the same provider was being assigned to each new `RequestManager` and thus creating multiple event handlers for each subscription created. To circumvent this, we are now creating a new provider for each `RequestManager`.

Co-authored-by: Pascal Precht <pascal.precht@googlemail.com>
This commit is contained in:
emizzle 2019-11-11 16:40:36 +11:00 committed by Pascal Precht
parent 3c760c3515
commit f6f45077e9
No known key found for this signature in database
GPG Key ID: 0EE28D8D6FD85D7D
18 changed files with 437 additions and 219 deletions

View File

@ -4,7 +4,7 @@ contract SimpleStorage {
uint public storedData;
address public registar;
address owner;
event EventOnSet2(bool passed, string message);
event EventOnSet2(bool passed, string message, uint setValue);
constructor(uint initialValue) public {
storedData = initialValue;
@ -20,7 +20,7 @@ contract SimpleStorage {
function set2(uint x) public {
storedData = x;
emit EventOnSet2(true, "hi");
emit EventOnSet2(true, "hi", x);
}
function set3(uint x) public {

View File

@ -51,10 +51,9 @@ contract("SimpleStorage", function() {
});
it('listens to events', function(done) {
SimpleStorage.once('EventOnSet2', async function(error, _result) {
SimpleStorage.once('EventOnSet2', async function(error, result) {
assert.strictEqual(error, null);
let result = await SimpleStorage.methods.get().call();
assert.strictEqual(parseInt(result, 10), 150);
assert.strictEqual(parseInt(result.returnValues.setValue, 10), 150);
done(error);
});
@ -63,7 +62,7 @@ contract("SimpleStorage", function() {
it('asserts event triggered', async function() {
const tx = await SimpleStorage.methods.set2(160).send();
assert.eventEmitted(tx, 'EventOnSet2', {passed: true, message: "hi"});
assert.eventEmitted(tx, 'EventOnSet2', {passed: true, message: "hi", setValue: "160"});
});
it("should revert with a value lower than 5", async function() {

View File

@ -35,6 +35,7 @@
"webpackDone": "webpackDone"
},
"blockchain": {
"ethereum": "ethereum",
"vm": "vm",
"call": "eth_call",
"clients": {

View File

@ -53,6 +53,7 @@ export interface Config {
rpcCorsDomain: string;
wsRPC: boolean;
isDev: boolean;
client: string;
};
webServerConfig: {
certOptions: {

View File

@ -1,6 +1,6 @@
class Ganache {
constructor(embark) {
embark.events.request('proxy:vm:register', () => {
embark.events.request('blockchain:vm:register', () => {
const ganache = require('ganache-cli');
return ganache.provider();
});

View File

@ -21,30 +21,30 @@ export default class EthAccounts extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
this.embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_eth_accounts.bind(this));
this.embark.registerActionForEvent("blockchain:proxy:response", this.ethAccountsResponse.bind(this));
}
private async checkResponseFor_eth_accounts(params: any, callback: Callback<any>) {
private async ethAccountsResponse(params: any, callback: Callback<any>) {
if (!(METHODS_TO_MODIFY.includes(params.reqData.method))) {
if (!(METHODS_TO_MODIFY.includes(params.request.method))) {
return callback(null, params);
}
this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify(params)}`));
this.logger.trace(__(`Modifying blockchain '${params.request.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`));
try {
if (!arrayEqual(params.respData.result, this._nodeAccounts || [])) {
if (!arrayEqual(params.response.result, this._nodeAccounts || [])) {
// reset backing variables so accounts is recalculated
await this.rpcModifierEvents.request2("nodeAccounts:updated", params.respData.result);
await this.rpcModifierEvents.request2("nodeAccounts:updated", params.response.result);
}
const accounts = await this.accounts;
if (!(accounts && accounts.length)) {
return callback(null, params);
}
params.respData.result = accounts.map((acc) => acc.address || acc);
this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`));
params.response.result = accounts.map((acc) => acc.address || acc);
this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`));
} catch (err) {
return callback(err);
}

View File

@ -11,7 +11,7 @@ export default class EthSendTransaction extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
embark.registerActionForEvent("blockchain:proxy:request", this.checkRequestFor_eth_sendTransaction.bind(this));
embark.registerActionForEvent("blockchain:proxy:request", this.ethSendTransactionRequest.bind(this));
// Allow to run transaction in parallel by resolving the nonce manually.
// For each transaction, resolve the nonce by taking the max of current transaction count and the cache we keep locally.
@ -52,8 +52,8 @@ export default class EthSendTransaction extends RpcModifier {
callback(null, this.nonceCache[address]);
});
}
private async checkRequestFor_eth_sendTransaction(params: any, callback: Callback<any>) {
if (!(params.reqData.method === blockchainConstants.transactionMethods.eth_sendTransaction)) {
private async ethSendTransactionRequest(params: any, callback: Callback<any>) {
if (!(params.request.method === blockchainConstants.transactionMethods.eth_sendTransaction)) {
return callback(null, params);
}
const accounts = await this.accounts;
@ -61,26 +61,26 @@ export default class EthSendTransaction extends RpcModifier {
return callback(null, params);
}
this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' request:`));
this.logger.trace(__(`Original request data: ${JSON.stringify(params)}`));
this.logger.trace(__(`Modifying blockchain '${params.request.method}' request:`));
this.logger.trace(__(`Original request data: ${JSON.stringify({ request: params.request, response: params.response })}`));
try {
// Check if we have that account in our wallet
const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(params.reqData.params[0].from));
const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(params.request.params[0].from));
if (account && account.privateKey) {
return this.signTransactionQueue.push({ payload: params.reqData.params[0], account }, (err: any, newPayload: any) => {
return this.signTransactionQueue.push({ payload: params.request.params[0], account }, (err: any, newPayload: any) => {
if (err) {
return callback(err, null);
}
params.reqData.method = blockchainConstants.transactionMethods.eth_sendRawTransaction;
params.reqData.params = [newPayload];
params.request.method = blockchainConstants.transactionMethods.eth_sendRawTransaction;
params.request.params = [newPayload];
callback(err, params);
});
}
} catch (err) {
return callback(err);
}
this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`));
this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`));
callback(null, params);
}
}

View File

@ -8,40 +8,40 @@ export default class EthSignTypedData extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
this.embark.registerActionForEvent("blockchain:proxy:request", this.checkRequestFor_eth_signTypedData.bind(this));
this.embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_eth_signTypedData.bind(this));
this.embark.registerActionForEvent("blockchain:proxy:request", this.ethSignTypedDataRequest.bind(this));
this.embark.registerActionForEvent("blockchain:proxy:response", this.ethSignTypedDataResponse.bind(this));
}
private async checkRequestFor_eth_signTypedData(params: any, callback: Callback<any>) {
private async ethSignTypedDataRequest(params: any, callback: Callback<any>) {
// check for:
// - eth_signTypedData
// - eth_signTypedData_v3
// - eth_signTypedData_v4
// - personal_signTypedData (parity)
if (params.reqData.method.includes("signTypedData")) {
if (params.request.method.includes("signTypedData")) {
// indicate that we do not want this call to go to the node
params.sendToNode = false;
return callback(null, params);
}
callback(null, params);
}
private async checkResponseFor_eth_signTypedData(params: any, callback: Callback<any>) {
private async ethSignTypedDataResponse(params: any, callback: Callback<any>) {
// check for:
// - eth_signTypedData
// - eth_signTypedData_v3
// - eth_signTypedData_v4
// - personal_signTypedData (parity)
if (!params.reqData.method.includes("signTypedData")) {
if (!params.request.method.includes("signTypedData")) {
return callback(null, params);
}
this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify(params)}`));
this.logger.trace(__(`Modifying blockchain '${params.request.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`));
try {
const accounts = await this.accounts;
const [fromAddr, typedData] = params.reqData.params;
const [fromAddr, typedData] = params.request.params;
const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(fromAddr));
if (!(account && account.privateKey)) {
return callback(
@ -51,8 +51,8 @@ export default class EthSignTypedData extends RpcModifier {
const toSign = transaction.getToSignHash(typeof typedData === "string" ? JSON.parse(typedData) : typedData);
const signature = sign(toSign, [account.privateKey]);
params.respData.result = signature[0];
this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`));
params.response.result = signature[0];
this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`));
} catch (err) {
return callback(err);
}

View File

@ -0,0 +1,43 @@
import { Callback, Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark";
import { __ } from "embark-i18n";
import RpcModifier from "./rpcModifier";
export default class EthSubscribe extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
embark.registerActionForEvent("blockchain:proxy:request", this.ethSubscribeRequest.bind(this));
embark.registerActionForEvent("blockchain:proxy:response", this.ethSubscribeResponse.bind(this));
}
private async ethSubscribeRequest(params: any, callback: Callback<any>) {
// check for eth_subscribe and websockets
if (params.isWs && params.request.method === "eth_subscribe") {
// indicate that we do not want this call to go to the node
params.sendToNode = false;
return callback(null, params);
}
callback(null, params);
}
private async ethSubscribeResponse(params: any, callback: Callback<any>) {
const { isWs, transport, request, response } = params;
// check for eth_subscribe and websockets
if (!(isWs && request.method.includes("eth_subscribe"))) {
return callback(null, params);
}
this.logger.trace(__(`Modifying blockchain '${request.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request, response })}`));
try {
const nodeResponse = await this.events.request2("proxy:websocket:subscribe", transport, request, response);
params.response = nodeResponse;
this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request, response: params.response })}`));
} catch (err) {
return callback(err);
}
callback(null, params);
}
}

View File

@ -0,0 +1,44 @@
import { Callback, Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark";
import { __ } from "embark-i18n";
import RpcModifier from "./rpcModifier";
export default class EthUnsubscribe extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
embark.registerActionForEvent("blockchain:proxy:request", this.ethUnsubscribeRequest.bind(this));
embark.registerActionForEvent("blockchain:proxy:response", this.ethUnsubscribeResponse.bind(this));
}
private async ethUnsubscribeRequest(params: any, callback: Callback<any>) {
// check for eth_subscribe and websockets
if (params.isWs && params.request.method === "eth_unsubscribe") {
// indicate that we do not want this call to go to the node
params.sendToNode = false;
return callback(null, params);
}
callback(null, params);
}
private async ethUnsubscribeResponse(params: any, callback: Callback<any>) {
const { isWs, request, response } = params;
// check for eth_subscribe and websockets
if (!(isWs && request.method.includes("eth_unsubscribe"))) {
return callback(null, params);
}
this.logger.trace(__(`Modifying blockchain '${request.method}' response:`));
this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request, response })}`));
try {
const nodeResponse = await this.events.request2("proxy:websocket:unsubscribe", request, response);
params.response = nodeResponse;
this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request, response: params.response })}`));
} catch (err) {
return callback(err);
}
callback(null, params);
}
}

View File

@ -4,6 +4,8 @@ import Web3 from "web3";
import EthAccounts from "./eth_accounts";
import EthSendTransaction from "./eth_sendTransaction";
import EthSignTypedData from "./eth_signTypedData";
import EthSubscribe from "./eth_subscribe";
import EthUnsubscribe from "./eth_unsubscribe";
import PersonalNewAccount from "./personal_newAccount";
import RpcModifier from "./rpcModifier";
@ -34,7 +36,14 @@ export default class RpcManager {
}
return this.updateAccounts(this._nodeAccounts, cb);
});
this.modifiers = [PersonalNewAccount, EthAccounts, EthSendTransaction, EthSignTypedData].map((rpcModifier) => new rpcModifier(this.embark, this.rpcModifierEvents));
this.modifiers = [
PersonalNewAccount,
EthAccounts,
EthSendTransaction,
EthSignTypedData,
EthSubscribe,
EthUnsubscribe
].map((rpcModifier) => new rpcModifier(this.embark, this.rpcModifierEvents));
}
private async updateAccounts(updatedNodeAccounts: any[], cb: Callback<null>) {
for (const modifier of this.modifiers) {

View File

@ -7,16 +7,16 @@ export default class PersonalNewAccount extends RpcModifier {
constructor(embark: Embark, rpcModifierEvents: Events) {
super(embark, rpcModifierEvents);
embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_personal_newAccount.bind(this));
embark.registerActionForEvent("blockchain:proxy:response", this.personalNewAccountResponse.bind(this));
}
private async checkResponseFor_personal_newAccount(params: any, callback: Callback<any>) {
if (params.reqData.method !== blockchainConstants.transactionMethods.personal_newAccount) {
private async personalNewAccountResponse(params: any, callback: Callback<any>) {
if (params.request.method !== blockchainConstants.transactionMethods.personal_newAccount) {
return callback(null, params);
}
// emit event so tx modifiers can refresh accounts
await this.rpcModifierEvents.request2("nodeAccounts:added", params.respData.result);
await this.rpcModifierEvents.request2("nodeAccounts:added", params.response.result);
callback(null, params);
}

View File

@ -1,9 +1,9 @@
const async = require('async');
import {__} from 'embark-i18n';
import { __ } from 'embark-i18n';
const Web3 = require('web3');
const {blockchain: blockchainConstants} = require('embark-core/constants');
import {dappPath, getAddressToContract, getTransactionParams, hexToNumber} from 'embark-utils';
const { blockchain: blockchainConstants } = require('embark-core/constants');
import { dappPath, getAddressToContract, getTransactionParams, hexToNumber } from 'embark-utils';
const Transaction = require('ethereumjs-tx');
@ -98,23 +98,23 @@ class TransactionLogger {
}
async _onLogRequest(args) {
const method = args.reqData.method;
const method = args.request.method;
if (!this.contractsDeployed || !LISTENED_METHODS.includes(method)) {
return;
}
if (method === blockchainConstants.transactionMethods.eth_sendTransaction) {
// We just gather data and wait for the receipt
this.transactions[args.respData.result] = {
address: args.reqData.params[0].to,
data: args.reqData.params[0].data,
txHash: args.respData.result
this.transactions[args.response.result] = {
address: args.request.params[0].to,
data: args.request.params[0].data,
txHash: args.response.result
};
return;
} else if (method === blockchainConstants.transactionMethods.eth_sendRawTransaction) {
const rawData = Buffer.from(ethUtil.stripHexPrefix(args.reqData.params[0]), 'hex');
const rawData = Buffer.from(ethUtil.stripHexPrefix(args.request.params[0]), 'hex');
const tx = new Transaction(rawData, 'hex');
this.transactions[args.respData.result] = {
this.transactions[args.response.result] = {
address: '0x' + tx.to.toString('hex'),
data: '0x' + tx.data.toString('hex')
};
@ -123,22 +123,22 @@ class TransactionLogger {
let dataObject;
if (method === blockchainConstants.transactionMethods.eth_getTransactionReceipt) {
dataObject = args.respData.result;
dataObject = args.response.result;
if (!dataObject) {
return;
}
if (this.transactions[args.respData.result.transactionHash]) {
if (this.transactions[args.response.result.transactionHash]) {
// This is the normal case. If we don't get here, it's because we missed a TX
dataObject = Object.assign(dataObject, this.transactions[args.respData.result.transactionHash]);
delete this.transactions[args.respData.result.transactionHash]; // No longer needed
dataObject = Object.assign(dataObject, this.transactions[args.response.result.transactionHash]);
delete this.transactions[args.response.result.transactionHash]; // No longer needed
} else {
// Was not a eth_getTransactionReceipt in the context of a transaction
return;
}
} else {
dataObject = args.reqData.params[0];
dataObject = args.request.params[0];
}
const {to: address, data} = dataObject;
const { to: address, data } = dataObject;
if (!address) {
// It's a deployment
return;
@ -152,7 +152,7 @@ class TransactionLogger {
});
}
const {name, silent} = contract;
const { name, silent } = contract;
if (silent && !this.outputDone) {
return;
}
@ -169,12 +169,12 @@ class TransactionLogger {
}
if (method === blockchainConstants.transactionMethods.eth_call) {
const log = Object.assign({}, args, {name, functionName, paramString});
const log = Object.assign({}, args, { name, functionName, paramString });
log.status = '0x1';
return this.events.emit('contracts:log', log);
}
let {transactionHash, blockNumber, gasUsed, status} = args.respData.result;
let { transactionHash, blockNumber, gasUsed, status } = args.response.result;
let reason;
if (status !== '0x0' && status !== '0x1') {
status = !status ? '0x0' : '0x1';
@ -192,7 +192,7 @@ class TransactionLogger {
gasUsed = hexToNumber(gasUsed);
blockNumber = hexToNumber(blockNumber);
const log = Object.assign({}, args, {name, functionName, paramString, gasUsed, blockNumber, reason, status, transactionHash});
const log = Object.assign({}, args, { name, functionName, paramString, gasUsed, blockNumber, reason, status, transactionHash });
this.events.emit('contracts:log', log);
this.logger.info(`Blockchain>`.underline + ` ${name}.${functionName}(${paramString})`.bold + ` | ${transactionHash} | gas:${gasUsed} | blk:${blockNumber} | status:${status}${reason ? ` | reason: "${reason}"` : ''}`);

View File

@ -9,30 +9,35 @@ class BlockchainClient {
this.blockchainClients = {};
this.client = null;
this.vms = [];
this.events.setCommandHandler("blockchain:client:register", (clientName, blockchainClient) => {
this.blockchainClients[clientName] = blockchainClient;
this.client = blockchainClient;
});
this.events.setCommandHandler("blockchain:vm:register", (handler) => {
this.vms.push(handler());
});
// TODO: unclear currently if this belongs here so it's a bit hardcoded for now
this.events.setCommandHandler("blockchain:client:provider", (clientName, cb) => {
this.events.request("proxy:endpoint:get", (err, endpoint) => {
if (err) {
return cb(err);
}
if (endpoint.startsWith('ws')) {
return cb(null, new Web3.providers.WebsocketProvider(endpoint, {
headers: {Origin: constants.embarkResourceOrigin},
// TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846
// Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8
clientConfig: {
fragmentationThreshold: 81920
}
}));
}
const web3 = new Web3(endpoint);
cb(null, web3.currentProvider);
});
this.events.setCommandHandler("blockchain:client:vmProvider", async (cb) => {
if (!this.vms.length) {
return cb(`Failed to get the VM provider. Please register one using 'blockchain:vm:register', or by ensuring the 'embark-ganache' package is registered.`);
}
return cb(null, this.vms[this.vms.length - 1]);
});
this.events.setCommandHandler("blockchain:client:provider", async (clientName, endpoint, cb) => {
if (!cb && typeof endpoint === "function") {
cb = endpoint;
endpoint = null;
}
let provider;
try {
provider = await this._getProvider(clientName, endpoint);
}
catch (err) {
return cb(`Error getting provider: ${err.message || err}`);
}
cb(null, provider);
});
// TODO: maybe not the ideal event to listen to?
@ -46,7 +51,28 @@ class BlockchainClient {
// set default account
});
}
async _getProvider(clientName, endpoint) {
// Passing in an endpoint allows us to customise which URL the provider connects to.
// If no endpoint is provided, the provider will connect to the proxy.
// Explicity setting an endpoint is useful for cases where we want to connect directly
// to the node (ie in the proxy).
if (!endpoint) {
// will return the proxy URL
endpoint = await this.events.request2("proxy:endpoint:get");
}
if (endpoint.startsWith('ws')) {
return new Web3.providers.WebsocketProvider(endpoint, {
headers: { Origin: constants.embarkResourceOrigin },
// TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846
// Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8
clientConfig: {
fragmentationThreshold: 81920
}
});
}
const web3 = new Web3(endpoint);
return web3.currentProvider;
}
}
module.exports = BlockchainClient;

View File

@ -1,5 +1,5 @@
import {Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark";
import {__} from "embark-i18n";
import { Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark";
import { __ } from "embark-i18n";
import { buildUrl, findNextPort } from "embark-utils";
import { Logger } from 'embark-logger';
import { Proxy } from "./proxy";
@ -17,13 +17,14 @@ export default class ProxyManager {
private wsPort = 0;
private ready = false;
private isWs = false;
private vms: any[];
private isVm = false;
private _endpoint: string = "";
private inited: boolean = false;
constructor(private embark: Embark, options: any) {
this.logger = embark.logger;
this.events = embark.events;
this.plugins = options.plugins;
this.vms = [];
this.host = "localhost";
@ -50,19 +51,28 @@ export default class ProxyManager {
this.events.setCommandHandler("proxy:endpoint:get", async (cb) => {
await this.onReady();
if (!this.embark.config.blockchainConfig.proxy) {
return cb(null, this.embark.config.blockchainConfig.endpoint);
cb(null, (await this.endpoint));
});
}
private get endpoint() {
return (async () => {
if (this._endpoint) {
return this._endpoint;
}
if (!this.embark.config.blockchainConfig.proxy) {
this._endpoint = this.embark.config.blockchainConfig.endpoint;
return this._endpoint;
}
await this.init();
// TODO Check if the proxy can support HTTPS, though it probably doesn't matter since it's local
if (this.isWs) {
return cb(null, buildUrl("ws", this.host, this.wsPort, "ws"));
this._endpoint = buildUrl("ws", this.host, this.wsPort, "ws");
return this._endpoint;
}
cb(null, buildUrl("http", this.host, this.rpcPort, "rpc"));
});
this.events.setCommandHandler("proxy:vm:register", (handler: any) => {
this.vms.push(handler);
});
this._endpoint = buildUrl("http", this.host, this.rpcPort, "rpc");
return this._endpoint;
})();
}
public onReady() {
@ -76,50 +86,63 @@ export default class ProxyManager {
});
}
private async init() {
if (this.inited) {
return;
}
this.inited = true;
// setup ports
const port = await findNextPort(this.embark.config.blockchainConfig.rpcPort + constants.blockchain.servicePortOnProxy);
this.rpcPort = port;
this.wsPort = port + 1;
// setup proxy details
this.isVm = this.embark.config.blockchainConfig.client === constants.blockchain.vm;
this.isWs = this.isVm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
}
private async setupProxy(clientName: string) {
await this.init();
if (!this.embark.config.blockchainConfig.proxy) {
return;
}
if (this.httpProxy || this.wsProxy) {
throw new Error("Proxy is already started");
}
const port = await findNextPort(this.embark.config.blockchainConfig.rpcPort + constants.blockchain.servicePortOnProxy);
this.rpcPort = port;
this.wsPort = port + 1;
this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
const endpoint = this.embark.config.blockchainConfig.endpoint;
// HTTP
if (clientName !== constants.blockchain.vm) {
if (!this.isVm) {
this.httpProxy = await new Proxy({
endpoint: this.embark.config.blockchainConfig.endpoint,
endpoint,
events: this.events,
isWs: false,
logger: this.logger,
plugins: this.plugins,
vms: this.vms,
isVm: this.isVm
})
.serve(
this.host,
this.rpcPort,
);
this.logger.info(`HTTP Proxy for node endpoint ${this.embark.config.blockchainConfig.endpoint} listening on ${buildUrl("http", this.host, this.rpcPort, "rpc")}`);
.serve(
this.host,
this.rpcPort,
);
this.logger.info(`HTTP Proxy for node endpoint ${endpoint} listening on ${buildUrl("http", this.host, this.rpcPort, "rpc")}`);
}
if (this.isWs) {
const endpoint = clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint;
this.wsProxy = await new Proxy({
endpoint,
events: this.events,
isWs: true,
logger: this.logger,
plugins: this.plugins,
vms: this.vms,
isVm: this.isVm
})
.serve(
this.host,
this.wsPort,
);
this.logger.info(`WS Proxy for node endpoint ${endpoint} listening on ${buildUrl("ws", this.host, this.wsPort, "ws")}`);
.serve(
this.host,
this.wsPort,
);
this.logger.info(`WS Proxy for node endpoint ${this.isVm ? 'vm' : endpoint} listening on ${buildUrl("ws", this.host, this.wsPort, "ws")}`);
}
}
private stopProxy() {

View File

@ -4,7 +4,6 @@ import express from 'express';
import expressWs from 'express-ws';
import cors from 'cors';
const Web3RequestManager = require('web3-core-requestmanager');
const Web3WsProvider = require('web3-providers-ws');
const constants = require("embark-core/constants");
const ACTION_TIMEOUT = 5000;
@ -17,33 +16,50 @@ export class Proxy {
this.timeouts = {};
this.plugins = options.plugins;
this.logger = options.logger;
this.vms = options.vms;
this.app = null;
this.endpoint = options.endpoint;
if (options.endpoint === constants.blockchain.vm) {
this.endpoint = this.vms[this.vms.length - 1]();
}
if (typeof this.endpoint === 'string' && this.endpoint.startsWith('ws')) {
this.endpoint = new Web3WsProvider(this.endpoint, {
headers: {Origin: constants.embarkResourceOrigin},
// TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846
// Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8
clientConfig: {
fragmentationThreshold: 81920
}
});
}
this.events = options.events;
this.isWs = options.isWs;
// used to service all non-long-living WS connections, including any
// request that is not WS and any WS request that is not an `eth_subscribe`
// RPC request
this.requestManager = new Web3RequestManager.Manager(this.endpoint);
this.isVm = options.isVm;
this.nodeSubscriptions = {};
this._requestManager = null;
this.clientName = options.isVm ? constants.blockchain.vm : constants.blockchain.ethereum;
this.events.setCommandHandler("proxy:websocket:subscribe", this.handleSubscribe.bind(this));
this.events.setCommandHandler("proxy:websocket:unsubscribe", this.handleUnsubscribe.bind(this));
}
// used to service all non-long-living WS connections, including any
// request that is not WS and any WS request that is not an `eth_subscribe`
// RPC request
get requestManager() {
return (async () => {
if (!this._requestManager) {
const provider = await this._createWebSocketProvider(this.endpoint);
this._requestManager = this._createWeb3RequestManager(provider);
}
return this._requestManager;
})();
}
async _createWebSocketProvider(endpoint) {
// if we are using a VM (ie for tests), then try to get the VM provider
if (this.isVm) {
return this.events.request2("blockchain:client:vmProvider");
}
// pass in endpoint to ensure we get a provider with a connection to the node
return this.events.request2("blockchain:client:provider", this.clientName, endpoint);
}
_createWeb3RequestManager(provider) {
return new Web3RequestManager.Manager(provider);
}
async nodeReady() {
try {
await this.requestManager.send({ method: 'eth_accounts' });
const reqMgr = await this.requestManager;
await reqMgr.send({ method: 'eth_accounts' });
} catch (e) {
throw new Error(__(`Unable to connect to the blockchain endpoint on ${this.endpoint}`));
}
@ -63,7 +79,7 @@ export class Proxy {
this.app.use(express.urlencoded({ extended: true }));
if (this.isWs) {
this.app.ws('/', async (conn, _wsReq) => {
this.app.ws('/', async (conn, wsReq) => {
conn.on('message', async (msg) => {
try {
@ -73,6 +89,7 @@ export class Proxy {
catch (err) {
const error = __('Error processing request: %s', err.message);
this.logger.error(error);
this.logger.debug(`Request causing error: ${JSON.stringify(wsReq)}`);
this.respondWs(conn, error);
}
});
@ -103,9 +120,9 @@ export class Proxy {
async processRequest(request, transport) {
// Modify request
let modifiedRequest;
const rpcRequest = request.method === "POST" ? request.body : request;
request = request.method === "POST" ? request.body : request;
try {
modifiedRequest = await this.emitActionsForRequest(rpcRequest);
modifiedRequest = await this.emitActionsForRequest(request, transport);
}
catch (reqError) {
const error = reqError.message || reqError;
@ -116,92 +133,44 @@ export class Proxy {
}
// Send the possibly modified request to the Node
const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id };
const response = { jsonrpc: "2.0", id: modifiedRequest.request.id };
if (modifiedRequest.sendToNode !== false) {
// kill our manually created long-living connection for eth_subscribe if we have one
if (this.isWs && modifiedRequest.reqData.method === 'eth_unsubscribe') {
const id = modifiedRequest.reqData.params[0];
if (this.nodeSubscriptions[id] && this.nodeSubscriptions[id].provider && this.nodeSubscriptions[id].provider.disconnect) {
this.nodeSubscriptions[id].provider.disconnect();
}
}
// create a long-living WS connection to the node
if (this.isWs && modifiedRequest.reqData.method === 'eth_subscribe') {
// creates a new long-living connection to the node
const currentReqManager = new Web3RequestManager.Manager(this.endpoint);
// kill WS connetion to the node when the client connection closes
transport.on('close', () => currentReqManager.provider.disconnect());
// do the actual forward request to the node
currentReqManager.send(modifiedRequest.reqData, (error, result) => {
if (error) {
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
return this.respondError(transport, rpcErrorObj);
}
// `result` contains our initial response from the node, ie
// subscription id. Any FUTURE data from the node that needs
// to be forwarded to the client connection will be handled
// in the `.on('data')` event below.
this.logger.debug(`Created subscription: ${result}`);
this.nodeSubscriptions[result] = currentReqManager;
respData.result = result;
// TODO: Kick off emitAcitonsForResponse here
this.respondWs(transport, respData);
});
// Watch for `eth_subscribe` subscription data coming from the node.
// Send the subscription data back across the originating client
// connection.
currentReqManager.provider.on('data', (result, deprecatedResult) => {
result = result || deprecatedResult;
this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(result)}`);
// TODO: Kick off emitAcitonsForResponse here
this.respondWs(transport, result);
});
return;
}
try {
const result = await this.forwardRequestToNode(modifiedRequest.reqData);
respData.result = result;
const result = await this.forwardRequestToNode(modifiedRequest.request);
response.result = result;
} catch (fwdReqErr) {
// The node responded with an error. Set up the error so that it can be
// stripped out by modifying the response (via actions for blockchain:proxy:response)
respData.error = fwdReqErr.message || fwdReqErr;
response.error = fwdReqErr.message || fwdReqErr;
}
}
try {
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData);
const modifiedResp = await this.emitActionsForResponse(modifiedRequest.request, response, transport);
// Send back to the client
if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) {
if (modifiedResp && modifiedResp.response && modifiedResp.response.error) {
// error returned from the node and it wasn't stripped by our response actions
const error = modifiedResp.respData.error.message || modifiedResp.respData.error;
const error = modifiedResp.response.error.message || modifiedResp.response.error;
this.logger.error(__(`Error returned from the node: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id };
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.response.id };
return this.respondError(transport, rpcErrorObj);
}
this.respondOK(transport, modifiedResp.respData);
this.respondOK(transport, modifiedResp.response);
}
catch (resError) {
// if was an error in response actions (resError), send the error in the response
const error = resError.message || resError;
this.logger.error(__(`Error executing response actions: ${error}`));
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id };
const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.request.id };
return this.respondError(transport, rpcErrorObj);
}
}
forwardRequestToNode(reqData) {
return new Promise((resolve, reject) => {
this.requestManager.send(reqData, (fwdReqErr, result) => {
forwardRequestToNode(request) {
return new Promise(async (resolve, reject) => {
const reqMgr = await this.requestManager;
reqMgr.send(request, (fwdReqErr, result) => {
if (fwdReqErr) {
return reject(fwdReqErr);
}
@ -210,6 +179,98 @@ export class Proxy {
});
}
async handleSubscribe(clientSocket, request, response, cb) {
let currentReqManager = await this.requestManager;
if (!this.isVm) {
const provider = await this._createWebSocketProvider(this.endpoint);
// creates a new long-living connection to the node
currentReqManager = this._createWeb3RequestManager(provider);
// kill WS connetion to the node when the client connection closes
clientSocket.on('close', () => currentReqManager.provider.disconnect());
}
// do the actual forward request to the node
currentReqManager.send(request, (error, subscriptionId) => {
if (error) {
return cb(error);
}
// `result` contains our initial response from the node, ie
// subscription id. Any FUTURE data from the node that needs
// to be forwarded to the client connection will be handled
// in the `.on('data')` event below.
this.logger.debug(`Created subscription: ${subscriptionId} for ${JSON.stringify(request.params)}`);
this.logger.debug(`Subscription request: ${JSON.stringify(request)} `);
// add the websocket req manager for this subscription to memory so it
// can be referenced later
this.nodeSubscriptions[subscriptionId] = currentReqManager;
// Watch for `eth_subscribe` subscription data coming from the node.
// Send the subscription data back across the originating client
// connection.
currentReqManager.provider.on('data', async (subscriptionResponse, deprecatedResponse) => {
subscriptionResponse = subscriptionResponse || deprecatedResponse;
// filter out any subscription data that is not meant to be passed back to the client
// This is only needed when using a VM because the VM uses only one provider. When there is
// only one provider, that single provider has 'data' events subscribed to it for each `eth_subscribe`.
// When any subscription data is returned from the node, it will fire the 'data' event for ALL
// `eth_subscribe`s. This filter prevents responding to sockets unnecessarily.
if (!subscriptionResponse.params || subscriptionResponse.params.subscription !== subscriptionId) {
return;
}
this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(subscriptionResponse)} `);
// allow modification of the node subscription data sent to the client
subscriptionResponse = await this.emitActionsForResponse(subscriptionResponse, subscriptionResponse, clientSocket);
this.respondWs(clientSocket, subscriptionResponse.response);
});
// send a response to the original requesting inbound client socket
// (ie the browser or embark) with the result of the subscription
// request from the node
response.result = subscriptionId;
cb(null, response);
});
}
async handleUnsubscribe(request, response, cb) {
// kill our manually created long-living connection for eth_subscribe if we have one
const subscriptionId = request.params[0];
const currentReqManager = this.nodeSubscriptions[subscriptionId];
if (!currentReqManager) {
return this.logger.error(`Failed to unsubscribe from subscription '${subscriptionId}' because the proxy failed to find an active connection to the node.`);
}
// forward unsubscription request to the node
currentReqManager.send(request, (error, result) => {
if (error) {
return cb(error);
}
// `result` contains 'true' if the unsubscription request was successful
this.logger.debug(`Unsubscription result for subscription '${JSON.stringify(request.params)}': ${result} `);
this.logger.debug(`Unsubscription request: ${JSON.stringify(request)} `);
// if unsubscribe succeeded, disconnect connection and remove connection from memory
if (result === true) {
if (currentReqManager.provider && currentReqManager.provider.disconnect && !this.isVm) {
currentReqManager.provider.disconnect();
}
delete this.nodeSubscriptions[subscriptionId];
}
// result should be true/false
response.result = result;
cb(null, response);
});
}
respondWs(ws, response) {
if (typeof response === "object") {
response = JSON.stringify(response);
@ -223,7 +284,7 @@ export class Proxy {
2: "closing",
3: "closed"
};
this.logger.warn(`[Proxy]: Failed to send WebSocket response because the socket is ${stateMap[ws.readyState]}. Response: ${response}`);
this.logger.warn(`[Proxy]: Failed to send WebSocket response because the socket is ${stateMap[ws.readyState]}.Response: ${response} `);
}
respondHttp(res, statusCode, response) {
res.status(statusCode).send(response);
@ -237,22 +298,23 @@ export class Proxy {
return this.isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response);
}
emitActionsForRequest(body) {
emitActionsForRequest(request, transport) {
return new Promise((resolve, reject) => {
let calledBack = false;
const data = { request, isWs: this.isWs, transport };
setTimeout(() => {
if (calledBack) {
return;
}
this.logger.warn(__('Action for request "%s" timed out', body.method));
this.logger.debug(body);
this.logger.warn(__('Action for request "%s" timed out', request.method));
this.logger.debug(request);
calledBack = true;
resolve({ reqData: body });
resolve(data);
}, ACTION_TIMEOUT);
this.plugins.emitAndRunActionsForEvent('blockchain:proxy:request',
{ reqData: body },
(err, resp) => {
data,
(err, result) => {
if (calledBack) {
// Action timed out
return;
@ -261,33 +323,34 @@ export class Proxy {
this.logger.error(__('Error parsing the request in the proxy'));
this.logger.error(err);
// Reset the data to the original request so that it can be used anyway
resp = { reqData: body };
result = data;
calledBack = true;
return reject(err);
}
calledBack = true;
resolve(resp);
resolve(result);
});
});
}
emitActionsForResponse(reqData, respData) {
emitActionsForResponse(request, response, transport) {
return new Promise((resolve, reject) => {
const data = { request, response, isWs: this.isWs, transport };
let calledBack = false;
setTimeout(() => {
if (calledBack) {
return;
}
this.logger.warn(__('Action for response "%s" timed out', reqData.method));
this.logger.debug(reqData);
this.logger.debug(respData);
this.logger.warn(__('Action for response "%s" timed out', request.method));
this.logger.debug(request);
this.logger.debug(response);
calledBack = true;
resolve({ respData });
resolve(data);
}, ACTION_TIMEOUT);
this.plugins.emitAndRunActionsForEvent('blockchain:proxy:response',
{ respData, reqData },
(err, resp) => {
data,
(err, result) => {
if (calledBack) {
// Action timed out
return;
@ -296,10 +359,12 @@ export class Proxy {
this.logger.error(__('Error parsing the response in the proxy'));
this.logger.error(err);
calledBack = true;
reject(err);
// Reset the data to the original response so that it can be used anyway
result = data;
return reject(err);
}
calledBack = true;
resolve(resp);
resolve(result);
});
});
}

View File

@ -15,7 +15,7 @@ class Reporter {
wireGasUsage() {
const {events} = this.embark;
events.on('blockchain:proxy:response', (params) => {
const {result} = params.respData;
const { result } = params.response;
if (!result || !result.gasUsed) {
return;

View File

@ -477,5 +477,12 @@ embark.registerActionForEvent("deployment:contract:beforeDeploy", async (params,
- `deployment:deployContracts:afterAll`: Called after all contracts have deployed. No params
- `tests:contracts:compile:before`: Called before the contracts are compiled in the context of the test. Only param is `contractFiles`
- `tests:contracts:compile:after`: Called after the contracts are compiled in the context of the test. Only param is `compiledContracts`
- `blockchain:proxy:request`: Called before a request from Embark or the Dapp is sent to the blockchain node. You can modify or react to the payload of the request. Only param is `reqData`, an object containing the payload
- `blockchain:proxy:response`: Called before the node response is sent back to Embark or the Dapp. You can modify or react to the payload of the response. Two params, `reqData` and `respData`, objects containing the payloads
- `blockchain:proxy:request`: Called before a request from Embark or the Dapp is sent to the blockchain node. You can modify or react to the payload of the request. Params are:
- `request`: an object containing the request payload
- `transport`: an object containing the client's websocket connection to the proxy
- `isWs`: a boolean flag indicating if the request was performed using websockets
- `blockchain:proxy:response`: Called before the node response is sent back to Embark or the Dapp. You can modify or react to the payload of the response. Params are:
- `request`: an object containing the request payload
- `response`: an object containing the response payload
- `transport`: an object containing the client's websocket connection to the proxy
- `isWs`: a boolean flag indicating if the request was performed using websockets