refactor(@embark-proxy): use web3 provider to send requests to node

This commit is contained in:
Jonathan Rainville 2019-08-27 12:04:17 -04:00
parent 218d0bda28
commit ff96a5cc98
3 changed files with 30 additions and 50 deletions

View File

@ -51,7 +51,7 @@
"embark-utils": "^4.1.0-beta.5", "embark-utils": "^4.1.0-beta.5",
"express": "4.17.1", "express": "4.17.1",
"express-ws": "4.0.0", "express-ws": "4.0.0",
"ws": "6.1.2" "web3": "1.0.0-beta.37"
}, },
"devDependencies": { "devDependencies": {
"@babel/cli": "7.2.3", "@babel/cli": "7.2.3",

View File

@ -14,6 +14,7 @@ export default class ProxyManager {
private rpcPort: number; private rpcPort: number;
private wsPort: number; private wsPort: number;
private ready: boolean; private ready: boolean;
private isWs = false;
constructor(private embark: Embark, options: any) { constructor(private embark: Embark, options: any) {
this.logger = embark.logger; this.logger = embark.logger;
@ -42,7 +43,7 @@ export default class ProxyManager {
return cb(null, this.embark.config.blockchainConfig.endpoint); return cb(null, this.embark.config.blockchainConfig.endpoint);
} }
// TODO Check if the proxy can support HTTPS, though it probably doesn't matter since it's local // TODO Check if the proxy can support HTTPS, though it probably doesn't matter since it's local
if (this.embark.config.blockchainConfig.wsRPC) { if (this.isWs) {
return cb(null, buildUrl("ws", this.host, this.wsPort, "ws")); return cb(null, buildUrl("ws", this.host, this.wsPort, "ws"));
} }
cb(null, buildUrl("http", this.host, this.rpcPort, "rpc")); cb(null, buildUrl("http", this.host, this.rpcPort, "rpc"));
@ -68,13 +69,14 @@ export default class ProxyManager {
this.rpcPort = port; this.rpcPort = port;
this.wsPort = port + 1; this.wsPort = port + 1;
this.isWs = (/wss?/).test(this.embark.config.blockchainConfig.endpoint);
this.proxy = await new Proxy({events: this.events, plugins: this.plugins, logger: this.logger}) this.proxy = await new Proxy({events: this.events, plugins: this.plugins, logger: this.logger})
.serve( .serve(
this.embark.config.blockchainConfig.endpoint, this.embark.config.blockchainConfig.endpoint,
this.host, this.host,
this.embark.config.blockchainConfig.wsRPC ? this.wsPort : this.rpcPort, this.isWs ? this.wsPort : this.rpcPort,
this.embark.config.blockchainConfig.wsRPC, this.isWs,
null, null,
); );
return; return;

View File

@ -1,11 +1,10 @@
/* global Buffer exports require */ /* global Buffer exports require */
import {__} from 'embark-i18n'; import {__} from 'embark-i18n';
import axios from "axios";
import {canonicalHost, timer, pingEndpoint, deconstructUrl} from 'embark-utils'; import {canonicalHost, timer, pingEndpoint, deconstructUrl} from 'embark-utils';
import express from 'express'; import express from 'express';
import expressWs from 'express-ws'; import expressWs from 'express-ws';
import Web3 from 'web3';
import cors from 'cors'; import cors from 'cors';
const WebSocket = require("ws");
const ACTION_TIMEOUT = 5000; const ACTION_TIMEOUT = 5000;
@ -43,6 +42,8 @@ export class Proxy {
}); });
}()); }());
const web3 = new Web3(endpoint);
const app = express(); const app = express();
if (ws) { if (ws) {
expressWs(app); expressWs(app);
@ -53,33 +54,6 @@ export class Proxy {
app.use(express.urlencoded({extended: true})); app.use(express.urlencoded({extended: true}));
if (ws) { if (ws) {
const messages = {};
const conn = new WebSocket(endpoint);
conn.on("message", (data) => {
// Message from the Node
let jsonData;
try {
jsonData = JSON.parse(data);
} catch (e) {
this.logger.error(__('Error parsing response'), e.message);
return;
}
const msg = messages[jsonData.id];
if (!msg) {
// Not a request
return;
}
delete messages[jsonData.id];
// Send to plugins to possibly modify the response
this.emitActionsForResponse(msg.msg, jsonData, (_err, resp) => {
// Send back to the caller (web3)
msg.ws.send(JSON.stringify(resp.respData));
});
});
conn.on("error", (e) => {
this.logger.error(__('Error executing the request on the Node'), JSON.stringify(e));
});
app.ws('/', (ws, _wsReq) => { app.ws('/', (ws, _wsReq) => {
ws.on('message', (msg) => { ws.on('message', (msg) => {
let jsonMsg; let jsonMsg;
@ -89,33 +63,37 @@ export class Proxy {
this.logger.error(__('Error parsing request'), e.message); this.logger.error(__('Error parsing request'), e.message);
return; return;
} }
messages[jsonMsg.id] = {msg: jsonMsg, ws};
// Modify request // Modify request
this.emitActionsForRequest(jsonMsg, (_err, resp) => { this.emitActionsForRequest(jsonMsg, (_err, resp) => {
// Send the possibly modified request to the Node // Send the possibly modified request to the Node
conn.send(JSON.stringify(resp.reqData)); web3._requestManager.send(resp.reqData, (err, result) => {
if (err) {
return this.logger.error(__('Error executing the request on the Node'), JSON.stringify(err));
}
this.emitActionsForResponse(resp.reqData, {jsonrpc: "2.0", id: resp.reqData.id, result}, (_err, resp) => {
// Send back to the caller (web3)
ws.send(JSON.stringify(resp.respData));
});
}); });
});
}); });
}); });
} else { } else {
// HTTP // HTTP
app.use((req, res) => { app.use((req, res) => {
// Modify request // Modify request
this.emitActionsForRequest(req.body, (_err, resp) => { this.emitActionsForRequest(req.body, (_err, resp) => {
// Send the possibly modified request to the Node // Send the possibly modified request to the Node
axios.post(endpoint, resp.reqData) web3._requestManager.send(resp.reqData, (err, result) => {
.then((response) => { if (err) {
// Send to plugins to possibly modify the response res.status(500).send(err.message || err);
this.emitActionsForResponse(resp.reqData, response.data, (_err, resp) => { }
// Send back to the caller (web3) this.emitActionsForResponse(resp.reqData, {jsonrpc: "2.0", id: resp.reqData.id, result}, (_err, resp) => {
res.send(resp.respData); // Send back to the caller (web3)
}); res.status(200).send(resp.respData);
}) });
.catch((error) => {
res.status(500);
res.send(error.message);
});
}); });
});
}); });
} }