148 lines
5.1 KiB
Python
Executable File
148 lines
5.1 KiB
Python
Executable File
#!/usr/bin/env python3.6
|
|
import argparse
|
|
import vyper
|
|
|
|
from pprint import pprint
|
|
from vyper import compiler
|
|
from vyper.parser import (
|
|
parser,
|
|
)
|
|
from vdb.vdb import (
|
|
set_evm_opcode_debugger,
|
|
set_evm_opcode_pass
|
|
)
|
|
from vdb.source_map import (
|
|
produce_source_map
|
|
)
|
|
from eth_tester import (
|
|
EthereumTester,
|
|
)
|
|
from web3.providers.eth_tester import (
|
|
EthereumTesterProvider,
|
|
)
|
|
from web3 import (
|
|
Web3,
|
|
)
|
|
|
|
aparser = argparse.ArgumentParser(description='Vyper {0} quick CLI runner'.format(vyper.__version__))
|
|
aparser.add_argument('input_file', help='Vyper sourcecode to run')
|
|
aparser.add_argument('call_list', help='call list, without parameters: func, with parameters func(1, 2, 3). Semicolon separated')
|
|
aparser.add_argument('-i', help='init args, comma separated', default='', dest='init_args')
|
|
|
|
args = aparser.parse_args()
|
|
set_evm_opcode_pass() # by default just pass over the debug opcode.
|
|
|
|
|
|
def cast_types(args, abi_signature):
|
|
newargs = args.copy()
|
|
for idx, abi_arg in enumerate(abi_signature['inputs']):
|
|
if abi_arg['type'] in ('int128', 'uint256'):
|
|
newargs[idx] = int(args[idx])
|
|
elif abi_arg['type'].startswith('bytes'):
|
|
newargs[idx] = args[idx].encode()
|
|
return newargs
|
|
|
|
|
|
def get_tester():
|
|
tester = EthereumTester()
|
|
def zero_gas_price_strategy(web3, transaction_params=None):
|
|
return 0 # zero gas price makes testing simpler.
|
|
w3 = Web3(EthereumTesterProvider(tester))
|
|
w3.eth.setGasPriceStrategy(zero_gas_price_strategy)
|
|
return tester, w3
|
|
|
|
|
|
def get_contract(w3, source_code, *args, **kwargs):
|
|
abi = compiler.mk_full_signature(source_code)
|
|
bytecode = '0x' + compiler.compile(source_code).hex()
|
|
contract = w3.eth.contract(abi=abi, bytecode=bytecode)
|
|
|
|
value = kwargs.pop('value', 0)
|
|
value_in_eth = kwargs.pop('value_in_eth', 0)
|
|
value = value_in_eth * 10**18 if value_in_eth else value # Handle deploying with an eth value.
|
|
gasPrice = kwargs.pop('gasPrice', 0)
|
|
deploy_transaction = {
|
|
'from': w3.eth.accounts[0],
|
|
'data': contract._encode_constructor_data(args, kwargs),
|
|
'value': value,
|
|
'gasPrice': gasPrice
|
|
}
|
|
tx = w3.eth.sendTransaction(deploy_transaction)
|
|
address = w3.eth.getTransactionReceipt(tx)['contractAddress']
|
|
contract = w3.eth.contract(address, abi=abi, bytecode=bytecode)
|
|
# Filter logs.
|
|
contract._logfilter = w3.eth.filter({
|
|
'fromBlock': w3.eth.blockNumber - 1,
|
|
'address': contract.address
|
|
})
|
|
return contract
|
|
|
|
|
|
if __name__ == '__main__':
|
|
|
|
with open(args.input_file) as fh:
|
|
code = fh.read()
|
|
# Patch in vdb.
|
|
init_args = args.init_args.split(',') if args.init_args else []
|
|
tester, w3 = get_tester()
|
|
|
|
# Built list of calls to make.
|
|
calls = []
|
|
for signature in args.call_list.split(';'):
|
|
name = signature.strip()
|
|
args = []
|
|
|
|
if '(' in signature:
|
|
start_pos = signature.find('(')
|
|
name = signature[:start_pos].strip()
|
|
args = signature[start_pos + 1:-1].split(',')
|
|
args = [arg.strip() for arg in args]
|
|
args = [arg for arg in args if len(arg) > 0]
|
|
|
|
calls.append((name, args))
|
|
|
|
abi = compiler.mk_full_signature(code)
|
|
|
|
# Format init args.
|
|
if init_args:
|
|
init_abi = next(filter(lambda func: func["name"] == '__init__', abi))
|
|
init_args = cast_types(init_args, init_abi)
|
|
|
|
# Compile contract to chain.
|
|
contract = get_contract(w3, code, *init_args, language='vyper')
|
|
|
|
# Execute calls
|
|
for func_name, args in calls:
|
|
if not hasattr(contract.functions, func_name):
|
|
print('\n No method {} found, skipping.'.format(func_name))
|
|
continue
|
|
|
|
print('\n* Calling {}({})'.format(func_name, ','.join(args)))
|
|
func_abi = next(filter(lambda func: func["name"] == func_name, abi))
|
|
if len(args) != len(func_abi['inputs']):
|
|
print('Argument mismatch, please provide correct arguments.')
|
|
break
|
|
|
|
cast_args = cast_types(args, func_abi)
|
|
res = getattr(contract.functions, func_name)(*cast_args).call({'gas': func_abi['gas'] + 22000})
|
|
|
|
source_map = produce_source_map(code)
|
|
set_evm_opcode_debugger(source_code=code, source_map=source_map)
|
|
tx_hash = getattr(contract.functions, func_name)(*cast_args).transact({'gas': func_abi['gas'] + 22000})
|
|
set_evm_opcode_pass()
|
|
|
|
print('- Returns:')
|
|
pprint('{}'.format(res))
|
|
|
|
# Detect any new log events, and print them.
|
|
print('- Logs:')
|
|
event_names = [x['name'] for x in abi if x['type'] == 'event']
|
|
tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
|
|
for event_name in event_names:
|
|
logs = getattr(contract.events, event_name)().processReceipt(tx_receipt)
|
|
for log in logs:
|
|
print(log.event + ":")
|
|
pprint(dict(log.args))
|
|
else:
|
|
print(' No events found.')
|