Decoupled message processor from whisper (useful for unit tests)

This commit is contained in:
Richard Ramos 2018-08-23 15:48:09 -04:00
parent 19e2d8029a
commit aa63705638
5 changed files with 135 additions and 145 deletions

View File

@ -1,150 +1,90 @@
class MessageProcessor {
constructor(config, settings, web3, kId, events){
constructor(config, settings, web3, events){
this.config = config;
this.settings = settings;
this.web3 = web3;
this.kId = kId;
this.events = events;
}
_reply(text, message, receipt){
if(message.sig !== undefined){
console.log(text);
this.web3.shh.post({
pubKey: message.sig,
sig: this.kId,
ttl: this.config.node.whisper.ttl,
powTarget:this.config.node.whisper.minPow,
powTime: this.config.node.whisper.powTime,
topic: message.topic,
payload: this.web3.utils.fromAscii(JSON.stringify({message:text, receipt}, null, " "))
}).catch(console.error);
}
}
async _validateInput(message){
console.info("Processing request to: %s, %s", message.input.contract, message.input.functionName);
const contract = this.settings.getContractByTopic(message.topic);
async _validateInput(contract, input){
console.info("Processing request to: %s, %s", input.contract, input.functionName);
if(contract == undefined){
this._reply('Invalid topic', message);
return false;
return {success: false, message: 'Unknown contract'};
}
if(!contract.functionSignatures.includes(message.input.functionName)){
this._reply('Function not allowed', message);
return false;
if(!contract.functionSignatures.includes(input.functionName)){
return {success: false, message: 'Function not allowed'};
}
// Get code from contract and compare it against the contract code
if(!contract.isIdentity){
const code = this.web3.utils.soliditySha3(await this.web3.eth.getCode(message.input.contract));
const code = this.web3.utils.soliditySha3(await this.web3.eth.getCode(input.contract));
if(code != contract.code){
this._reply('Invalid contract code', message);
return false;
return {success: false, message: 'Invalid contract code'};
}
} else {
if(!(/^0x[0-9a-f]{40}$/i).test(message.input.contract)){
this._reply('Invalid contract address', message);
return false;
if(!(/^0x[0-9a-f]{40}$/i).test(input.contract)){
return {success: false, message: 'Invalid contract address'};
}
}
if(message.input.address && !(/^0x[0-9a-f]{40}$/i).test(message.input.address)){
this._reply('Invalid address', message);
return false;
if(input.address && !(/^0x[0-9a-f]{40}$/i).test(input.address)){
return {success: false, message: 'Invalid address'};
}
return true;
return {success: true};
}
_extractInput(message){
let obj = {
contract: null,
address: null,
functionName: null,
functionParameters: null,
payload: null
async process(contract, input, reply){
const inputValidation = await this._validateInput(contract, input);
if(!inputValidation.success){
// TODO Log?
reply(inputValidation);
return;
}
let validationResult;
if(contract.strategy){
validationResult = await contract.strategy.execute(input, reply);
if(!validationResult.success){
reply(validationResult.message);
return;
}
}
let p = {
from: this.config.node.blockchain.account,
to: input.contract,
value: 0,
data: input.payload,
gasPrice: this.config.gasPrice
};
try {
const msg = this.web3.utils.toAscii(message.payload);
let parsedObj = JSON.parse(msg);
obj.contract = parsedObj.contract;
obj.address = parsedObj.address;
obj.functionName = parsedObj.encodedFunctionCall.slice(0, 10);
obj.functionParameters = "0x" + parsedObj.encodedFunctionCall.slice(10);
obj.payload = parsedObj.encodedFunctionCall;
} catch(err){
console.error("Couldn't parse " + message);
if(!validationResult.estimatedGas){
validationResult.estimatedGas = await this.web3.eth.estimateGas(p);
}
message.input = obj;
}
p.gas = parseInt(validationResult.estimatedGas * 1.05, 10); // Tune this
/*
_getFactor(input, contract, gasToken){
if(contract.allowedFunctions[input.functionName].isToken){
return this.web3.utils.toBN(this.settings.getToken(gasToken).pricePlugin.getFactor());
const nodeBalance = await this.web3.eth.getBalance(this.config.node.blockchain.account);
if(nodeBalance < p.gas){
reply("Relayer unavailable");
console.error("Relayer doesn't have enough gas to process trx: %s, required %s", nodeBalance, p.gas);
this.events.emit('exit');
} else {
return this.web3.utils.toBN(1);
}
} */
async process(error, message){
if(error){
console.error(error);
} else {
this._extractInput(message);
const contract = this.settings.getContractByTopic(message.topic);
if(!await this._validateInput(message)) return; // TODO Log
let validationResult;
if(contract.strategy){
validationResult = await contract.strategy.execute(message);
if(!validationResult.success){
return this._reply(validationResult.message, message);
}
try {
const receipt = await this.web3.eth.sendTransaction(p);
// TODO: parse events
return reply("Transaction mined", receipt);
} catch(err){
reply("Couldn't mine transaction: " + err.message);
// TODO log this?
console.error(err);
}
let p = {
from: this.config.node.blockchain.account,
to: message.input.contract,
value: 0,
data: message.input.payload,
gasPrice: this.config.gasPrice
};
if(!validationResult.estimatedGas){
validationResult.estimatedGas = await this.web3.eth.estimateGas(p);
}
p.gas = parseInt(validationResult.estimatedGas * 1.1, 10);
const nodeBalance = await this.web3.eth.getBalance(this.config.node.blockchain.account);
if(nodeBalance < p.gas){
this._reply("Relayer unavailable", message);
console.error("Relayer doesn't have enough gas to process trx: %s, required %s", nodeBalance, p.gas);
this.events.emit('exit');
} else {
try {
const receipt = await this.web3.eth.sendTransaction(p);
// TODO: parse events
return this._reply("Transaction mined", message, receipt);
} catch(err){
this._reply("Couldn't mine transaction: " + err.message, message);
// TODO log this?
console.error(err);
}
}
}
}
}

View File

@ -102,11 +102,59 @@ events.on('setup:complete', async (settings) => {
}*/
});
const reply = (message) => (text, receipt) => {
if(message.sig !== undefined){
console.log(text);
web3.shh.post({
pubKey: message.sig,
sig: shhOptions.kId,
ttl: config.node.whisper.ttl,
powTarget:config.node.whisper.minPow,
powTime: config.node.whisper.powTime,
topic: message.topic,
payload: web3.utils.fromAscii(JSON.stringify({message:text, receipt}, null, " "))
}).catch(console.error);
}
};
const extractInput = (message) => {
let obj = {
contract: null,
address: null,
functionName: null,
functionParameters: null,
payload: null
};
try {
const msg = web3.utils.toAscii(message.payload);
let parsedObj = JSON.parse(msg);
obj.contract = parsedObj.contract;
obj.address = parsedObj.address;
obj.functionName = parsedObj.encodedFunctionCall.slice(0, 10);
obj.functionParameters = "0x" + parsedObj.encodedFunctionCall.slice(10);
obj.payload = parsedObj.encodedFunctionCall;
} catch(err){
console.error("Couldn't parse " + message);
}
return obj;
};
events.on('server:listen', (shhOptions, settings) => {
let processor = new MessageProcessor(config, settings, web3, shhOptions.kId, events);
let processor = new MessageProcessor(config, settings, web3, events);
web3.shh.subscribe('messages', shhOptions, (error, message) => {
if(error){
console.error(error);
return;
}
verifyBalance(true);
processor.process(error, message);
processor.process(settings.getContractByTopic(message.topic),
extractInput(message),
reply(message));
});
});

View File

@ -21,18 +21,20 @@ class BaseStrategy {
}
}
_obtainParametersFunc(message){
const parameterList = this.web3.eth.abi.decodeParameters(this.contract.allowedFunctions[message.input.functionName].inputs, message.input.functionParameters);
_obtainParametersFunc(input){
const parameterList = this.web3.eth.abi.decodeParameters(this.contract.allowedFunctions[input.functionName].inputs, input.functionParameters);
return function(parameterName){
return parameterList[parameterName];
};
}
async _estimateGas(message){
async _estimateGas(input){
console.dir(input);
let p = {
from: this.config.node.blockchain.account,
to: message.input.contract,
data: message.input.payload
to: input.contract,
data: input.payload
};
const estimatedGas = await this.web3.eth.estimateGas(p);
return this.web3.utils.toBN(estimatedGas);
@ -41,7 +43,7 @@ class BaseStrategy {
/**
* Simulate transaction using ganache. Useful for obtaining events
*/
async _simulateTransaction(message){
async _simulateTransaction(input){
let web3Sim = new Web3(ganache.provider({
fork: `${this.config.node.ganache.protocol}://${this.config.node.ganache.host}:${this.config.node.ganache.port}`,
locked: false,
@ -52,9 +54,9 @@ class BaseStrategy {
let simulatedReceipt = await web3Sim.eth.sendTransaction({
from: simAccounts[0],
to: message.input.address,
to: input.address,
value: 0,
data: message.input.payload,
data: input.payload,
gasLimit: 9500000 // 95% of current chain latest gas block limit
});
@ -63,7 +65,7 @@ class BaseStrategy {
}
/*
async execute(message){
async execute(message, reply){
return {
success: true,
message: "Valid transaction"

View File

@ -3,8 +3,8 @@ const erc20ABI = require('../../abi/ERC20Token.json');
class IdentityStrategy extends Strategy {
async _validateInstance(message){
const instanceCodeHash = this.web3.utils.soliditySha3(await this.web3.eth.getCode(message.input.contract));
async _validateInstance(input){
const instanceCodeHash = this.web3.utils.soliditySha3(await this.web3.eth.getCode(input.contract));
const kernelVerifSignature = this.web3.utils.soliditySha3(this.contract.kernelVerification).slice(0, 10);
if(instanceCodeHash === null) return false;
@ -15,15 +15,15 @@ class IdentityStrategy extends Strategy {
return this.web3.eth.abi.decodeParameter('bool', verificationResult);
}
async execute(message){
async execute(input){
if(this.contract.isIdentity){
let validInstance = await this._validateInstance(message);
let validInstance = await this._validateInstance(input);
if(!validInstance){
return {success: false, message: "Invalid identity instance"};
}
}
const params = this._obtainParametersFunc(message);
const params = this._obtainParametersFunc(input);
// Verifying if token is allowed
const token = this.settings.getToken(params('_gasToken'));
@ -32,17 +32,17 @@ class IdentityStrategy extends Strategy {
// Determine if enough balance for baseToken
const gasPrice = this.web3.utils.toBN(params('_gasPrice'));
const gasLimit = this.web3.utils.toBN(params('_gasLimit'));
if(this.contract.allowedFunctions[message.input.functionName].isToken){
if(this.contract.allowedFunctions[input.functionName].isToken){
const Token = new this.web3.eth.Contract(erc20ABI.abi);
Token.options.address = params('_baseToken');
const tokenBalance = new this.web3.utils.BN(await Token.methods.balanceOf(message.input.contract).call());
const tokenBalance = new this.web3.utils.BN(await Token.methods.balanceOf(input.contract).call());
if(tokenBalance.lt(this.web3.utils.toBN(params('_value')))){
return {success: false, message: "Identity has not enough balance for specified value"};
}
}
// gasPrice * limit calculation
const balance = await this.getBalance(message.input.contract, token);
const balance = await this.getBalance(input.contract, token);
if(balance.lt(this.web3.utils.toBN(gasPrice.mul(gasLimit)))) {
return {success: false, message: "Identity has not enough tokens for gasPrice*gasLimit"};
}
@ -50,13 +50,13 @@ class IdentityStrategy extends Strategy {
let estimatedGas = 0;
try {
estimatedGas = await this._estimateGas(message);
if(gasLimit.lt(estimatedGas)) {
// Geth tends to fail estimation with proxies, so we simulate it with ganache
estimatedGas = await this._simulateTransaction(input);
if(gasLimit.mul(this.web3.utils.toBN(1.05)).lt(estimatedGas)) {
return {success: false, message: "Gas limit below estimated gas (" + estimatedGas + ")"};
}
} catch(exc){
if(exc.message.indexOf("revert") > -1) return {success: false, message: "Transaction will revert"};
}
return {

View File

@ -6,22 +6,22 @@ const ExecuteGasRelayed = "0x754e6ab0";
class SNTStrategy extends Strategy {
async execute(message){
const params = this._obtainParametersFunc(message);
async execute(input){
const params = this._obtainParametersFunc(input);
// Verifying if token is allowed
const token = this.settings.getTokenBySymbol("SNT");
if(token == undefined) return {success: false, message: "Token not allowed"};
const balance = await this.getBalance(message.input.address, token);
const balance = await this.getBalance(input.address, token);
const estimatedGas = await this.web3.eth.estimateGas({
data: message.input.payload,
data: input.payload,
from: this.config.node.blockchain.account,
to: message.input.contract
to: input.contract
});
if(message.input.functionName == TransferSNT){
if(input.functionName == TransferSNT){
const gas = this.web3.utils.toBN(estimatedGas);
const value = this.web3.utils.toBN(params('_amount'));
const requiredGas = value.add(gas);
@ -29,11 +29,11 @@ class SNTStrategy extends Strategy {
if(balance.lt(requiredGas)){
return {success: false, message: "Address has not enough balance to transfer specified value + fees (" + requiredGas.toString() + ")"};
}
} else if(message.input.functionName == ExecuteGasRelayed){
} else if(input.functionName == ExecuteGasRelayed){
const latestBlock = await this.web3.eth.getBlock("latest");
let estimatedGas = 0;
try {
estimatedGas = await this._estimateGas(message, latestBlock.gasLimit);
estimatedGas = await this._estimateGas(input, latestBlock.gasLimit);
} catch(exc){
if(exc.message.indexOf("revert") > -1) return {success: false, message: "Transaction will revert"};
}