#!/usr/bin/env python3
"""Quick CLI runner for Vyper contracts.

Compiles a Vyper source file, deploys it to an in-process eth-tester chain
backed by the vdb debug backend, then executes a semicolon separated list of
calls against the deployed contract, printing return values and emitted logs.
"""
import argparse
import sys
import os
import vyper

from collections import Counter
from pprint import pprint
from vyper import compile_code
from vdb.source_map import (
    produce_source_map
)
from vdb.eth_tester_debug_backend import (
    PyEVMDebugBackend,
    set_debug_info
)
import vdb.debug_computation

from web3.providers.eth_tester import (
    EthereumTesterProvider,
)
from web3 import (
    Web3,
)

# Tracebacks are hidden by default; VYPER_TRACEBACK_LIMIT re-enables them.
sys.tracebacklimit = 0
tb_limit = os.environ.get('VYPER_TRACEBACK_LIMIT')
if tb_limit:
    sys.tracebacklimit = int(tb_limit)

aparser = argparse.ArgumentParser(
    description='Vyper {0} quick CLI runner'.format(vyper.__version__)
)
aparser.add_argument('input_file', help='Vyper sourcecode to run')
aparser.add_argument(
    'call_list',
    help='call list, without parameters: func, with parameters func(1, 2, 3). Semicolon separated'
)
aparser.add_argument('--trace', action='store_true', help='very verbose logging')
aparser.add_argument('-i', help='init args, comma separated', default=None, dest='init_args')
args = aparser.parse_args()


def _is_integer_type(abi_type):
    """Return True when *abi_type* is a scalar ``intN`` / ``uintN`` ABI type."""
    base = abi_type[1:] if abi_type.startswith('u') else abi_type
    if not base.startswith('int'):
        return False
    width = base[3:]
    # Array types such as ``int128[3]`` have a non-numeric tail and are skipped.
    return width == '' or width.isdigit()


def _strip_annotation(raw, abi_type):
    """Drop a trailing ``:<abi_type>`` annotation from a command line value.

    ``get_func_abi`` lets callers disambiguate overloads by writing arguments
    as ``value:type``. The annotation is only removed when it names exactly
    the ABI type being cast to, so values that merely contain a colon are
    left untouched.
    """
    if not isinstance(raw, str):
        return raw
    value, sep, annotation = raw.rpartition(':')
    if sep and annotation.strip() == abi_type:
        return value.strip()
    return raw


def cast_types(args, abi_signature):
    """Convert string arguments to the Python types implied by an ABI entry.

    Args:
        args: sequence of argument values (strings from the command line).
        abi_signature: ABI entry; its ``'inputs'`` list supplies one
            ``{'type': ...}`` dict per positional argument.

    Returns:
        A new list in which integer-typed arguments are ``int`` and
        ``bytes*``-typed arguments are ``bytes``. Other arguments are passed
        through (minus a matching ``:type`` annotation). Arguments beyond the
        declared inputs are passed through unchanged.

    Raises:
        ValueError: if fewer arguments than declared inputs are supplied, or
            an integer-typed argument is not a valid integer literal.
    """
    inputs = abi_signature['inputs']
    if len(args) < len(inputs):
        raise ValueError(
            'Expected {} argument(s) but got {}'.format(len(inputs), len(args))
        )

    newargs = list(args)
    for idx, abi_arg in enumerate(inputs):
        abi_type = abi_arg['type']
        raw = _strip_annotation(args[idx], abi_type)
        if _is_integer_type(abi_type):
            newargs[idx] = int(raw)
        elif abi_type.startswith('bytes'):
            newargs[idx] = raw.encode()
        else:
            newargs[idx] = raw
    return newargs


def get_tester(code):
    """Create an eth-tester instance wired to the vdb debug backend.

    Registers *code* and its source map with vdb via ``set_debug_info`` and
    returns ``(tester, w3)`` where ``w3`` is a ``Web3`` instance using the
    tester as provider with a zero gas price strategy.
    """
    # Kept as a function-local import, exactly as the script originally did.
    from eth_tester import (
        EthereumTester,
    )
    source_map = produce_source_map(code)
    set_debug_info(code, source_map)
    tester = EthereumTester(backend=PyEVMDebugBackend())

    def zero_gas_price_strategy(web3, transaction_params=None):
        return 0  # zero gas price makes testing simpler.

    w3 = Web3(EthereumTesterProvider(tester))
    w3.eth.setGasPriceStrategy(zero_gas_price_strategy)
    return tester, w3


def get_contract(w3, source_code, *args, **kwargs):
    """Compile *source_code*, deploy it and return the bound contract object.

    Args:
        w3: ``Web3`` instance used for deployment.
        source_code: Vyper source text.
        *args: positional constructor arguments.
        **kwargs: constructor keyword arguments. The keys ``value``,
            ``value_in_eth`` and ``gasPrice`` are consumed here and placed on
            the deploy transaction instead of being passed to the constructor.

    Returns:
        The deployed contract, with a ``_logfilter`` attribute holding a log
        filter on the contract address.
    """
    compiler_out = compile_code(source_code, ['abi', 'bytecode'])
    abi, bytecode = compiler_out['abi'], compiler_out['bytecode']
    contract = w3.eth.contract(abi=abi, bytecode=bytecode)

    value = kwargs.pop('value', 0)
    value_in_eth = kwargs.pop('value_in_eth', 0)
    # Handle deploying with an eth value; value_in_eth wins when it is set.
    if value_in_eth:
        value = value_in_eth * 10**18
    gasPrice = kwargs.pop('gasPrice', 0)

    deploy_transaction = {
        'from': w3.eth.accounts[0],
        'data': contract._encode_constructor_data(args, kwargs),
        'value': value,
        'gasPrice': gasPrice
    }
    tx = w3.eth.sendTransaction(deploy_transaction)
    address = w3.eth.getTransactionReceipt(tx)['contractAddress']
    contract = w3.eth.contract(address, abi=abi, bytecode=bytecode)
    # Filter logs.
    contract._logfilter = w3.eth.filter({
        'fromBlock': w3.eth.blockNumber - 1,
        'address': contract.address
    })
    return contract


def get_func_abi(abi, func_name, args):
    """Find the ABI entry of the function that a call refers to.

    A function whose name is unique in the ABI is matched by name and argument
    count. Overloaded names are matched by full signature, where the type of
    each argument is either given explicitly as ``value:type`` or guessed
    (``int128`` for integer literals, ``bytes`` otherwise).

    Returns:
        The matching ABI dict, or ``None`` when nothing matches or the
        argument count is wrong (a message is printed in the latter case).
    """

    def guess_type(v):
        # is annotated type.
        if ':' in v:
            return v.rpartition(':')[2].strip()
        # otherwise just guess
        try:
            int(v)
            return 'int128'
        except ValueError:
            return 'bytes'

    # Count functions only: constructor and fallback entries have no name, and
    # an event sharing a function's name must not make it look overloaded.
    func_name_count_map = Counter(
        entry['name'] for entry in abi if entry['type'] == 'function'
    )
    # Loop invariant, so build the requested signature once.
    full_sig = "{func_name}({type_str})".format(
        func_name=func_name,
        type_str=','.join(guess_type(x) for x in args)
    )

    for candidate_func_abi in abi:
        if candidate_func_abi["type"] != "function":
            continue
        if candidate_func_abi["name"] != func_name:
            continue

        # try func name first.
        if func_name_count_map[func_name] == 1:
            if len(args) != len(candidate_func_abi['inputs']):
                print('Incorrect arguments for {}'.format(func_name))
                return None
            return candidate_func_abi

        # is overloaded function, use full signature.
        method = "{func_name}({type_str})".format(
            func_name=candidate_func_abi['name'],
            type_str=','.join(x['type'] for x in candidate_func_abi['inputs'])
        )
        if method == full_sig:
            return candidate_func_abi

    return None


def _parse_call_list(call_list):
    """Split ``'f; g(1, 2)'`` into ``[('f', []), ('g', ['1', '2'])]``.

    Whitespace around names and arguments is removed; empty segments (for
    example after a trailing ``;``) and empty arguments are skipped.
    """
    calls = []
    for raw_signature in call_list.split(';'):
        signature = raw_signature.strip()
        if not signature:
            continue
        name, paren, rest = signature.partition('(')
        call_args = []
        if paren:
            close_pos = rest.rfind(')')
            inner = rest[:close_pos] if close_pos != -1 else rest
            call_args = [part.strip() for part in inner.split(',') if part.strip()]
        calls.append((name.strip(), call_args))
    return calls


def _print_logs(contract, abi, tx_receipt):
    """Print every event of *abi* decoded from *tx_receipt*."""
    print('- Logs:')
    event_names = [x['name'] for x in abi if x['type'] == 'event']
    found_any = False
    for event_name in event_names:
        logs = getattr(contract.events, event_name)().processReceipt(tx_receipt)
        for log in logs:
            found_any = True
            print(log.event + ":")
            pprint(dict(log.args))
    # A for/else would run here unconditionally, so track matches explicitly.
    if not found_any:
        print(' No events found.')


def main(cli_args):
    """Deploy the contract named by *cli_args* and run the requested calls."""
    with open(cli_args.input_file) as fh:
        code = fh.read()

    init_args = []
    if cli_args.init_args:
        init_args = [part.strip() for part in cli_args.init_args.split(',')]

    # Patch in vdb.
    _, w3 = get_tester(code)
    setattr(vdb.debug_computation.DebugComputation, 'trace', cli_args.trace)

    # Build list of calls to make.
    calls = _parse_call_list(cli_args.call_list)

    abi = compile_code(code, ['abi'])['abi']

    # Format init args.
    if init_args:
        # __init__ has no name in the ABI, so look it up by entry type.
        init_abi = next((func for func in abi if func["type"] == 'constructor'), None)
        if init_abi is None:
            sys.exit('Init args were given but the contract has no constructor.')
        init_args = cast_types(init_args, init_abi)

    # Compile contract to chain.
    contract = get_contract(w3, code, *init_args)

    # Execute calls
    for func_name, call_args in calls:
        if not hasattr(contract.functions, func_name):
            print('\n No method {} found, skipping.'.format(func_name))
            continue

        print('\n* Calling {}({})'.format(func_name, ','.join(call_args)))
        func_abi = get_func_abi(abi, func_name, call_args)
        if not func_abi:
            print('Did not find function in abi.')
            break

        cast_args = cast_types(call_args, func_abi)
        tx_params = {'gas': func_abi.get('gas', 0) + 50000}
        bound_call = getattr(contract.functions, func_name)(*cast_args)

        # Only the read-only call runs with the debugger enabled; make sure
        # the flag is reset even if that call raises.
        setattr(vdb.debug_computation.DebugComputation, 'enable_debug', True)
        try:
            res = bound_call.call(dict(tx_params))
        finally:
            setattr(vdb.debug_computation.DebugComputation, 'enable_debug', False)

        tx_hash = getattr(contract.functions, func_name)(*cast_args).transact(dict(tx_params))
        print('- Returns:')
        pprint('{}'.format(res))

        # Detect any new log events, and print them.
        _print_logs(contract, abi, w3.eth.getTransactionReceipt(tx_hash))


if __name__ == '__main__':
    main(args)