diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 1ccc737d0..8e3d4bb70 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -793,7 +793,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from commands := make([]*erc20HistoricalCommand, len(c.accounts)) for i, address := range c.accounts { erc20 := &erc20HistoricalCommand{ - erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt())), + erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), false), chainClient: c.chainClient, feed: c.feed, address: address, diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 0ebb55990..95b929314 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -132,7 +132,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock if fromBalance.Cmp(toBalance) != 0 { diff := new(big.Int).Sub(to, from) if diff.Cmp(batchSize) <= 0 { - headers, err := c.fastIndexErc20(parent, from, to) + headers, err := c.fastIndexErc20(parent, from, to, true) if err != nil { return nil, err } @@ -314,7 +314,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to // There could be incoming ERC20 transfers which don't change the balance // and nonce of ETH account, so we keep looking for them - erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to) + erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to, false) if err != nil { log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.NetworkID()) c.error = err @@ -424,13 +424,13 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber *big.Int, - toBlockNumber *big.Int) ([]*DBHeader, error) { + toBlockNumber *big.Int, incomingOnly bool) ([]*DBHeader, error) { start := time.Now() group := async.NewGroup(ctx) erc20 := &erc20HistoricalCommand{ - erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt())), + erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly), chainClient: c.chainClient, feed: c.feed, address: c.account, diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 8ecf180e6..35fd32ee7 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -5,6 +5,7 @@ import ( "math/big" "sort" "strings" + "sync" "testing" "time" @@ -42,37 +43,72 @@ type TestClient struct { nonceHistory map[uint64]uint64 traceAPICalls bool printPreparedData bool + rw sync.RWMutex + callsCounter map[string]int } -func (tc TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { +func (tc *TestClient) incCounter(method string) { + tc.rw.Lock() + defer tc.rw.Unlock() + tc.callsCounter[method] = tc.callsCounter[method] + 1 +} + +func (tc *TestClient) getCounter() int { + tc.rw.RLock() + defer tc.rw.RUnlock() + cnt := 0 + for _, v := range tc.callsCounter { + cnt += v + } + return cnt +} + +func (tc *TestClient) printCounter() { + total := tc.getCounter() + + tc.rw.RLock() + defer tc.rw.RUnlock() + + tc.t.Log("========================================= Total calls", total) + for k, v := range tc.callsCounter { + tc.t.Log(k, v) + } + tc.t.Log("=========================================") +} + +func (tc *TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { if tc.traceAPICalls { tc.t.Log("BatchCallContext") } return nil } -func (tc TestClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { +func (tc *TestClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + tc.incCounter("HeaderByHash") if tc.traceAPICalls { tc.t.Log("HeaderByHash") } return nil, nil } -func (tc TestClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { +func (tc *TestClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { + tc.incCounter("BlockByHash") if tc.traceAPICalls { tc.t.Log("BlockByHash") } return nil, nil } -func (tc TestClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { +func (tc *TestClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + tc.incCounter("BlockByNumber") if tc.traceAPICalls { tc.t.Log("BlockByNumber") } return nil, nil } -func (tc TestClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { +func (tc *TestClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + tc.incCounter("NonceAt") nonce := tc.nonceHistory[blockNumber.Uint64()] if tc.traceAPICalls { tc.t.Log("NonceAt", blockNumber, "result:", nonce) @@ -80,7 +116,8 @@ func (tc TestClient) NonceAt(ctx context.Context, account common.Address, blockN return nonce, nil } -func (tc TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { +func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { + tc.incCounter("FilterLogs") if tc.traceAPICalls { tc.t.Log("FilterLogs") } @@ -104,7 +141,8 @@ func (tc TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([] return logs, nil } -func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { +func (tc *TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + tc.incCounter("BalanceAt") balance := tc.balanceHistory[blockNumber.Uint64()] if tc.traceAPICalls { @@ -113,7 +151,7 @@ func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, bloc return balance, nil } -func (tc TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) *big.Int { +func (tc *TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) *big.Int { balance := tc.tokenBalanceHistory[token][blockNumber.Uint64()] if tc.traceAPICalls { @@ -123,6 +161,7 @@ func (tc TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) } func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + tc.incCounter("HeaderByNumber") if tc.traceAPICalls { tc.t.Log("HeaderByNumber", number) } @@ -134,7 +173,8 @@ func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*typ return header, nil } -func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) { +func (tc *TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) { + tc.incCounter("FullTransactionByBlockNumberAndIndex") if tc.traceAPICalls { tc.t.Log("FullTransactionByBlockNumberAndIndex") } @@ -150,18 +190,19 @@ func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, b return tx, nil } -func (tc TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { +func (tc *TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { + tc.incCounter("GetBaseFeeFromBlock") if tc.traceAPICalls { - tc.t.Log("GetBaseFeeFromBloc") + tc.t.Log("GetBaseFeeFromBlock") } return "", nil } -func (tc TestClient) NetworkID() uint64 { +func (tc *TestClient) NetworkID() uint64 { return 777333 } -func (tc TestClient) ToBigInt() *big.Int { +func (tc *TestClient) ToBigInt() *big.Int { if tc.traceAPICalls { tc.t.Log("ToBigInt") } @@ -170,7 +211,8 @@ func (tc TestClient) ToBigInt() *big.Int { var ethscanAddress = common.HexToAddress("0x0000000000000000000000000000000000777333") -func (tc TestClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { +func (tc *TestClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { + tc.incCounter("CodeAt") if tc.traceAPICalls { tc.t.Log("CodeAt", contract, blockNumber) } @@ -182,7 +224,8 @@ func (tc TestClient) CodeAt(ctx context.Context, contract common.Address, blockN return nil, nil } -func (tc TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { +func (tc *TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + tc.incCounter("CallContract") if tc.traceAPICalls { tc.t.Log("CallContract", call, blockNumber, call.To) } @@ -223,8 +266,8 @@ func (tc TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, bl return output, nil } - if *call.To == tokenTXXAddress { - balance := tc.tokenBalanceAt(tokenTXXAddress, blockNumber) + if *call.To == tokenTXXAddress || *call.To == tokenTXZAddress { + balance := tc.tokenBalanceAt(*call.To, blockNumber) parsed, err := abi.JSON(strings.NewReader(ierc20.IERC20ABI)) if err != nil { @@ -314,34 +357,37 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { } } -func (tc TestClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { +func (tc *TestClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + tc.incCounter("CallContext") if tc.traceAPICalls { tc.t.Log("CallContext") } return nil } -func (tc TestClient) GetWalletNotifier() func(chainId uint64, message string) { +func (tc *TestClient) GetWalletNotifier() func(chainId uint64, message string) { if tc.traceAPICalls { tc.t.Log("GetWalletNotifier") } return nil } -func (tc TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) { +func (tc *TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) { if tc.traceAPICalls { tc.t.Log("SetWalletNotifier") } } -func (tc TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { +func (tc *TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { + tc.incCounter("EstimateGas") if tc.traceAPICalls { tc.t.Log("EstimateGas") } return 0, nil } -func (tc TestClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { +func (tc *TestClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { + tc.incCounter("PendingCodeAt") if tc.traceAPICalls { tc.t.Log("PendingCodeAt") } @@ -349,7 +395,8 @@ func (tc TestClient) PendingCodeAt(ctx context.Context, account common.Address) return nil, nil } -func (tc TestClient) PendingCallContract(ctx context.Context, call ethereum.CallMsg) ([]byte, error) { +func (tc *TestClient) PendingCallContract(ctx context.Context, call ethereum.CallMsg) ([]byte, error) { + tc.incCounter("PendingCallContract") if tc.traceAPICalls { tc.t.Log("PendingCallContract") } @@ -357,7 +404,8 @@ func (tc TestClient) PendingCallContract(ctx context.Context, call ethereum.Call return nil, nil } -func (tc TestClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { +func (tc *TestClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + tc.incCounter("PendingNonceAt") if tc.traceAPICalls { tc.t.Log("PendingNonceAt") } @@ -365,7 +413,8 @@ func (tc TestClient) PendingNonceAt(ctx context.Context, account common.Address) return 0, nil } -func (tc TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { +func (tc *TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { + tc.incCounter("SuggestGasPrice") if tc.traceAPICalls { tc.t.Log("SuggestGasPrice") } @@ -373,7 +422,8 @@ func (tc TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { return nil, nil } -func (tc TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { +func (tc *TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + tc.incCounter("SendTransaction") if tc.traceAPICalls { tc.t.Log("SendTransaction") } @@ -381,7 +431,8 @@ func (tc TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) return nil } -func (tc TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { +func (tc *TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + tc.incCounter("SuggestGasTipCap") if tc.traceAPICalls { tc.t.Log("SuggestGasTipCap") } @@ -389,7 +440,8 @@ func (tc TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { return nil, nil } -func (tc TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, b []rpc.BatchElem) error { +func (tc *TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, b []rpc.BatchElem) error { + tc.incCounter("BatchCallContextIgnoringLocalHandlers") if tc.traceAPICalls { tc.t.Log("BatchCallContextIgnoringLocalHandlers") } @@ -397,7 +449,8 @@ func (tc TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, return nil } -func (tc TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error { +func (tc *TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error { + tc.incCounter("CallContextIgnoringLocalHandlers") if tc.traceAPICalls { tc.t.Log("CallContextIgnoringLocalHandlers") } @@ -405,7 +458,8 @@ func (tc TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, resul return nil } -func (tc TestClient) CallRaw(data string) string { +func (tc *TestClient) CallRaw(data string) string { + tc.incCounter("CallRaw") if tc.traceAPICalls { tc.t.Log("CallRaw") } @@ -413,11 +467,12 @@ func (tc TestClient) CallRaw(data string) string { return "" } -func (tc TestClient) GetChainID() *big.Int { +func (tc *TestClient) GetChainID() *big.Int { return big.NewInt(1) } -func (tc TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { +func (tc *TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + tc.incCounter("SubscribeFilterLogs") if tc.traceAPICalls { tc.t.Log("SubscribeFilterLogs") } @@ -425,7 +480,8 @@ func (tc TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQ return nil, nil } -func (tc TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { +func (tc *TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + tc.incCounter("TransactionReceipt") if tc.traceAPICalls { tc.t.Log("TransactionReceipt") } @@ -433,7 +489,8 @@ func (tc TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) return nil, nil } -func (tc TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) { +func (tc *TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) { + tc.incCounter("TransactionByHash") if tc.traceAPICalls { tc.t.Log("TransactionByHash") } @@ -441,20 +498,21 @@ func (tc TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) return nil, false, nil } -func (tc TestClient) BlockNumber(ctx context.Context) (uint64, error) { +func (tc *TestClient) BlockNumber(ctx context.Context) (uint64, error) { + tc.incCounter("BlockNumber") if tc.traceAPICalls { tc.t.Log("BlockNumber") } return 0, nil } -func (tc TestClient) SetIsConnected(value bool) { +func (tc *TestClient) SetIsConnected(value bool) { if tc.traceAPICalls { tc.t.Log("SetIsConnected") } } -func (tc TestClient) GetIsConnected() bool { +func (tc *TestClient) GetIsConnected() bool { if tc.traceAPICalls { tc.t.Log("GetIsConnected") } @@ -478,6 +536,7 @@ type findBlockCase struct { outgoingERC20Transfers []testERC20Transfer incomingERC20Transfers []testERC20Transfer label string + expectedCalls map[string]int } func transferInEachBlock() [][]int { @@ -505,12 +564,26 @@ func getCases() []findBlockCase { }, toBlock: 100, expectedBlocksFound: 6, + expectedCalls: map[string]int{ + "BalanceAt": 27, + //TODO(rasom) NonceAt is flaky, sometimes it's called 18 times, sometimes 17 + //to be investigated + //"NonceAt": 18, + "FilterLogs": 10, + "HeaderByNumber": 5, + }, } case100transfers := findBlockCase{ balanceChanges: transferInEachBlock(), toBlock: 100, expectedBlocksFound: 100, + expectedCalls: map[string]int{ + "BalanceAt": 101, + "NonceAt": 0, + "FilterLogs": 10, + "HeaderByNumber": 100, + }, } case3 := findBlockCase{ @@ -574,6 +647,10 @@ func getCases() []findBlockCase { {big.NewInt(80), tokenTXXAddress, big.NewInt(1)}, {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, }, + expectedCalls: map[string]int{ + "FilterLogs": 3, + "CallContract": 3, + }, } cases = append(cases, case1) @@ -586,12 +663,13 @@ func getCases() []findBlockCase { cases = append(cases, case7emptyHistoryWithOneERC20Transfer) cases = append(cases, case8emptyHistoryWithERC20Transfers) - //cases = append([]findBlockCase{}, case8emptyHistoryWithERC20Transfers) + //cases = append([]findBlockCase{}, case1) return cases } var tokenTXXAddress = common.HexToAddress("0x53211") +var tokenTXZAddress = common.HexToAddress("0x73211") func TestFindBlocksCommand(t *testing.T) { for idx, testCase := range getCases() { @@ -609,6 +687,7 @@ func TestFindBlocksCommand(t *testing.T) { balances: testCase.balanceChanges, outgoingERC20Transfers: testCase.outgoingERC20Transfers, incomingERC20Transfers: testCase.incomingERC20Transfers, + callsCounter: map[string]int{}, } //tc.traceAPICalls = true //tc.printPreparedData = true @@ -668,6 +747,14 @@ func TestFindBlocksCommand(t *testing.T) { numbers = append(numbers, block.Number.Int64()) } + if tc.traceAPICalls { + tc.printCounter() + } + + for name, cnt := range testCase.expectedCalls { + require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name) + } + sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] }) require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers) } diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index d6ff904da..a87b29970 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -236,14 +236,15 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc } // NewERC20TransfersDownloader returns new instance. -func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader { +func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common.Address, signer types.Signer, incomingOnly bool) *ERC20TransfersDownloader { signature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature) return &ERC20TransfersDownloader{ - client: client, - accounts: accounts, - signature: signature, - signer: signer, + client: client, + accounts: accounts, + incomingOnly: incomingOnly, + signature: signature, + signer: signer, } } @@ -253,8 +254,9 @@ func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common // database gets implemented, differentiation between erc20 and erc721 will handled // in the controller. type ERC20TransfersDownloader struct { - client chain.ClientInterface - accounts []common.Address + client chain.ClientInterface + accounts []common.Address + incomingOnly bool // hash of the Transfer event signature signature common.Hash @@ -417,14 +419,18 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro log.Debug("get erc20 transfers in range start", "chainID", d.client.NetworkID(), "from", from, "to", to) headers := []*DBHeader{} ctx := context.Background() + var err error for _, address := range d.accounts { - outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ - FromBlock: from, - ToBlock: to, - Topics: d.outboundTopics(address), - }) - if err != nil { - return nil, err + outbound := []types.Log{} + if !d.incomingOnly { + outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ + FromBlock: from, + ToBlock: to, + Topics: d.outboundTopics(address), + }) + if err != nil { + return nil, err + } } inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index 92af9474e..f3d868d41 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -103,7 +103,7 @@ func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterf signer: signer, db: s.db, }, - erc20: NewERC20TransfersDownloader(chainClient, accounts, signer), + erc20: NewERC20TransfersDownloader(chainClient, accounts, signer, false), feed: s.feed, errorsCount: 0, transactionManager: s.transactionManager,