From f6f45077e9accfba6455d0817612e4449a58e1bb Mon Sep 17 00:00:00 2001 From: emizzle Date: Mon, 11 Nov 2019 16:40:36 +1100 Subject: [PATCH] fix(@embark/proxy): Fix unsubsribe handling and add new provider When `eth_unsubscribe` is received in the proxy, ensure this request is forwarded through on the correct socket (the same socket that was used for the corresponding `eth_subscribe`). Move subscription handling for `eth_subscribe` and `eth_unsubscribe` to RpcModifiers (in `rpc-manager` package). For each `eth_subscribe` request, a new `RequestManager` is created. Since the endpoint property on the proxy class was updated to be a provider, the same provider was being assigned to each new `RequestManager` and thus creating multiple event handlers for each subscription created. To circumvent this, we are now creating a new provider for each `RequestManager`. Co-authored-by: Pascal Precht --- .../app/app/contracts/simple_storage.sol | 4 +- dapps/tests/app/test/simple_storage_spec.js | 7 +- packages/core/core/constants.json | 1 + packages/core/typings/src/embark.d.ts | 1 + packages/plugins/ganache/src/index.js | 2 +- .../rpc-manager/src/lib/eth_accounts.ts | 18 +- .../src/lib/eth_sendTransaction.ts | 20 +- .../rpc-manager/src/lib/eth_signTypedData.ts | 22 +- .../rpc-manager/src/lib/eth_subscribe.ts | 43 +++ .../rpc-manager/src/lib/eth_unsubscribe.ts | 44 +++ packages/plugins/rpc-manager/src/lib/index.ts | 11 +- .../src/lib/personal_newAccount.ts | 8 +- .../plugins/transaction-logger/src/index.js | 40 +-- packages/stack/blockchain-client/src/index.js | 64 +++-- packages/stack/proxy/src/index.ts | 87 +++--- packages/stack/proxy/src/proxy.js | 271 +++++++++++------- .../stack/test-runner/src/lib/reporter.js | 2 +- site/source/docs/plugin_reference.md | 11 +- 18 files changed, 437 insertions(+), 219 deletions(-) create mode 100644 packages/plugins/rpc-manager/src/lib/eth_subscribe.ts create mode 100644 packages/plugins/rpc-manager/src/lib/eth_unsubscribe.ts diff --git a/dapps/tests/app/app/contracts/simple_storage.sol b/dapps/tests/app/app/contracts/simple_storage.sol index 3afc8d780..5ab6275c0 100644 --- a/dapps/tests/app/app/contracts/simple_storage.sol +++ b/dapps/tests/app/app/contracts/simple_storage.sol @@ -4,7 +4,7 @@ contract SimpleStorage { uint public storedData; address public registar; address owner; - event EventOnSet2(bool passed, string message); + event EventOnSet2(bool passed, string message, uint setValue); constructor(uint initialValue) public { storedData = initialValue; @@ -20,7 +20,7 @@ contract SimpleStorage { function set2(uint x) public { storedData = x; - emit EventOnSet2(true, "hi"); + emit EventOnSet2(true, "hi", x); } function set3(uint x) public { diff --git a/dapps/tests/app/test/simple_storage_spec.js b/dapps/tests/app/test/simple_storage_spec.js index d03df3d24..ed04802a8 100644 --- a/dapps/tests/app/test/simple_storage_spec.js +++ b/dapps/tests/app/test/simple_storage_spec.js @@ -51,10 +51,9 @@ contract("SimpleStorage", function() { }); it('listens to events', function(done) { - SimpleStorage.once('EventOnSet2', async function(error, _result) { + SimpleStorage.once('EventOnSet2', async function(error, result) { assert.strictEqual(error, null); - let result = await SimpleStorage.methods.get().call(); - assert.strictEqual(parseInt(result, 10), 150); + assert.strictEqual(parseInt(result.returnValues.setValue, 10), 150); done(error); }); @@ -63,7 +62,7 @@ contract("SimpleStorage", function() { it('asserts event triggered', async function() { const tx = await SimpleStorage.methods.set2(160).send(); - assert.eventEmitted(tx, 'EventOnSet2', {passed: true, message: "hi"}); + assert.eventEmitted(tx, 'EventOnSet2', {passed: true, message: "hi", setValue: "160"}); }); it("should revert with a value lower than 5", async function() { diff --git a/packages/core/core/constants.json b/packages/core/core/constants.json index d2afd9ebe..9b009e35d 100644 --- a/packages/core/core/constants.json +++ b/packages/core/core/constants.json @@ -35,6 +35,7 @@ "webpackDone": "webpackDone" }, "blockchain": { + "ethereum": "ethereum", "vm": "vm", "call": "eth_call", "clients": { diff --git a/packages/core/typings/src/embark.d.ts b/packages/core/typings/src/embark.d.ts index 4ce2ee85e..440f89b10 100644 --- a/packages/core/typings/src/embark.d.ts +++ b/packages/core/typings/src/embark.d.ts @@ -53,6 +53,7 @@ export interface Config { rpcCorsDomain: string; wsRPC: boolean; isDev: boolean; + client: string; }; webServerConfig: { certOptions: { diff --git a/packages/plugins/ganache/src/index.js b/packages/plugins/ganache/src/index.js index 7653a580f..ebbc3b8aa 100644 --- a/packages/plugins/ganache/src/index.js +++ b/packages/plugins/ganache/src/index.js @@ -1,6 +1,6 @@ class Ganache { constructor(embark) { - embark.events.request('proxy:vm:register', () => { + embark.events.request('blockchain:vm:register', () => { const ganache = require('ganache-cli'); return ganache.provider(); }); diff --git a/packages/plugins/rpc-manager/src/lib/eth_accounts.ts b/packages/plugins/rpc-manager/src/lib/eth_accounts.ts index 463d0f246..29105bd01 100644 --- a/packages/plugins/rpc-manager/src/lib/eth_accounts.ts +++ b/packages/plugins/rpc-manager/src/lib/eth_accounts.ts @@ -21,30 +21,30 @@ export default class EthAccounts extends RpcModifier { constructor(embark: Embark, rpcModifierEvents: Events) { super(embark, rpcModifierEvents); - this.embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_eth_accounts.bind(this)); + this.embark.registerActionForEvent("blockchain:proxy:response", this.ethAccountsResponse.bind(this)); } - private async checkResponseFor_eth_accounts(params: any, callback: Callback) { + private async ethAccountsResponse(params: any, callback: Callback) { - if (!(METHODS_TO_MODIFY.includes(params.reqData.method))) { + if (!(METHODS_TO_MODIFY.includes(params.request.method))) { return callback(null, params); } - this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' response:`)); - this.logger.trace(__(`Original request/response data: ${JSON.stringify(params)}`)); + this.logger.trace(__(`Modifying blockchain '${params.request.method}' response:`)); + this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`)); try { - if (!arrayEqual(params.respData.result, this._nodeAccounts || [])) { + if (!arrayEqual(params.response.result, this._nodeAccounts || [])) { // reset backing variables so accounts is recalculated - await this.rpcModifierEvents.request2("nodeAccounts:updated", params.respData.result); + await this.rpcModifierEvents.request2("nodeAccounts:updated", params.response.result); } const accounts = await this.accounts; if (!(accounts && accounts.length)) { return callback(null, params); } - params.respData.result = accounts.map((acc) => acc.address || acc); - this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`)); + params.response.result = accounts.map((acc) => acc.address || acc); + this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`)); } catch (err) { return callback(err); } diff --git a/packages/plugins/rpc-manager/src/lib/eth_sendTransaction.ts b/packages/plugins/rpc-manager/src/lib/eth_sendTransaction.ts index 6456b4fec..f2b129d43 100644 --- a/packages/plugins/rpc-manager/src/lib/eth_sendTransaction.ts +++ b/packages/plugins/rpc-manager/src/lib/eth_sendTransaction.ts @@ -11,7 +11,7 @@ export default class EthSendTransaction extends RpcModifier { constructor(embark: Embark, rpcModifierEvents: Events) { super(embark, rpcModifierEvents); - embark.registerActionForEvent("blockchain:proxy:request", this.checkRequestFor_eth_sendTransaction.bind(this)); + embark.registerActionForEvent("blockchain:proxy:request", this.ethSendTransactionRequest.bind(this)); // Allow to run transaction in parallel by resolving the nonce manually. // For each transaction, resolve the nonce by taking the max of current transaction count and the cache we keep locally. @@ -52,8 +52,8 @@ export default class EthSendTransaction extends RpcModifier { callback(null, this.nonceCache[address]); }); } - private async checkRequestFor_eth_sendTransaction(params: any, callback: Callback) { - if (!(params.reqData.method === blockchainConstants.transactionMethods.eth_sendTransaction)) { + private async ethSendTransactionRequest(params: any, callback: Callback) { + if (!(params.request.method === blockchainConstants.transactionMethods.eth_sendTransaction)) { return callback(null, params); } const accounts = await this.accounts; @@ -61,26 +61,26 @@ export default class EthSendTransaction extends RpcModifier { return callback(null, params); } - this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' request:`)); - this.logger.trace(__(`Original request data: ${JSON.stringify(params)}`)); + this.logger.trace(__(`Modifying blockchain '${params.request.method}' request:`)); + this.logger.trace(__(`Original request data: ${JSON.stringify({ request: params.request, response: params.response })}`)); try { // Check if we have that account in our wallet - const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(params.reqData.params[0].from)); + const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(params.request.params[0].from)); if (account && account.privateKey) { - return this.signTransactionQueue.push({ payload: params.reqData.params[0], account }, (err: any, newPayload: any) => { + return this.signTransactionQueue.push({ payload: params.request.params[0], account }, (err: any, newPayload: any) => { if (err) { return callback(err, null); } - params.reqData.method = blockchainConstants.transactionMethods.eth_sendRawTransaction; - params.reqData.params = [newPayload]; + params.request.method = blockchainConstants.transactionMethods.eth_sendRawTransaction; + params.request.params = [newPayload]; callback(err, params); }); } } catch (err) { return callback(err); } - this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`)); + this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`)); callback(null, params); } } diff --git a/packages/plugins/rpc-manager/src/lib/eth_signTypedData.ts b/packages/plugins/rpc-manager/src/lib/eth_signTypedData.ts index cb9f73ab8..7f5746d14 100644 --- a/packages/plugins/rpc-manager/src/lib/eth_signTypedData.ts +++ b/packages/plugins/rpc-manager/src/lib/eth_signTypedData.ts @@ -8,40 +8,40 @@ export default class EthSignTypedData extends RpcModifier { constructor(embark: Embark, rpcModifierEvents: Events) { super(embark, rpcModifierEvents); - this.embark.registerActionForEvent("blockchain:proxy:request", this.checkRequestFor_eth_signTypedData.bind(this)); - this.embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_eth_signTypedData.bind(this)); + this.embark.registerActionForEvent("blockchain:proxy:request", this.ethSignTypedDataRequest.bind(this)); + this.embark.registerActionForEvent("blockchain:proxy:response", this.ethSignTypedDataResponse.bind(this)); } - private async checkRequestFor_eth_signTypedData(params: any, callback: Callback) { + private async ethSignTypedDataRequest(params: any, callback: Callback) { // check for: // - eth_signTypedData // - eth_signTypedData_v3 // - eth_signTypedData_v4 // - personal_signTypedData (parity) - if (params.reqData.method.includes("signTypedData")) { + if (params.request.method.includes("signTypedData")) { // indicate that we do not want this call to go to the node params.sendToNode = false; return callback(null, params); } callback(null, params); } - private async checkResponseFor_eth_signTypedData(params: any, callback: Callback) { + private async ethSignTypedDataResponse(params: any, callback: Callback) { // check for: // - eth_signTypedData // - eth_signTypedData_v3 // - eth_signTypedData_v4 // - personal_signTypedData (parity) - if (!params.reqData.method.includes("signTypedData")) { + if (!params.request.method.includes("signTypedData")) { return callback(null, params); } - this.logger.trace(__(`Modifying blockchain '${params.reqData.method}' response:`)); - this.logger.trace(__(`Original request/response data: ${JSON.stringify(params)}`)); + this.logger.trace(__(`Modifying blockchain '${params.request.method}' response:`)); + this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`)); try { const accounts = await this.accounts; - const [fromAddr, typedData] = params.reqData.params; + const [fromAddr, typedData] = params.request.params; const account = accounts.find((acc) => Web3.utils.toChecksumAddress(acc.address) === Web3.utils.toChecksumAddress(fromAddr)); if (!(account && account.privateKey)) { return callback( @@ -51,8 +51,8 @@ export default class EthSignTypedData extends RpcModifier { const toSign = transaction.getToSignHash(typeof typedData === "string" ? JSON.parse(typedData) : typedData); const signature = sign(toSign, [account.privateKey]); - params.respData.result = signature[0]; - this.logger.trace(__(`Modified request/response data: ${JSON.stringify(params)}`)); + params.response.result = signature[0]; + this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request: params.request, response: params.response })}`)); } catch (err) { return callback(err); } diff --git a/packages/plugins/rpc-manager/src/lib/eth_subscribe.ts b/packages/plugins/rpc-manager/src/lib/eth_subscribe.ts new file mode 100644 index 000000000..24b485dd2 --- /dev/null +++ b/packages/plugins/rpc-manager/src/lib/eth_subscribe.ts @@ -0,0 +1,43 @@ +import { Callback, Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark"; +import { __ } from "embark-i18n"; +import RpcModifier from "./rpcModifier"; + +export default class EthSubscribe extends RpcModifier { + constructor(embark: Embark, rpcModifierEvents: Events) { + super(embark, rpcModifierEvents); + + embark.registerActionForEvent("blockchain:proxy:request", this.ethSubscribeRequest.bind(this)); + embark.registerActionForEvent("blockchain:proxy:response", this.ethSubscribeResponse.bind(this)); + } + + private async ethSubscribeRequest(params: any, callback: Callback) { + // check for eth_subscribe and websockets + if (params.isWs && params.request.method === "eth_subscribe") { + // indicate that we do not want this call to go to the node + params.sendToNode = false; + return callback(null, params); + } + callback(null, params); + } + private async ethSubscribeResponse(params: any, callback: Callback) { + + const { isWs, transport, request, response } = params; + + // check for eth_subscribe and websockets + if (!(isWs && request.method.includes("eth_subscribe"))) { + return callback(null, params); + } + + this.logger.trace(__(`Modifying blockchain '${request.method}' response:`)); + this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request, response })}`)); + + try { + const nodeResponse = await this.events.request2("proxy:websocket:subscribe", transport, request, response); + params.response = nodeResponse; + this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request, response: params.response })}`)); + } catch (err) { + return callback(err); + } + callback(null, params); + } +} diff --git a/packages/plugins/rpc-manager/src/lib/eth_unsubscribe.ts b/packages/plugins/rpc-manager/src/lib/eth_unsubscribe.ts new file mode 100644 index 000000000..35042e3a3 --- /dev/null +++ b/packages/plugins/rpc-manager/src/lib/eth_unsubscribe.ts @@ -0,0 +1,44 @@ +import { Callback, Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark"; +import { __ } from "embark-i18n"; +import RpcModifier from "./rpcModifier"; + +export default class EthUnsubscribe extends RpcModifier { + + constructor(embark: Embark, rpcModifierEvents: Events) { + super(embark, rpcModifierEvents); + + embark.registerActionForEvent("blockchain:proxy:request", this.ethUnsubscribeRequest.bind(this)); + embark.registerActionForEvent("blockchain:proxy:response", this.ethUnsubscribeResponse.bind(this)); + } + + private async ethUnsubscribeRequest(params: any, callback: Callback) { + // check for eth_subscribe and websockets + if (params.isWs && params.request.method === "eth_unsubscribe") { + // indicate that we do not want this call to go to the node + params.sendToNode = false; + return callback(null, params); + } + callback(null, params); + } + private async ethUnsubscribeResponse(params: any, callback: Callback) { + + const { isWs, request, response } = params; + + // check for eth_subscribe and websockets + if (!(isWs && request.method.includes("eth_unsubscribe"))) { + return callback(null, params); + } + + this.logger.trace(__(`Modifying blockchain '${request.method}' response:`)); + this.logger.trace(__(`Original request/response data: ${JSON.stringify({ request, response })}`)); + + try { + const nodeResponse = await this.events.request2("proxy:websocket:unsubscribe", request, response); + params.response = nodeResponse; + this.logger.trace(__(`Modified request/response data: ${JSON.stringify({ request, response: params.response })}`)); + } catch (err) { + return callback(err); + } + callback(null, params); + } +} diff --git a/packages/plugins/rpc-manager/src/lib/index.ts b/packages/plugins/rpc-manager/src/lib/index.ts index b3faa9cef..bb31e2b70 100644 --- a/packages/plugins/rpc-manager/src/lib/index.ts +++ b/packages/plugins/rpc-manager/src/lib/index.ts @@ -4,6 +4,8 @@ import Web3 from "web3"; import EthAccounts from "./eth_accounts"; import EthSendTransaction from "./eth_sendTransaction"; import EthSignTypedData from "./eth_signTypedData"; +import EthSubscribe from "./eth_subscribe"; +import EthUnsubscribe from "./eth_unsubscribe"; import PersonalNewAccount from "./personal_newAccount"; import RpcModifier from "./rpcModifier"; @@ -34,7 +36,14 @@ export default class RpcManager { } return this.updateAccounts(this._nodeAccounts, cb); }); - this.modifiers = [PersonalNewAccount, EthAccounts, EthSendTransaction, EthSignTypedData].map((rpcModifier) => new rpcModifier(this.embark, this.rpcModifierEvents)); + this.modifiers = [ + PersonalNewAccount, + EthAccounts, + EthSendTransaction, + EthSignTypedData, + EthSubscribe, + EthUnsubscribe + ].map((rpcModifier) => new rpcModifier(this.embark, this.rpcModifierEvents)); } private async updateAccounts(updatedNodeAccounts: any[], cb: Callback) { for (const modifier of this.modifiers) { diff --git a/packages/plugins/rpc-manager/src/lib/personal_newAccount.ts b/packages/plugins/rpc-manager/src/lib/personal_newAccount.ts index a12304cf0..60d032035 100644 --- a/packages/plugins/rpc-manager/src/lib/personal_newAccount.ts +++ b/packages/plugins/rpc-manager/src/lib/personal_newAccount.ts @@ -7,16 +7,16 @@ export default class PersonalNewAccount extends RpcModifier { constructor(embark: Embark, rpcModifierEvents: Events) { super(embark, rpcModifierEvents); - embark.registerActionForEvent("blockchain:proxy:response", this.checkResponseFor_personal_newAccount.bind(this)); + embark.registerActionForEvent("blockchain:proxy:response", this.personalNewAccountResponse.bind(this)); } - private async checkResponseFor_personal_newAccount(params: any, callback: Callback) { - if (params.reqData.method !== blockchainConstants.transactionMethods.personal_newAccount) { + private async personalNewAccountResponse(params: any, callback: Callback) { + if (params.request.method !== blockchainConstants.transactionMethods.personal_newAccount) { return callback(null, params); } // emit event so tx modifiers can refresh accounts - await this.rpcModifierEvents.request2("nodeAccounts:added", params.respData.result); + await this.rpcModifierEvents.request2("nodeAccounts:added", params.response.result); callback(null, params); } diff --git a/packages/plugins/transaction-logger/src/index.js b/packages/plugins/transaction-logger/src/index.js index ffe8a183c..6b00aa04d 100644 --- a/packages/plugins/transaction-logger/src/index.js +++ b/packages/plugins/transaction-logger/src/index.js @@ -1,9 +1,9 @@ const async = require('async'); -import {__} from 'embark-i18n'; +import { __ } from 'embark-i18n'; const Web3 = require('web3'); -const {blockchain: blockchainConstants} = require('embark-core/constants'); -import {dappPath, getAddressToContract, getTransactionParams, hexToNumber} from 'embark-utils'; +const { blockchain: blockchainConstants } = require('embark-core/constants'); +import { dappPath, getAddressToContract, getTransactionParams, hexToNumber } from 'embark-utils'; const Transaction = require('ethereumjs-tx'); @@ -98,23 +98,23 @@ class TransactionLogger { } async _onLogRequest(args) { - const method = args.reqData.method; + const method = args.request.method; if (!this.contractsDeployed || !LISTENED_METHODS.includes(method)) { return; } if (method === blockchainConstants.transactionMethods.eth_sendTransaction) { // We just gather data and wait for the receipt - this.transactions[args.respData.result] = { - address: args.reqData.params[0].to, - data: args.reqData.params[0].data, - txHash: args.respData.result + this.transactions[args.response.result] = { + address: args.request.params[0].to, + data: args.request.params[0].data, + txHash: args.response.result }; return; } else if (method === blockchainConstants.transactionMethods.eth_sendRawTransaction) { - const rawData = Buffer.from(ethUtil.stripHexPrefix(args.reqData.params[0]), 'hex'); + const rawData = Buffer.from(ethUtil.stripHexPrefix(args.request.params[0]), 'hex'); const tx = new Transaction(rawData, 'hex'); - this.transactions[args.respData.result] = { + this.transactions[args.response.result] = { address: '0x' + tx.to.toString('hex'), data: '0x' + tx.data.toString('hex') }; @@ -123,22 +123,22 @@ class TransactionLogger { let dataObject; if (method === blockchainConstants.transactionMethods.eth_getTransactionReceipt) { - dataObject = args.respData.result; + dataObject = args.response.result; if (!dataObject) { return; } - if (this.transactions[args.respData.result.transactionHash]) { + if (this.transactions[args.response.result.transactionHash]) { // This is the normal case. If we don't get here, it's because we missed a TX - dataObject = Object.assign(dataObject, this.transactions[args.respData.result.transactionHash]); - delete this.transactions[args.respData.result.transactionHash]; // No longer needed + dataObject = Object.assign(dataObject, this.transactions[args.response.result.transactionHash]); + delete this.transactions[args.response.result.transactionHash]; // No longer needed } else { // Was not a eth_getTransactionReceipt in the context of a transaction return; } } else { - dataObject = args.reqData.params[0]; + dataObject = args.request.params[0]; } - const {to: address, data} = dataObject; + const { to: address, data } = dataObject; if (!address) { // It's a deployment return; @@ -152,7 +152,7 @@ class TransactionLogger { }); } - const {name, silent} = contract; + const { name, silent } = contract; if (silent && !this.outputDone) { return; } @@ -169,12 +169,12 @@ class TransactionLogger { } if (method === blockchainConstants.transactionMethods.eth_call) { - const log = Object.assign({}, args, {name, functionName, paramString}); + const log = Object.assign({}, args, { name, functionName, paramString }); log.status = '0x1'; return this.events.emit('contracts:log', log); } - let {transactionHash, blockNumber, gasUsed, status} = args.respData.result; + let { transactionHash, blockNumber, gasUsed, status } = args.response.result; let reason; if (status !== '0x0' && status !== '0x1') { status = !status ? '0x0' : '0x1'; @@ -192,7 +192,7 @@ class TransactionLogger { gasUsed = hexToNumber(gasUsed); blockNumber = hexToNumber(blockNumber); - const log = Object.assign({}, args, {name, functionName, paramString, gasUsed, blockNumber, reason, status, transactionHash}); + const log = Object.assign({}, args, { name, functionName, paramString, gasUsed, blockNumber, reason, status, transactionHash }); this.events.emit('contracts:log', log); this.logger.info(`Blockchain>`.underline + ` ${name}.${functionName}(${paramString})`.bold + ` | ${transactionHash} | gas:${gasUsed} | blk:${blockNumber} | status:${status}${reason ? ` | reason: "${reason}"` : ''}`); diff --git a/packages/stack/blockchain-client/src/index.js b/packages/stack/blockchain-client/src/index.js index f7bbb19a5..e269dc108 100644 --- a/packages/stack/blockchain-client/src/index.js +++ b/packages/stack/blockchain-client/src/index.js @@ -9,30 +9,35 @@ class BlockchainClient { this.blockchainClients = {}; this.client = null; + this.vms = []; this.events.setCommandHandler("blockchain:client:register", (clientName, blockchainClient) => { this.blockchainClients[clientName] = blockchainClient; this.client = blockchainClient; }); + this.events.setCommandHandler("blockchain:vm:register", (handler) => { + this.vms.push(handler()); + }); // TODO: unclear currently if this belongs here so it's a bit hardcoded for now - this.events.setCommandHandler("blockchain:client:provider", (clientName, cb) => { - this.events.request("proxy:endpoint:get", (err, endpoint) => { - if (err) { - return cb(err); - } - if (endpoint.startsWith('ws')) { - return cb(null, new Web3.providers.WebsocketProvider(endpoint, { - headers: {Origin: constants.embarkResourceOrigin}, - // TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846 - // Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8 - clientConfig: { - fragmentationThreshold: 81920 - } - })); - } - const web3 = new Web3(endpoint); - cb(null, web3.currentProvider); - }); + this.events.setCommandHandler("blockchain:client:vmProvider", async (cb) => { + if (!this.vms.length) { + return cb(`Failed to get the VM provider. Please register one using 'blockchain:vm:register', or by ensuring the 'embark-ganache' package is registered.`); + } + return cb(null, this.vms[this.vms.length - 1]); + }); + this.events.setCommandHandler("blockchain:client:provider", async (clientName, endpoint, cb) => { + if (!cb && typeof endpoint === "function") { + cb = endpoint; + endpoint = null; + } + let provider; + try { + provider = await this._getProvider(clientName, endpoint); + } + catch (err) { + return cb(`Error getting provider: ${err.message || err}`); + } + cb(null, provider); }); // TODO: maybe not the ideal event to listen to? @@ -46,7 +51,28 @@ class BlockchainClient { // set default account }); } - + async _getProvider(clientName, endpoint) { + // Passing in an endpoint allows us to customise which URL the provider connects to. + // If no endpoint is provided, the provider will connect to the proxy. + // Explicity setting an endpoint is useful for cases where we want to connect directly + // to the node (ie in the proxy). + if (!endpoint) { + // will return the proxy URL + endpoint = await this.events.request2("proxy:endpoint:get"); + } + if (endpoint.startsWith('ws')) { + return new Web3.providers.WebsocketProvider(endpoint, { + headers: { Origin: constants.embarkResourceOrigin }, + // TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846 + // Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8 + clientConfig: { + fragmentationThreshold: 81920 + } + }); + } + const web3 = new Web3(endpoint); + return web3.currentProvider; + } } module.exports = BlockchainClient; diff --git a/packages/stack/proxy/src/index.ts b/packages/stack/proxy/src/index.ts index 9d0dba921..3cab45a7e 100644 --- a/packages/stack/proxy/src/index.ts +++ b/packages/stack/proxy/src/index.ts @@ -1,5 +1,5 @@ -import {Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark"; -import {__} from "embark-i18n"; +import { Embark, Events } /* supplied by @types/embark in packages/embark-typings */ from "embark"; +import { __ } from "embark-i18n"; import { buildUrl, findNextPort } from "embark-utils"; import { Logger } from 'embark-logger'; import { Proxy } from "./proxy"; @@ -17,13 +17,14 @@ export default class ProxyManager { private wsPort = 0; private ready = false; private isWs = false; - private vms: any[]; + private isVm = false; + private _endpoint: string = ""; + private inited: boolean = false; constructor(private embark: Embark, options: any) { this.logger = embark.logger; this.events = embark.events; this.plugins = options.plugins; - this.vms = []; this.host = "localhost"; @@ -50,19 +51,28 @@ export default class ProxyManager { this.events.setCommandHandler("proxy:endpoint:get", async (cb) => { await this.onReady(); - if (!this.embark.config.blockchainConfig.proxy) { - return cb(null, this.embark.config.blockchainConfig.endpoint); + cb(null, (await this.endpoint)); + }); + } + + private get endpoint() { + return (async () => { + if (this._endpoint) { + return this._endpoint; } + if (!this.embark.config.blockchainConfig.proxy) { + this._endpoint = this.embark.config.blockchainConfig.endpoint; + return this._endpoint; + } + await this.init(); // TODO Check if the proxy can support HTTPS, though it probably doesn't matter since it's local if (this.isWs) { - return cb(null, buildUrl("ws", this.host, this.wsPort, "ws")); + this._endpoint = buildUrl("ws", this.host, this.wsPort, "ws"); + return this._endpoint; } - cb(null, buildUrl("http", this.host, this.rpcPort, "rpc")); - }); - - this.events.setCommandHandler("proxy:vm:register", (handler: any) => { - this.vms.push(handler); - }); + this._endpoint = buildUrl("http", this.host, this.rpcPort, "rpc"); + return this._endpoint; + })(); } public onReady() { @@ -76,50 +86,63 @@ export default class ProxyManager { }); } + private async init() { + if (this.inited) { + return; + } + this.inited = true; + + // setup ports + const port = await findNextPort(this.embark.config.blockchainConfig.rpcPort + constants.blockchain.servicePortOnProxy); + this.rpcPort = port; + this.wsPort = port + 1; + + // setup proxy details + this.isVm = this.embark.config.blockchainConfig.client === constants.blockchain.vm; + this.isWs = this.isVm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint); + } + private async setupProxy(clientName: string) { + await this.init(); if (!this.embark.config.blockchainConfig.proxy) { return; } if (this.httpProxy || this.wsProxy) { throw new Error("Proxy is already started"); } - const port = await findNextPort(this.embark.config.blockchainConfig.rpcPort + constants.blockchain.servicePortOnProxy); - this.rpcPort = port; - this.wsPort = port + 1; - this.isWs = clientName === constants.blockchain.vm || (/wss?/).test(this.embark.config.blockchainConfig.endpoint); + const endpoint = this.embark.config.blockchainConfig.endpoint; // HTTP - if (clientName !== constants.blockchain.vm) { + if (!this.isVm) { this.httpProxy = await new Proxy({ - endpoint: this.embark.config.blockchainConfig.endpoint, + endpoint, events: this.events, isWs: false, logger: this.logger, plugins: this.plugins, - vms: this.vms, + isVm: this.isVm }) - .serve( - this.host, - this.rpcPort, - ); - this.logger.info(`HTTP Proxy for node endpoint ${this.embark.config.blockchainConfig.endpoint} listening on ${buildUrl("http", this.host, this.rpcPort, "rpc")}`); + .serve( + this.host, + this.rpcPort, + ); + this.logger.info(`HTTP Proxy for node endpoint ${endpoint} listening on ${buildUrl("http", this.host, this.rpcPort, "rpc")}`); } if (this.isWs) { - const endpoint = clientName === constants.blockchain.vm ? constants.blockchain.vm : this.embark.config.blockchainConfig.endpoint; this.wsProxy = await new Proxy({ endpoint, events: this.events, isWs: true, logger: this.logger, plugins: this.plugins, - vms: this.vms, + isVm: this.isVm }) - .serve( - this.host, - this.wsPort, - ); - this.logger.info(`WS Proxy for node endpoint ${endpoint} listening on ${buildUrl("ws", this.host, this.wsPort, "ws")}`); + .serve( + this.host, + this.wsPort, + ); + this.logger.info(`WS Proxy for node endpoint ${this.isVm ? 'vm' : endpoint} listening on ${buildUrl("ws", this.host, this.wsPort, "ws")}`); } } private stopProxy() { diff --git a/packages/stack/proxy/src/proxy.js b/packages/stack/proxy/src/proxy.js index 38018e4a1..5ded605d4 100644 --- a/packages/stack/proxy/src/proxy.js +++ b/packages/stack/proxy/src/proxy.js @@ -4,7 +4,6 @@ import express from 'express'; import expressWs from 'express-ws'; import cors from 'cors'; const Web3RequestManager = require('web3-core-requestmanager'); -const Web3WsProvider = require('web3-providers-ws'); const constants = require("embark-core/constants"); const ACTION_TIMEOUT = 5000; @@ -17,33 +16,50 @@ export class Proxy { this.timeouts = {}; this.plugins = options.plugins; this.logger = options.logger; - this.vms = options.vms; this.app = null; this.endpoint = options.endpoint; - if (options.endpoint === constants.blockchain.vm) { - this.endpoint = this.vms[this.vms.length - 1](); - } - if (typeof this.endpoint === 'string' && this.endpoint.startsWith('ws')) { - this.endpoint = new Web3WsProvider(this.endpoint, { - headers: {Origin: constants.embarkResourceOrigin}, - // TODO remove this when Geth fixes this: https://github.com/ethereum/go-ethereum/issues/16846 - // Edit: This has been fixed in Geth 1.9, but we don't support 1.9 yet and still support 1.8 - clientConfig: { - fragmentationThreshold: 81920 - } - }); - } + this.events = options.events; this.isWs = options.isWs; - // used to service all non-long-living WS connections, including any - // request that is not WS and any WS request that is not an `eth_subscribe` - // RPC request - this.requestManager = new Web3RequestManager.Manager(this.endpoint); + this.isVm = options.isVm; this.nodeSubscriptions = {}; + this._requestManager = null; + + this.clientName = options.isVm ? constants.blockchain.vm : constants.blockchain.ethereum; + + this.events.setCommandHandler("proxy:websocket:subscribe", this.handleSubscribe.bind(this)); + this.events.setCommandHandler("proxy:websocket:unsubscribe", this.handleUnsubscribe.bind(this)); + } + + // used to service all non-long-living WS connections, including any + // request that is not WS and any WS request that is not an `eth_subscribe` + // RPC request + get requestManager() { + return (async () => { + if (!this._requestManager) { + const provider = await this._createWebSocketProvider(this.endpoint); + this._requestManager = this._createWeb3RequestManager(provider); + } + return this._requestManager; + })(); + } + + async _createWebSocketProvider(endpoint) { + // if we are using a VM (ie for tests), then try to get the VM provider + if (this.isVm) { + return this.events.request2("blockchain:client:vmProvider"); + } + // pass in endpoint to ensure we get a provider with a connection to the node + return this.events.request2("blockchain:client:provider", this.clientName, endpoint); + } + + _createWeb3RequestManager(provider) { + return new Web3RequestManager.Manager(provider); } async nodeReady() { try { - await this.requestManager.send({ method: 'eth_accounts' }); + const reqMgr = await this.requestManager; + await reqMgr.send({ method: 'eth_accounts' }); } catch (e) { throw new Error(__(`Unable to connect to the blockchain endpoint on ${this.endpoint}`)); } @@ -63,7 +79,7 @@ export class Proxy { this.app.use(express.urlencoded({ extended: true })); if (this.isWs) { - this.app.ws('/', async (conn, _wsReq) => { + this.app.ws('/', async (conn, wsReq) => { conn.on('message', async (msg) => { try { @@ -73,6 +89,7 @@ export class Proxy { catch (err) { const error = __('Error processing request: %s', err.message); this.logger.error(error); + this.logger.debug(`Request causing error: ${JSON.stringify(wsReq)}`); this.respondWs(conn, error); } }); @@ -103,9 +120,9 @@ export class Proxy { async processRequest(request, transport) { // Modify request let modifiedRequest; - const rpcRequest = request.method === "POST" ? request.body : request; + request = request.method === "POST" ? request.body : request; try { - modifiedRequest = await this.emitActionsForRequest(rpcRequest); + modifiedRequest = await this.emitActionsForRequest(request, transport); } catch (reqError) { const error = reqError.message || reqError; @@ -116,92 +133,44 @@ export class Proxy { } // Send the possibly modified request to the Node - const respData = { jsonrpc: "2.0", id: modifiedRequest.reqData.id }; + const response = { jsonrpc: "2.0", id: modifiedRequest.request.id }; if (modifiedRequest.sendToNode !== false) { - // kill our manually created long-living connection for eth_subscribe if we have one - if (this.isWs && modifiedRequest.reqData.method === 'eth_unsubscribe') { - const id = modifiedRequest.reqData.params[0]; - if (this.nodeSubscriptions[id] && this.nodeSubscriptions[id].provider && this.nodeSubscriptions[id].provider.disconnect) { - this.nodeSubscriptions[id].provider.disconnect(); - } - } - // create a long-living WS connection to the node - if (this.isWs && modifiedRequest.reqData.method === 'eth_subscribe') { - - // creates a new long-living connection to the node - const currentReqManager = new Web3RequestManager.Manager(this.endpoint); - - // kill WS connetion to the node when the client connection closes - transport.on('close', () => currentReqManager.provider.disconnect()); - - // do the actual forward request to the node - currentReqManager.send(modifiedRequest.reqData, (error, result) => { - if (error) { - const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id }; - return this.respondError(transport, rpcErrorObj); - } - // `result` contains our initial response from the node, ie - // subscription id. Any FUTURE data from the node that needs - // to be forwarded to the client connection will be handled - // in the `.on('data')` event below. - this.logger.debug(`Created subscription: ${result}`); - this.nodeSubscriptions[result] = currentReqManager; - respData.result = result; - // TODO: Kick off emitAcitonsForResponse here - this.respondWs(transport, respData); - }); - - // Watch for `eth_subscribe` subscription data coming from the node. - // Send the subscription data back across the originating client - // connection. - currentReqManager.provider.on('data', (result, deprecatedResult) => { - result = result || deprecatedResult; - this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(result)}`); - // TODO: Kick off emitAcitonsForResponse here - this.respondWs(transport, result); - }); - - return; - } - try { - const result = await this.forwardRequestToNode(modifiedRequest.reqData); - respData.result = result; + const result = await this.forwardRequestToNode(modifiedRequest.request); + response.result = result; } catch (fwdReqErr) { // The node responded with an error. Set up the error so that it can be // stripped out by modifying the response (via actions for blockchain:proxy:response) - respData.error = fwdReqErr.message || fwdReqErr; + response.error = fwdReqErr.message || fwdReqErr; } } try { - const modifiedResp = await this.emitActionsForResponse(modifiedRequest.reqData, respData); + const modifiedResp = await this.emitActionsForResponse(modifiedRequest.request, response, transport); // Send back to the client - if (modifiedResp && modifiedResp.respData && modifiedResp.respData.error) { + if (modifiedResp && modifiedResp.response && modifiedResp.response.error) { // error returned from the node and it wasn't stripped by our response actions - const error = modifiedResp.respData.error.message || modifiedResp.respData.error; + const error = modifiedResp.response.error.message || modifiedResp.response.error; this.logger.error(__(`Error returned from the node: ${error}`)); - const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.respData.id }; + const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedResp.response.id }; return this.respondError(transport, rpcErrorObj); } - this.respondOK(transport, modifiedResp.respData); + this.respondOK(transport, modifiedResp.response); } catch (resError) { // if was an error in response actions (resError), send the error in the response const error = resError.message || resError; this.logger.error(__(`Error executing response actions: ${error}`)); - const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.reqData.id }; + const rpcErrorObj = { "jsonrpc": "2.0", "error": { "code": -32603, "message": error }, "id": modifiedRequest.request.id }; return this.respondError(transport, rpcErrorObj); } - - - } - forwardRequestToNode(reqData) { - return new Promise((resolve, reject) => { - this.requestManager.send(reqData, (fwdReqErr, result) => { + forwardRequestToNode(request) { + return new Promise(async (resolve, reject) => { + const reqMgr = await this.requestManager; + reqMgr.send(request, (fwdReqErr, result) => { if (fwdReqErr) { return reject(fwdReqErr); } @@ -210,6 +179,98 @@ export class Proxy { }); } + async handleSubscribe(clientSocket, request, response, cb) { + let currentReqManager = await this.requestManager; + if (!this.isVm) { + const provider = await this._createWebSocketProvider(this.endpoint); + // creates a new long-living connection to the node + currentReqManager = this._createWeb3RequestManager(provider); + + // kill WS connetion to the node when the client connection closes + clientSocket.on('close', () => currentReqManager.provider.disconnect()); + } + + + // do the actual forward request to the node + currentReqManager.send(request, (error, subscriptionId) => { + if (error) { + return cb(error); + } + // `result` contains our initial response from the node, ie + // subscription id. Any FUTURE data from the node that needs + // to be forwarded to the client connection will be handled + // in the `.on('data')` event below. + this.logger.debug(`Created subscription: ${subscriptionId} for ${JSON.stringify(request.params)}`); + this.logger.debug(`Subscription request: ${JSON.stringify(request)} `); + + // add the websocket req manager for this subscription to memory so it + // can be referenced later + this.nodeSubscriptions[subscriptionId] = currentReqManager; + + // Watch for `eth_subscribe` subscription data coming from the node. + // Send the subscription data back across the originating client + // connection. + currentReqManager.provider.on('data', async (subscriptionResponse, deprecatedResponse) => { + subscriptionResponse = subscriptionResponse || deprecatedResponse; + + // filter out any subscription data that is not meant to be passed back to the client + // This is only needed when using a VM because the VM uses only one provider. When there is + // only one provider, that single provider has 'data' events subscribed to it for each `eth_subscribe`. + // When any subscription data is returned from the node, it will fire the 'data' event for ALL + // `eth_subscribe`s. This filter prevents responding to sockets unnecessarily. + if (!subscriptionResponse.params || subscriptionResponse.params.subscription !== subscriptionId) { + return; + } + + this.logger.debug(`Subscription data received from node and forwarded to originating socket client connection: ${JSON.stringify(subscriptionResponse)} `); + + // allow modification of the node subscription data sent to the client + subscriptionResponse = await this.emitActionsForResponse(subscriptionResponse, subscriptionResponse, clientSocket); + this.respondWs(clientSocket, subscriptionResponse.response); + }); + + // send a response to the original requesting inbound client socket + // (ie the browser or embark) with the result of the subscription + // request from the node + response.result = subscriptionId; + cb(null, response); + }); + + + } + + async handleUnsubscribe(request, response, cb) { + // kill our manually created long-living connection for eth_subscribe if we have one + const subscriptionId = request.params[0]; + const currentReqManager = this.nodeSubscriptions[subscriptionId]; + + if (!currentReqManager) { + return this.logger.error(`Failed to unsubscribe from subscription '${subscriptionId}' because the proxy failed to find an active connection to the node.`); + } + // forward unsubscription request to the node + currentReqManager.send(request, (error, result) => { + if (error) { + return cb(error); + } + // `result` contains 'true' if the unsubscription request was successful + this.logger.debug(`Unsubscription result for subscription '${JSON.stringify(request.params)}': ${result} `); + this.logger.debug(`Unsubscription request: ${JSON.stringify(request)} `); + + + // if unsubscribe succeeded, disconnect connection and remove connection from memory + if (result === true) { + if (currentReqManager.provider && currentReqManager.provider.disconnect && !this.isVm) { + currentReqManager.provider.disconnect(); + } + delete this.nodeSubscriptions[subscriptionId]; + } + // result should be true/false + response.result = result; + + cb(null, response); + }); + } + respondWs(ws, response) { if (typeof response === "object") { response = JSON.stringify(response); @@ -223,7 +284,7 @@ export class Proxy { 2: "closing", 3: "closed" }; - this.logger.warn(`[Proxy]: Failed to send WebSocket response because the socket is ${stateMap[ws.readyState]}. Response: ${response}`); + this.logger.warn(`[Proxy]: Failed to send WebSocket response because the socket is ${stateMap[ws.readyState]}.Response: ${response} `); } respondHttp(res, statusCode, response) { res.status(statusCode).send(response); @@ -237,22 +298,23 @@ export class Proxy { return this.isWs ? this.respondWs(transport, response) : this.respondHttp(transport, 200, response); } - emitActionsForRequest(body) { + emitActionsForRequest(request, transport) { return new Promise((resolve, reject) => { let calledBack = false; + const data = { request, isWs: this.isWs, transport }; setTimeout(() => { if (calledBack) { return; } - this.logger.warn(__('Action for request "%s" timed out', body.method)); - this.logger.debug(body); + this.logger.warn(__('Action for request "%s" timed out', request.method)); + this.logger.debug(request); calledBack = true; - resolve({ reqData: body }); + resolve(data); }, ACTION_TIMEOUT); this.plugins.emitAndRunActionsForEvent('blockchain:proxy:request', - { reqData: body }, - (err, resp) => { + data, + (err, result) => { if (calledBack) { // Action timed out return; @@ -261,33 +323,34 @@ export class Proxy { this.logger.error(__('Error parsing the request in the proxy')); this.logger.error(err); // Reset the data to the original request so that it can be used anyway - resp = { reqData: body }; + result = data; calledBack = true; return reject(err); } calledBack = true; - resolve(resp); + resolve(result); }); }); } - emitActionsForResponse(reqData, respData) { + emitActionsForResponse(request, response, transport) { return new Promise((resolve, reject) => { + const data = { request, response, isWs: this.isWs, transport }; let calledBack = false; setTimeout(() => { if (calledBack) { return; } - this.logger.warn(__('Action for response "%s" timed out', reqData.method)); - this.logger.debug(reqData); - this.logger.debug(respData); + this.logger.warn(__('Action for response "%s" timed out', request.method)); + this.logger.debug(request); + this.logger.debug(response); calledBack = true; - resolve({ respData }); + resolve(data); }, ACTION_TIMEOUT); this.plugins.emitAndRunActionsForEvent('blockchain:proxy:response', - { respData, reqData }, - (err, resp) => { + data, + (err, result) => { if (calledBack) { // Action timed out return; @@ -296,10 +359,12 @@ export class Proxy { this.logger.error(__('Error parsing the response in the proxy')); this.logger.error(err); calledBack = true; - reject(err); + // Reset the data to the original response so that it can be used anyway + result = data; + return reject(err); } calledBack = true; - resolve(resp); + resolve(result); }); }); } diff --git a/packages/stack/test-runner/src/lib/reporter.js b/packages/stack/test-runner/src/lib/reporter.js index 5cfe34619..5ca3cd2a6 100644 --- a/packages/stack/test-runner/src/lib/reporter.js +++ b/packages/stack/test-runner/src/lib/reporter.js @@ -15,7 +15,7 @@ class Reporter { wireGasUsage() { const {events} = this.embark; events.on('blockchain:proxy:response', (params) => { - const {result} = params.respData; + const { result } = params.response; if (!result || !result.gasUsed) { return; diff --git a/site/source/docs/plugin_reference.md b/site/source/docs/plugin_reference.md index 09d39d348..ff4941372 100644 --- a/site/source/docs/plugin_reference.md +++ b/site/source/docs/plugin_reference.md @@ -477,5 +477,12 @@ embark.registerActionForEvent("deployment:contract:beforeDeploy", async (params, - `deployment:deployContracts:afterAll`: Called after all contracts have deployed. No params - `tests:contracts:compile:before`: Called before the contracts are compiled in the context of the test. Only param is `contractFiles` - `tests:contracts:compile:after`: Called after the contracts are compiled in the context of the test. Only param is `compiledContracts` -- `blockchain:proxy:request`: Called before a request from Embark or the Dapp is sent to the blockchain node. You can modify or react to the payload of the request. Only param is `reqData`, an object containing the payload -- `blockchain:proxy:response`: Called before the node response is sent back to Embark or the Dapp. You can modify or react to the payload of the response. Two params, `reqData` and `respData`, objects containing the payloads +- `blockchain:proxy:request`: Called before a request from Embark or the Dapp is sent to the blockchain node. You can modify or react to the payload of the request. Params are: + - `request`: an object containing the request payload + - `transport`: an object containing the client's websocket connection to the proxy + - `isWs`: a boolean flag indicating if the request was performed using websockets +- `blockchain:proxy:response`: Called before the node response is sent back to Embark or the Dapp. You can modify or react to the payload of the response. Params are: + - `request`: an object containing the request payload + - `response`: an object containing the response payload + - `transport`: an object containing the client's websocket connection to the proxy + - `isWs`: a boolean flag indicating if the request was performed using websockets