#!/usr/bin/env python3.6
"""Quick CLI runner for Vyper contracts.

Reads a Vyper source file, deploys it through an ``EthereumTester`` backed
``Web3`` instance and executes a semicolon separated list of calls against
the deployed contract, printing each call's return value and any events
found in the transaction receipt.
"""
import argparse
import vyper

from pprint import pprint
from vyper import compiler
from vyper.parser import (
    parser,
)
from vyper.vdb import (
    set_evm_opcode_debugger,
    set_evm_opcode_pass
)
from eth_tester import (
    EthereumTester,
)
from web3.providers.eth_tester import (
    EthereumTesterProvider,
)
from web3 import (
    Web3,
)

aparser = argparse.ArgumentParser(description='Vyper {0} quick CLI runner'.format(vyper.__version__))
aparser.add_argument('input_file', help='Vyper sourcecode to run')
aparser.add_argument(
    'call_list',
    help='call list, without parameters: func, with parameters func(1, 2, 3). '
         'Semicolon separated'
)
aparser.add_argument('-i', help='init args, comma separated', default='', dest='init_args')

args = aparser.parse_args()

set_evm_opcode_pass()  # by default just pass over the debug opcode.


def cast_types(args, abi_signature):
    """Convert string arguments to the Python types implied by an ABI entry.

    :param args: sequence of argument strings, positionally matching
        ``abi_signature['inputs']``.
    :param abi_signature: ABI entry (dict) with an ``'inputs'`` list whose
        items carry a ``'type'`` key.
    :return: a new list; ``int128``/``uint256`` inputs are converted with
        ``int()``, inputs whose type starts with ``bytes`` are ``.encode()``d,
        everything else is passed through unchanged.
    :raises IndexError: if the ABI declares more inputs than ``args`` holds.
    :raises ValueError: if an integer input cannot be parsed by ``int()``.
    """
    newargs = list(args)  # never mutate the caller's sequence
    for idx, abi_arg in enumerate(abi_signature['inputs']):
        abi_type = abi_arg['type']
        if abi_type in ('int128', 'uint256'):
            newargs[idx] = int(args[idx])
        elif abi_type.startswith('bytes'):
            newargs[idx] = args[idx].encode()
    return newargs


def get_tester():
    """Create an ``EthereumTester`` and a ``Web3`` instance wired to it.

    :return: ``(tester, w3)`` tuple; ``w3`` uses a gas price strategy that
        always returns 0.
    """
    tester = EthereumTester()

    def zero_gas_price_strategy(web3, transaction_params=None):
        return 0  # zero gas price makes testing simpler.

    w3 = Web3(EthereumTesterProvider(tester))
    w3.eth.setGasPriceStrategy(zero_gas_price_strategy)
    return tester, w3


def get_contract(w3, source_code, *args, **kwargs):
    """Compile ``source_code``, deploy it via ``w3`` and return the contract.

    :param w3: ``Web3`` instance used for deployment; the deploy transaction
        is sent from ``w3.eth.accounts[0]``.
    :param source_code: Vyper source text.
    :param args: positional constructor arguments.
    :param kwargs: ``value``, ``value_in_eth`` and ``gasPrice`` are consumed
        here (``value_in_eth``, when truthy, overrides ``value`` after being
        multiplied by 10**18); everything left over is handed to the
        constructor data encoder together with ``args``.
    :return: contract object bound to the deployed address, with a log filter
        stored on its ``_logfilter`` attribute.
    """
    abi = compiler.mk_full_signature(source_code)
    bytecode = '0x' + compiler.compile(source_code).hex()
    contract = w3.eth.contract(abi=abi, bytecode=bytecode)

    value = kwargs.pop('value', 0)
    value_in_eth = kwargs.pop('value_in_eth', 0)
    value = value_in_eth * 10**18 if value_in_eth else value  # Handle deploying with an eth value.
    gasPrice = kwargs.pop('gasPrice', 0)
    deploy_transaction = {
        'from': w3.eth.accounts[0],
        'data': contract._encode_constructor_data(args, kwargs),
        'value': value,
        'gasPrice': gasPrice
    }
    tx = w3.eth.sendTransaction(deploy_transaction)
    address = w3.eth.getTransactionReceipt(tx)['contractAddress']
    contract = w3.eth.contract(address, abi=abi, bytecode=bytecode)
    # Filter logs.
    contract._logfilter = w3.eth.filter({
        'fromBlock': w3.eth.blockNumber - 1,
        'address': contract.address
    })
    return contract


def serialise_var_rec(var_rec):
    """Turn a variable record into the plain dict stored in the source map.

    ``size`` is the record's size multiplied by 32 (presumably 32-byte
    words converted to bytes -- confirm against the vyper variable record).
    """
    return {
        'type': var_rec.typ.typ,
        'size': var_rec.size * 32,
        'position': var_rec.pos
    }


def _find_abi_entry(abi, name):
    """Return the first ABI entry called ``name``, or ``None`` if absent."""
    return next((entry for entry in abi if entry.get('name') == name), None)


def _parse_call_list(call_list):
    """Parse the ``call_list`` CLI argument into ``(name, args)`` tuples.

    Calls are separated by ``;``. Each call is either a bare function name
    or ``name(arg, arg, ...)``. Surrounding whitespace is ignored for both
    names and arguments, and empty segments (e.g. from a trailing ``;``) as
    well as empty arguments are dropped.
    """
    calls = []
    for raw_signature in call_list.split(';'):
        # Strip first so "a(1); b(2)" does not yield the name " b" and so the
        # closing-paren slice below really removes the ")".
        signature = raw_signature.strip()
        if not signature:
            continue

        name = signature
        call_args = []
        if '(' in signature:
            start_pos = signature.find('(')
            name = signature[:start_pos].strip()
            raw_args = signature[start_pos + 1:-1].split(',')
            call_args = [arg.strip() for arg in raw_args if arg.strip()]

        calls.append((name, call_args))
    return calls


def _build_source_map(code):
    """Build the source map passed to the opcode debugger.

    The result has a ``'globals'`` mapping (variable name -> serialised
    record) and a ``'locals'`` mapping (function name -> ``from_lineno``,
    ``to_lineno`` and serialised local ``variables``). A function's
    ``to_lineno`` is the ``lineno`` of the next definition, or the number of
    lines in ``code`` for the last one.
    """
    _contracts, _events, _defs, _globals, _custom_units = \
        parser.get_contracts_and_defs_and_globals(parser.parse(code))

    source_map = {
        'globals': {
            name: serialise_var_rec(var_record)
            for name, var_record in _globals.items()
        },
        'locals': {}
    }

    # Fetch context for each function.
    lll = parser.parse_tree_to_lll(parser.parse(code), code, runtime_only=True)
    contexts = {
        f.func_name: f.context
        for f in lll.args[1:] if hasattr(f, 'context')
    }

    prev_func_name = None
    for _def in _defs:
        context = contexts[_def.name]
        source_map['locals'][_def.name] = {
            'from_lineno': _def.lineno,
            # local variables for this specific function.
            'variables': {
                var_name: serialise_var_rec(var_rec)
                for var_name, var_rec in context.vars.items()
            }
        }
        # The previous function ends where this one starts.
        if prev_func_name:
            source_map['locals'][prev_func_name]['to_lineno'] = _def.lineno
        prev_func_name = _def.name

    # Close the last function; guarded so a source without any function
    # definitions does not fail on an unbound loop variable.
    if prev_func_name:
        source_map['locals'][prev_func_name]['to_lineno'] = len(code.splitlines())

    return source_map


if __name__ == '__main__':

    with open(args.input_file) as fh:
        code = fh.read()

    # Strip init args the same way call arguments are stripped.
    init_args = [arg.strip() for arg in args.init_args.split(',')] if args.init_args else []
    tester, w3 = get_tester()

    # Build list of calls to make.
    calls = _parse_call_list(args.call_list)

    abi = compiler.mk_full_signature(code)
    source_map = _build_source_map(code)

    # Format init args.
    if init_args:
        init_abi = _find_abi_entry(abi, '__init__')
        if init_abi is None:
            raise SystemExit('Init args were supplied, but the contract has no __init__.')
        if len(init_args) != len(init_abi['inputs']):
            raise SystemExit('Init argument mismatch, please provide correct arguments.')
        init_args = cast_types(init_args, init_abi)

    # Compile contract to chain.
    contract = get_contract(w3, code, *init_args, language='vyper')

    event_names = [x['name'] for x in abi if x['type'] == 'event']

    # Execute calls
    for func_name, call_args in calls:
        func_abi = _find_abi_entry(abi, func_name)
        if func_abi is None or not hasattr(contract.functions, func_name):
            print('\n No method {} found, skipping.'.format(func_name))
            continue

        print('\n* Calling {}({})'.format(func_name, ','.join(call_args)))
        if len(call_args) != len(func_abi['inputs']):
            # A mismatch aborts all remaining calls, not just this one.
            print('Argument mismatch, please provide correct arguments.')
            break

        cast_args = cast_types(call_args, func_abi)
        contract_func = getattr(contract.functions, func_name)
        gas_limit = func_abi['gas'] + 22000

        # First a call to obtain the return value, then the same invocation
        # as a transaction with the opcode debugger switched on.
        res = contract_func(*cast_args).call({'gas': gas_limit})
        set_evm_opcode_debugger(source_code=code, source_map=source_map)
        try:
            tx_hash = contract_func(*cast_args).transact({'gas': gas_limit})
        finally:
            # Always fall back to the pass-through handler, even on failure.
            set_evm_opcode_pass()
        print('- Returns:')
        pprint('{}'.format(res))

        # Detect any new log events, and print them.
        print('- Logs:')
        tx_receipt = w3.eth.getTransactionReceipt(tx_hash)
        found_events = False
        for event_name in event_names:
            logs = getattr(contract.events, event_name)().processReceipt(tx_receipt)
            for log in logs:
                found_events = True
                print(log.event + ":")
                pprint(dict(log.args))
        if not found_events:
            print(' No events found.')