From 382fcde74e74e914eed63804fde00d906c6f0b62 Mon Sep 17 00:00:00 2001 From: Roman Volosovskyi Date: Wed, 20 Sep 2023 10:41:23 +0200 Subject: [PATCH] Scanning of ERC20 tail of transfers history --- contracts/contracts.go | 19 +- contracts/ethscan/address.go | 34 +- rpc/chain/client.go | 27 + rpc/client.go | 25 +- services/wallet/history/service.go | 6 +- services/wallet/reader.go | 2 +- services/wallet/token/token.go | 114 ++-- services/wallet/transfer/bridge_identifier.go | 2 +- services/wallet/transfer/commands.go | 28 +- .../wallet/transfer/commands_sequential.go | 223 +++++-- .../transfer/commands_sequential_test.go | 624 +++++++++++++++--- services/wallet/transfer/controller.go | 2 +- services/wallet/transfer/downloader.go | 8 +- services/wallet/transfer/iterative.go | 4 +- services/wallet/transfer/reactor.go | 16 +- .../transfer/sequential_fetch_strategy.go | 6 +- services/wallet/transfer/swap_identifier.go | 18 +- 17 files changed, 927 insertions(+), 231 deletions(-) diff --git a/contracts/contracts.go b/contracts/contracts.go index 6f18acb94..d1cd11aaf 100644 --- a/contracts/contracts.go +++ b/contracts/contracts.go @@ -165,29 +165,36 @@ func (c *ContractMaker) NewDirectory(chainID uint64) (*directory.Directory, erro ) } -func (c *ContractMaker) NewEthScan(chainID uint64) (*ethscan.BalanceScanner, error) { +func (c *ContractMaker) NewEthScan(chainID uint64) (*ethscan.BalanceScanner, uint, error) { contractAddr, err := ethscan.ContractAddress(chainID) if err != nil { - return nil, err + return nil, 0, err + } + + contractCreatedAt, err := ethscan.ContractCreatedAt(chainID) + if err != nil { + return nil, 0, err } backend, err := c.RPCClient.EthClient(chainID) if err != nil { - return nil, err + return nil, 0, err } bytecode, err := backend.CodeAt(context.Background(), contractAddr, nil) if err != nil { - return nil, err + return nil, 0, err } if len(bytecode) == 0 { - return nil, errors.New("is not a contract") + return nil, 0, errors.New("is not a contract") } - return ethscan.NewBalanceScanner( + scanner, err := ethscan.NewBalanceScanner( contractAddr, backend, ) + + return scanner, contractCreatedAt, err } func (c *ContractMaker) NewHopL2SaddlSwap(chainID uint64, symbol string) (*hopSwap.HopSwap, error) { diff --git a/contracts/ethscan/address.go b/contracts/ethscan/address.go index 5da07c2fb..6065edfa9 100644 --- a/contracts/ethscan/address.go +++ b/contracts/ethscan/address.go @@ -8,20 +8,34 @@ import ( var errorNotAvailableOnChainID = errors.New("not available for chainID") -var contractAddressByChainID = map[uint64]common.Address{ - 1: common.HexToAddress("0x08A8fDBddc160A7d5b957256b903dCAb1aE512C5"), // mainnet - 5: common.HexToAddress("0x08A8fDBddc160A7d5b957256b903dCAb1aE512C5"), // goerli - 10: common.HexToAddress("0x9e5076df494fc949abc4461f4e57592b81517d81"), // optimism - 420: common.HexToAddress("0xf532c75239fa61b66d31e73f44300c46da41aadd"), // goerli optimism - 42161: common.HexToAddress("0xbb85398092b83a016935a17fc857507b7851a071"), // arbitrum - 421613: common.HexToAddress("0xec21ebe1918e8975fc0cd0c7747d318c00c0acd5"), // goerli arbitrum - 11155111: common.HexToAddress("0xec21ebe1918e8975fc0cd0c7747d318c00c0acd5"), // sepolia +type ContractData struct { + Address common.Address + CreatedAtBlock uint +} + +var contractDataByChainID = map[uint64]ContractData{ + 1: {common.HexToAddress("0x08A8fDBddc160A7d5b957256b903dCAb1aE512C5"), 12_194_222}, // mainnet + 5: {common.HexToAddress("0x08A8fDBddc160A7d5b957256b903dCAb1aE512C5"), 4_578_854}, // goerli + 10: {common.HexToAddress("0x9e5076df494fc949abc4461f4e57592b81517d81"), 34_421_097}, // optimism + 420: {common.HexToAddress("0xf532c75239fa61b66d31e73f44300c46da41aadd"), 2_236_534}, // goerli optimism + 42161: {common.HexToAddress("0xbb85398092b83a016935a17fc857507b7851a071"), 70_031_945}, // arbitrum + 421613: {common.HexToAddress("0xec21ebe1918e8975fc0cd0c7747d318c00c0acd5"), 818_155}, // goerli arbitrum + 777333: {common.HexToAddress("0x0000000000000000000000000000000000777333"), 50}, // unit tests + 11155111: {common.HexToAddress("0xec21ebe1918e8975fc0cd0c7747d318c00c0acd5"), 4_366_506}, // sepolia } func ContractAddress(chainID uint64) (common.Address, error) { - addr, exists := contractAddressByChainID[chainID] + contract, exists := contractDataByChainID[chainID] if !exists { return *new(common.Address), errorNotAvailableOnChainID } - return addr, nil + return contract.Address, nil +} + +func ContractCreatedAt(chainID uint64) (uint, error) { + contract, exists := contractDataByChainID[chainID] + if !exists { + return 0, errorNotAvailableOnChainID + } + return contract.CreatedAtBlock, nil } diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 8165eac63..a7ff87fdd 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -42,6 +42,19 @@ type ClientInterface interface { GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) NetworkID() uint64 ToBigInt() *big.Int + CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) + CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) + CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error + GetWalletNotifier() func(chainId uint64, message string) + SetWalletNotifier(notifier func(chainId uint64, message string)) + TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) + TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) + BlockNumber(ctx context.Context) (uint64, error) + SetIsConnected(value bool) + GetIsConnected() bool + bind.ContractCaller + bind.ContractTransactor + bind.ContractFilterer } type ClientWithFallback struct { @@ -169,6 +182,12 @@ func (c *ClientWithFallback) SetIsConnected(value bool) { } } +func (c *ClientWithFallback) GetIsConnected() bool { + c.IsConnectedLock.RLock() + defer c.IsConnectedLock.RUnlock() + return c.IsConnected +} + func (c *ClientWithFallback) makeCallNoReturn(main func() error, fallback func() error) error { resultChan := make(chan CommandResult, 1) c.LastCheckedAt = time.Now().Unix() @@ -877,3 +896,11 @@ func (c *ClientWithFallback) FullTransactionByBlockNumberAndIndex(ctx context.Co return tx.(*FullTransaction), nil } + +func (c *ClientWithFallback) GetWalletNotifier() func(chainId uint64, message string) { + return c.WalletNotifier +} + +func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, message string)) { + c.WalletNotifier = notifier +} diff --git a/rpc/client.go b/rpc/client.go index 499d76339..c219dcdca 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -48,9 +48,9 @@ type Client struct { UpstreamChainID uint64 local *gethrpc.Client - upstream *chain.ClientWithFallback + upstream chain.ClientInterface rpcClientsMutex sync.RWMutex - rpcClients map[uint64]*chain.ClientWithFallback + rpcClients map[uint64]chain.ClientInterface router *router NetworkManager *network.Manager @@ -84,7 +84,7 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U local: client, NetworkManager: networkManager, handlers: make(map[string]Handler), - rpcClients: make(map[uint64]*chain.ClientWithFallback), + rpcClients: make(map[uint64]chain.ClientInterface), log: log, } @@ -112,12 +112,12 @@ func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string) c.walletNotifier = notifier } -func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback, error) { +func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, error) { c.rpcClientsMutex.Lock() defer c.rpcClientsMutex.Unlock() if rpcClient, ok := c.rpcClients[chainID]; ok { - if rpcClient.WalletNotifier == nil { - rpcClient.WalletNotifier = c.walletNotifier + if rpcClient.GetWalletNotifier() == nil { + rpcClient.SetWalletNotifier(c.walletNotifier) } return rpcClient, nil } @@ -150,7 +150,7 @@ func (c *Client) getClientUsingCache(chainID uint64) (*chain.ClientWithFallback, } // Ethclient returns ethclient.Client per chain -func (c *Client) EthClient(chainID uint64) (*chain.ClientWithFallback, error) { +func (c *Client) EthClient(chainID uint64) (chain.ClientInterface, error) { client, err := c.getClientUsingCache(chainID) if err != nil { return nil, err @@ -169,8 +169,8 @@ func (c *Client) AbstractEthClient(chainID common.ChainID) (chain.BatchCallClien return client, nil } -func (c *Client) EthClients(chainIDs []uint64) (map[uint64]*chain.ClientWithFallback, error) { - clients := make(map[uint64]*chain.ClientWithFallback, 0) +func (c *Client) EthClients(chainIDs []uint64) (map[uint64]chain.ClientInterface, error) { + clients := make(map[uint64]chain.ClientInterface, 0) for _, chainID := range chainIDs { client, err := c.getClientUsingCache(chainID) if err != nil { @@ -182,6 +182,13 @@ func (c *Client) EthClients(chainIDs []uint64) (map[uint64]*chain.ClientWithFall return clients, nil } +// SetClient strictly for testing purposes +func (c *Client) SetClient(chainID uint64, client chain.ClientInterface) { + c.rpcClientsMutex.Lock() + defer c.rpcClientsMutex.Unlock() + c.rpcClients[chainID] = client +} + // UpdateUpstreamURL changes the upstream RPC client URL, if the upstream is enabled. func (c *Client) UpdateUpstreamURL(url string) error { if c.upstream == nil { diff --git a/services/wallet/history/service.go b/services/wallet/history/service.go index 5d0e3531c..a248fe05e 100644 --- a/services/wallet/history/service.go +++ b/services/wallet/history/service.go @@ -150,7 +150,7 @@ func (s *Service) isTokenVisible(tokenSymbol string) bool { // Native token implementation of DataSource interface type chainClientSource struct { - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface currency string } @@ -163,7 +163,7 @@ func (src *chainClientSource) BalanceAt(ctx context.Context, account common.Addr } func (src *chainClientSource) ChainID() uint64 { - return src.chainClient.ChainID + return src.chainClient.NetworkID() } func (src *chainClientSource) Currency() string { @@ -184,7 +184,7 @@ type tokenChainClientSource struct { } func (src *tokenChainClientSource) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { - network := src.NetworkManager.Find(src.chainClient.ChainID) + network := src.NetworkManager.Find(src.chainClient.NetworkID()) if network == nil { return nil, errors.New("network not found") } diff --git a/services/wallet/reader.go b/services/wallet/reader.go index c602070a5..ba7d85568 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -237,7 +237,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, addresses []common.Address) } hasError := false if client, ok := clients[token.ChainID]; ok { - hasError = err != nil || !client.IsConnected + hasError = err != nil || !client.GetIsConnected() } if !anyPositiveBalance { anyPositiveBalance = balance.Cmp(big.NewFloat(0.0)) > 0 diff --git a/services/wallet/token/token.go b/services/wallet/token/token.go index 9507d3c57..8c718417e 100644 --- a/services/wallet/token/token.go +++ b/services/wallet/token/token.go @@ -14,6 +14,7 @@ import ( "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/contracts" + "github.com/status-im/status-go/contracts/ethscan" "github.com/status-im/status-go/contracts/ierc20" "github.com/status-im/status-go/params" "github.com/status-im/status-go/rpc" @@ -153,7 +154,7 @@ func (tm *Manager) getAddressTokenMap(chainID uint64) (addressTokenMap, bool) { return tokenMap, chainPresent } -func (tm *Manager) setTokens(tokens []*Token) { +func (tm *Manager) SetTokens(tokens []*Token) { tm.tokenLock.Lock() defer tm.tokenLock.Unlock() @@ -187,7 +188,7 @@ func (tm *Manager) fetchTokens() { tokenList = mergeTokenLists([][]*Token{tokenList, validTokens}) } - tm.setTokens(tokenList) + tm.SetTokens(tokenList) } func (tm *Manager) getFullTokenList(chainID uint64) []*Token { @@ -577,7 +578,7 @@ func (tm *Manager) DeleteCustom(chainID uint64, address common.Address) error { return err } -func (tm *Manager) GetTokenBalance(ctx context.Context, client *chain.ClientWithFallback, account common.Address, token common.Address) (*big.Int, error) { +func (tm *Manager) GetTokenBalance(ctx context.Context, client chain.ClientInterface, account common.Address, token common.Address) (*big.Int, error) { caller, err := ierc20.NewIERC20Caller(token, client) if err != nil { return nil, err @@ -588,23 +589,32 @@ func (tm *Manager) GetTokenBalance(ctx context.Context, client *chain.ClientWith }, account) } -func (tm *Manager) GetTokenBalanceAt(ctx context.Context, client *chain.ClientWithFallback, account common.Address, token common.Address, blockNumber *big.Int) (*big.Int, error) { +func (tm *Manager) GetTokenBalanceAt(ctx context.Context, client chain.ClientInterface, account common.Address, token common.Address, blockNumber *big.Int) (*big.Int, error) { caller, err := ierc20.NewIERC20Caller(token, client) if err != nil { return nil, err } - return caller.BalanceOf(&bind.CallOpts{ + balance, err := caller.BalanceOf(&bind.CallOpts{ Context: ctx, BlockNumber: blockNumber, }, account) + + if err != nil { + if err != bind.ErrNoCode { + return nil, err + } + balance = big.NewInt(0) + } + + return balance, nil } -func (tm *Manager) GetChainBalance(ctx context.Context, client *chain.ClientWithFallback, account common.Address) (*big.Int, error) { +func (tm *Manager) GetChainBalance(ctx context.Context, client chain.ClientInterface, account common.Address) (*big.Int, error) { return client.BalanceAt(ctx, account, nil) } -func (tm *Manager) GetBalance(ctx context.Context, client *chain.ClientWithFallback, account common.Address, token common.Address) (*big.Int, error) { +func (tm *Manager) GetBalance(ctx context.Context, client chain.ClientInterface, account common.Address, token common.Address) (*big.Int, error) { if token == nativeChainAddress { return tm.GetChainBalance(ctx, client, account) } @@ -612,7 +622,7 @@ func (tm *Manager) GetBalance(ctx context.Context, client *chain.ClientWithFallb return tm.GetTokenBalance(ctx, client, account, token) } -func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain.ClientWithFallback, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { +func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]chain.ClientInterface, accounts, tokens []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { var ( group = async.NewAtomicGroup(parent) mu sync.Mutex @@ -638,7 +648,7 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain for clientIdx := range clients { client := clients[clientIdx] - ethScanContract, err := tm.contractMaker.NewEthScan(client.ChainID) + ethScanContract, _, err := tm.contractMaker.NewEthScan(client.NetworkID()) if err == nil { fetchChainBalance := false @@ -666,7 +676,7 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain Context: ctx, }, accounts) if err != nil { - log.Error("can't fetch chain balance", err) + log.Error("can't fetch chain balance 2", err) return nil } for idx, account := range accounts { @@ -690,7 +700,7 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain Context: ctx, }, account, chunk) if err != nil { - log.Error("can't fetch erc20 token balance", "account", account, "error", err) + log.Error("can't fetch erc20 token balance 3", "account", account, "error", err) return nil } @@ -713,7 +723,7 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain account := accounts[accountIdx] token := tokens[tokenIdx] client := clients[clientIdx] - if !tm.inStore(token, client.ChainID) { + if !tm.inStore(token, client.NetworkID()) { continue } group.Add(func(parent context.Context) error { @@ -722,7 +732,7 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain balance, err := tm.GetBalance(ctx, client, account, token) if err != nil { - log.Error("can't fetch erc20 token balance", "account", account, "token", token, "error", err) + log.Error("can't fetch erc20 token balance 4", "account", account, "token", token, "error", err) return nil } @@ -742,7 +752,11 @@ func (tm *Manager) GetBalances(parent context.Context, clients map[uint64]*chain return response, group.Error() } -func (tm *Manager) GetBalancesByChain(parent context.Context, clients map[uint64]*chain.ClientWithFallback, accounts, tokens []common.Address) (map[uint64]map[common.Address]map[common.Address]*hexutil.Big, error) { +func (tm *Manager) GetBalancesByChain(parent context.Context, clients map[uint64]chain.ClientInterface, accounts, tokens []common.Address) (map[uint64]map[common.Address]map[common.Address]*hexutil.Big, error) { + return tm.GetBalancesAtByChain(parent, clients, accounts, tokens, nil) +} + +func (tm *Manager) GetBalancesAtByChain(parent context.Context, clients map[uint64]chain.ClientInterface, accounts, tokens []common.Address, atBlocks map[uint64]*big.Int) (map[uint64]map[common.Address]map[common.Address]*hexutil.Big, error) { var ( group = async.NewAtomicGroup(parent) mu sync.Mutex @@ -769,14 +783,15 @@ func (tm *Manager) GetBalancesByChain(parent context.Context, clients map[uint64 mu.Unlock() } - for clientIdx := range clients { - client := clients[clientIdx] - ethScanContract, err := tm.contractMaker.NewEthScan(client.ChainID) + for _, client := range clients { + ethScanContract, availableAtBlock, err := tm.contractMaker.NewEthScan(client.NetworkID()) if err != nil { log.Error("error scanning contract", "err", err) return nil, err } + atBlock := atBlocks[client.NetworkID()] + fetchChainBalance := false var tokenChunks [][]common.Address chunkSize := 500 @@ -799,16 +814,17 @@ func (tm *Manager) GetBalancesByChain(parent context.Context, clients map[uint64 ctx, cancel := context.WithTimeout(parent, requestTimeout) defer cancel() res, err := ethScanContract.EtherBalances(&bind.CallOpts{ - Context: ctx, + Context: ctx, + BlockNumber: atBlock, }, accounts) if err != nil { - log.Error("can't fetch chain balance", err) + log.Error("can't fetch chain balance 5", err) return nil } for idx, account := range accounts { balance := new(big.Int) balance.SetBytes(res[idx].Data) - updateBalance(client.ChainID, account, common.HexToAddress("0x"), balance) + updateBalance(client.NetworkID(), account, common.HexToAddress("0x"), balance) } return nil @@ -822,28 +838,46 @@ func (tm *Manager) GetBalancesByChain(parent context.Context, clients map[uint64 group.Add(func(parent context.Context) error { ctx, cancel := context.WithTimeout(parent, requestTimeout) defer cancel() - res, err := ethScanContract.TokensBalance(&bind.CallOpts{ - Context: ctx, - }, account, chunk) - if err != nil { - log.Error("can't fetch erc20 token balance", "account", account, "error", err) - return nil - } - - if len(res) != len(chunk) { - log.Error("can't fetch erc20 token balance", "account", account, "error response not complete") - return nil - } - - for idx, token := range chunk { - - if !res[idx].Success { - continue + var res []ethscan.BalanceScannerResult + if atBlock == nil || big.NewInt(int64(availableAtBlock)).Cmp(atBlock) < 0 { + res, err = ethScanContract.TokensBalance(&bind.CallOpts{ + Context: ctx, + BlockNumber: atBlock, + }, account, chunk) + if err != nil { + log.Error("can't fetch erc20 token balance 6", "account", account, "error", err) + return nil } - balance := new(big.Int) - balance.SetBytes(res[idx].Data) - updateBalance(client.ChainID, account, token, balance) + + if len(res) != len(chunk) { + log.Error("can't fetch erc20 token balance 7", "account", account, "error response not complete") + return nil + } + + for idx, token := range chunk { + + if !res[idx].Success { + continue + } + balance := new(big.Int) + balance.SetBytes(res[idx].Data) + updateBalance(client.NetworkID(), account, token, balance) + } + return nil } + + for _, token := range chunk { + balance, err := tm.GetTokenBalanceAt(ctx, client, account, token, atBlock) + if err != nil { + if err != bind.ErrNoCode { + log.Error("can't fetch erc20 token balance 8", "account", account, "token", token, "error on fetching token balance") + + return nil + } + } + updateBalance(client.NetworkID(), account, token, balance) + } + return nil }) } diff --git a/services/wallet/transfer/bridge_identifier.go b/services/wallet/transfer/bridge_identifier.go index 91efd2ca5..72d2e8a78 100644 --- a/services/wallet/transfer/bridge_identifier.go +++ b/services/wallet/transfer/bridge_identifier.go @@ -131,7 +131,7 @@ func upsertHopBridgeDestinationTx(ctx context.Context, transactionManager *Trans return multiTx, nil } -func buildHopBridgeMultitransaction(ctx context.Context, client *chain.ClientWithFallback, transactionManager *TransactionManager, tokenManager *token.Manager, subTx *Transfer) (*MultiTransaction, error) { +func buildHopBridgeMultitransaction(ctx context.Context, client chain.ClientInterface, transactionManager *TransactionManager, tokenManager *token.Manager, subTx *Transfer) (*MultiTransaction, error) { // Identify if it's from/to transaction switch w_common.GetEventType(subTx.Log) { case w_common.HopBridgeTransferSentToL2EventType: diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 6c39ccd9e..3b0847d3d 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -77,7 +77,7 @@ func (c *ethHistoricalCommand) Command() async.Command { } func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { - log.Info("eth historical downloader start", "chainID", c.chainClient.NetworkID(), "address", c.address, + log.Debug("eth historical downloader start", "chainID", c.chainClient.NetworkID(), "address", c.address, "from", c.from.Number, "to", c.to, "noLimit", c.noLimit) start := time.Now() @@ -101,7 +101,7 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) { c.resultingFrom = from c.startBlock = startBlock - log.Info("eth historical downloader finished successfully", "chain", c.chainClient.NetworkID(), + log.Debug("eth historical downloader finished successfully", "chain", c.chainClient.NetworkID(), "address", c.address, "from", from, "to", c.to, "total blocks", len(headers), "time", time.Since(start)) return nil @@ -147,7 +147,7 @@ func getErc20BatchSize(chainID uint64) *big.Int { } func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { - log.Info("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(), "address", c.address, + log.Debug("wallet historical downloader for erc20 transfers start", "chainID", c.chainClient.NetworkID(), "address", c.address, "from", c.from, "to", c.to) start := time.Now() @@ -168,7 +168,7 @@ func (c *erc20HistoricalCommand) Run(ctx context.Context) (err error) { } c.foundHeaders = append(c.foundHeaders, headers...) } - log.Info("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(), "address", c.address, + log.Debug("wallet historical downloader for erc20 transfers finished", "chainID", c.chainClient.NetworkID(), "address", c.address, "from", c.from, "to", c.to, "time", time.Since(start), "headers", len(c.foundHeaders)) return nil } @@ -183,7 +183,7 @@ type controlCommand struct { blockDAO *BlockDAO eth *ETHDownloader erc20 *ERC20TransfersDownloader - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface feed *event.Feed errorsCount int nonArchivalRPCNode bool @@ -365,7 +365,7 @@ type transfersCommand struct { eth *ETHDownloader blockNums []*big.Int address common.Address - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface blocksLimit int transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker @@ -625,7 +625,7 @@ type loadTransfersCommand struct { accounts []common.Address db *Database blockDAO *BlockDAO - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface blocksByAddress map[common.Address][]*big.Int transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker @@ -655,7 +655,7 @@ type findAndCheckBlockRangeCommand struct { accounts []common.Address db *Database blockDAO *BlockDAO - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface balanceCacher balance.Cacher feed *event.Feed fromByAddress map[common.Address]*Block @@ -818,11 +818,11 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from } func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *BlockDAO, db *Database, - chainClient *chain.ClientWithFallback, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, + chainClient chain.ClientInterface, blocksLimitPerAccount int, blocksByAddress map[common.Address][]*big.Int, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed) error { - log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.ChainID, "limit", blocksLimitPerAccount) + log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) @@ -852,7 +852,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.ChainID) + log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) return nil } } @@ -871,10 +871,10 @@ func getLowestFrom(chainID uint64, to *big.Int) *big.Int { } // Finds the latest range up to initialTo where the number of transactions is between 20 and 25 -func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client *chain.ClientWithFallback) (*big.Int, error) { +func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) { log.Info("findFirstRange", "account", account, "initialTo", initialTo, "client", client) - from := getLowestFrom(client.ChainID, initialTo) + from := getLowestFrom(client.NetworkID(), initialTo) to := initialTo goal := uint64(20) @@ -930,7 +930,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In } // Finds the latest ranges up to initialTo where the number of transactions is between 20 and 25 -func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client *chain.ClientWithFallback) (map[common.Address]*big.Int, error) { +func findFirstRanges(c context.Context, accounts []common.Address, initialTo *big.Int, client chain.ClientInterface) (map[common.Address]*big.Int, error) { res := map[common.Address]*big.Int{} for _, address := range accounts { diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 54d395c9e..8b9973359 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -66,22 +66,25 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { // TODO NewFindBlocksCommand type findBlocksCommand struct { - account common.Address - db *Database - blockRangeDAO *BlockRangeSequentialDAO - chainClient chain.ClientInterface - balanceCacher balance.Cacher - feed *event.Feed - noLimit bool - transactionManager *TransactionManager - fromBlockNumber *big.Int - toBlockNumber *big.Int - blocksLoadedCh chan<- []*DBHeader + account common.Address + db *Database + blockRangeDAO *BlockRangeSequentialDAO + chainClient chain.ClientInterface + balanceCacher balance.Cacher + feed *event.Feed + noLimit bool + transactionManager *TransactionManager + tokenManager *token.Manager + fromBlockNumber *big.Int + toBlockNumber *big.Int + blocksLoadedCh chan<- []*DBHeader + defaultNodeBlockChunkSize int // Not to be set by the caller - resFromBlock *Block - startBlockNumber *big.Int - error error + resFromBlock *Block + startBlockNumber *big.Int + reachedETHHistoryStart bool + error error } func (c *findBlocksCommand) Command() async.Command { @@ -91,10 +94,114 @@ func (c *findBlocksCommand) Command() async.Command { }.Run } -func (c *findBlocksCommand) Run(parent context.Context) (err error) { - log.Debug("start findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) +func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock, toBlock *big.Int, token common.Address) ([]*DBHeader, error) { + var err error + batchSize := getErc20BatchSize(c.chainClient.NetworkID()) + ranges := [][]*big.Int{{fromBlock, toBlock}} + foundHeaders := []*DBHeader{} + cache := map[int64]*big.Int{} + for { + nextRanges := [][]*big.Int{} + for _, blockRange := range ranges { + from, to := blockRange[0], blockRange[1] + fromBalance, ok := cache[from.Int64()] + if !ok { + fromBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, from) + if err != nil { + return nil, err + } - rangeSize := big.NewInt(DefaultNodeBlockChunkSize) + if fromBalance == nil { + fromBalance = big.NewInt(0) + } + cache[from.Int64()] = fromBalance + } + + toBalance, ok := cache[to.Int64()] + if !ok { + toBalance, err = c.tokenManager.GetTokenBalanceAt(parent, c.chainClient, c.account, token, to) + if err != nil { + return nil, err + } + if toBalance == nil { + toBalance = big.NewInt(0) + } + cache[to.Int64()] = toBalance + } + + if fromBalance.Cmp(toBalance) != 0 { + diff := new(big.Int).Sub(to, from) + if diff.Cmp(batchSize) <= 0 { + headers, err := c.fastIndexErc20(parent, from, to) + if err != nil { + return nil, err + } + foundHeaders = append(foundHeaders, headers...) + + continue + } + + halfOfDiff := new(big.Int).Div(diff, big.NewInt(2)) + mid := new(big.Int).Add(from, halfOfDiff) + + nextRanges = append(nextRanges, []*big.Int{from, mid}) + nextRanges = append(nextRanges, []*big.Int{mid, to}) + } + } + + if len(nextRanges) == 0 { + break + } + + ranges = nextRanges + } + + return foundHeaders, nil +} + +func (c *findBlocksCommand) checkERC20Tail(parent context.Context) ([]*DBHeader, error) { + log.Debug("checkERC20Tail", "account", c.account, "to block", c.startBlockNumber, "from", c.resFromBlock.Number) + tokens, err := c.tokenManager.GetTokens(c.chainClient.NetworkID()) + if err != nil { + return nil, err + } + addresses := make([]common.Address, len(tokens)) + for i, token := range tokens { + addresses[i] = token.Address + } + + from := new(big.Int).Sub(c.resFromBlock.Number, big.NewInt(1)) + + clients := make(map[uint64]chain.ClientInterface, 1) + clients[c.chainClient.NetworkID()] = c.chainClient + atBlocks := make(map[uint64]*big.Int, 1) + atBlocks[c.chainClient.NetworkID()] = from + balances, err := c.tokenManager.GetBalancesAtByChain(parent, clients, []common.Address{c.account}, addresses, atBlocks) + if err != nil { + return nil, err + } + + headers := []*DBHeader{} + for token, balance := range balances[c.chainClient.NetworkID()][c.account] { + bigintBalance := big.NewInt(balance.ToInt().Int64()) + if bigintBalance.Cmp(big.NewInt(0)) <= 0 { + continue + } + result, err := c.ERC20ScanByBalance(parent, big.NewInt(0), from, token) + if err != nil { + return nil, err + } + + headers = append(headers, result...) + } + + return headers, nil +} + +func (c *findBlocksCommand) Run(parent context.Context) (err error) { + log.Debug("start findBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + + rangeSize := big.NewInt(int64(c.defaultNodeBlockChunkSize)) from, to := new(big.Int).Set(c.fromBlockNumber), new(big.Int).Set(c.toBlockNumber) @@ -104,7 +211,18 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { } for { - headers, _ := c.checkRange(parent, from, to) + var headers []*DBHeader + if c.reachedETHHistoryStart { + if c.fromBlockNumber.Cmp(zero) == 0 && c.startBlockNumber != nil && c.startBlockNumber.Cmp(zero) == 1 { + headers, err = c.checkERC20Tail(parent) + if err != nil { + c.error = err + } + } + } else { + headers, _ = c.checkRange(parent, from, to) + } + if c.error != nil { log.Error("findBlocksCommand checkRange", "error", c.error, "account", c.account, "chain", c.chainClient.NetworkID(), "from", from, "to", to) @@ -126,17 +244,32 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) { c.blocksFound(headers) } + if c.reachedETHHistoryStart { + break + } + err = c.upsertBlockRange(&BlockRange{c.startBlockNumber, c.resFromBlock.Number, to}) if err != nil { break } - from, to = nextRange(c.resFromBlock.Number, c.fromBlockNumber) + if from.Cmp(to) == 0 { + break + } + + nextFrom, nextTo := nextRange(c.defaultNodeBlockChunkSize, c.resFromBlock.Number, c.fromBlockNumber) + + if nextFrom.Cmp(from) == 0 && nextTo.Cmp(to) == 0 { + break + } + + from = nextFrom + to = nextTo if to.Cmp(c.fromBlockNumber) <= 0 || (c.startBlockNumber != nil && c.startBlockNumber.Cmp(big.NewInt(0)) > 0 && to.Cmp(c.startBlockNumber) <= 0) { log.Debug("Checked all ranges, stop execution", "startBlock", c.startBlockNumber, "from", from, "to", to) - break + c.reachedETHHistoryStart = true } } @@ -319,18 +452,18 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber } func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *BlockDAO, db *Database, - chainClient *chain.ClientWithFallback, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, + chainClient chain.ClientInterface, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) { - log.Debug("loadTransfersLoop start", "chain", chainClient.ChainID, "account", account) + log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID(), "account", account) for { select { case <-ctx.Done(): - log.Info("loadTransfersLoop error", "chain", chainClient.ChainID, "account", account, "error", ctx.Err()) + log.Info("loadTransfersLoop error", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) return case dbHeaders := <-blocksLoadedCh: - log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.ChainID, "account", account, "headers", len(dbHeaders)) + log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders)) blockNums := make([]*big.Int, len(dbHeaders)) for i, dbHeader := range dbHeaders { @@ -347,7 +480,7 @@ func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *Bl } func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, - blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, + blockDAO *BlockDAO, chainClient chain.ClientInterface, feed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, balanceCacher balance.Cacher) *loadBlocksAndTransfersCommand { @@ -372,7 +505,7 @@ type loadBlocksAndTransfersCommand struct { db *Database blockRangeDAO *BlockRangeSequentialDAO blockDAO *BlockDAO - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface feed *event.Feed balanceCacher balance.Cacher errorsCount int @@ -455,17 +588,19 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, if !allHistoryLoaded { fbc := &findBlocksCommand{ - account: c.account, - db: c.db, - blockRangeDAO: c.blockRangeDAO, - chainClient: c.chainClient, - balanceCacher: c.balanceCacher, - feed: c.feed, - noLimit: false, - fromBlockNumber: big.NewInt(0), - toBlockNumber: to, - transactionManager: c.transactionManager, - blocksLoadedCh: blocksLoadedCh, + account: c.account, + db: c.db, + blockRangeDAO: c.blockRangeDAO, + chainClient: c.chainClient, + balanceCacher: c.balanceCacher, + feed: c.feed, + noLimit: false, + fromBlockNumber: big.NewInt(0), + toBlockNumber: to, + transactionManager: c.transactionManager, + tokenManager: c.tokenManager, + blocksLoadedCh: blocksLoadedCh, + defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, } group.Add(fbc.Command()) } else { @@ -501,6 +636,7 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Grou feed: c.feed, noLimit: false, transactionManager: c.transactionManager, + tokenManager: c.tokenManager, blocksLoadedCh: blocksLoadedCh, }, } @@ -583,15 +719,14 @@ func getHeadBlockNumber(parent context.Context, chainClient chain.ClientInterfac return head.Number, err } -func nextRange(from *big.Int, zeroBlockNumber *big.Int) (*big.Int, *big.Int) { - log.Debug("next range start", "from", from, "zeroBlockNumber", zeroBlockNumber) +func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int, *big.Int) { + log.Debug("next range start", "from", prevFrom, "zeroBlockNumber", zeroBlockNumber) - rangeSize := big.NewInt(DefaultNodeBlockChunkSize) + rangeSize := big.NewInt(int64(maxRangeSize)) - to := new(big.Int).Sub(from, big.NewInt(1)) // it won't hit the cache, but we wont load the transfers twice - if to.Cmp(rangeSize) > 0 { - from.Sub(to, rangeSize) - } else { + to := big.NewInt(0).Set(prevFrom) + from := big.NewInt(0).Sub(to, rangeSize) + if from.Cmp(zeroBlockNumber) < 0 { from = new(big.Int).Set(zeroBlockNumber) } diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index aad0beb42..2bfb4c206 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -3,60 +3,243 @@ package transfer import ( "context" "math/big" + "sort" + "strings" "testing" "github.com/stretchr/testify/require" "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/accounts/abi" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common/hexutil" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go/contracts/ethscan" + "github.com/status-im/status-go/contracts/ierc20" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/t/helpers" + + "github.com/status-im/status-go/params" + statusRpc "github.com/status-im/status-go/rpc" + "github.com/status-im/status-go/rpc/network" + "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/walletdatabase" ) type TestClient struct { t *testing.T // [][block, newBalance, nonceDiff] - balances [][]int - balanceHistory map[uint64]*big.Int - nonceHistory map[uint64]uint64 + balances [][]int + outgoingERC20Transfers []testERC20Transfer + incomingERC20Transfers []testERC20Transfer + balanceHistory map[uint64]*big.Int + tokenBalanceHistory map[common.Address]map[uint64]*big.Int + nonceHistory map[uint64]uint64 + traceAPICalls bool + printPreparedData bool } func (tc TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error { - tc.t.Log("BatchCallContext") + if tc.traceAPICalls { + tc.t.Log("BatchCallContext") + } return nil } func (tc TestClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { - tc.t.Log("HeaderByHash") + if tc.traceAPICalls { + tc.t.Log("HeaderByHash") + } return nil, nil } func (tc TestClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { - tc.t.Log("BlockByHash") + if tc.traceAPICalls { + tc.t.Log("BlockByHash") + } return nil, nil } func (tc TestClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { - tc.t.Log("BlockByNumber") + if tc.traceAPICalls { + tc.t.Log("BlockByNumber") + } return nil, nil } func (tc TestClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { nonce := tc.nonceHistory[blockNumber.Uint64()] - - tc.t.Log("NonceAt", blockNumber, "result:", nonce) + if tc.traceAPICalls { + tc.t.Log("NonceAt", blockNumber, "result:", nonce) + } return nonce, nil } func (tc TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { - tc.t.Log("FilterLogs") + if tc.traceAPICalls { + tc.t.Log("FilterLogs") + } + //checking only ERC20 for now + incomingAddress := q.Topics[len(q.Topics)-1] + allTransfers := tc.incomingERC20Transfers + if len(incomingAddress) == 0 { + allTransfers = tc.outgoingERC20Transfers + } + + logs := []types.Log{} + for _, transfer := range allTransfers { + if transfer.block.Cmp(q.FromBlock) >= 0 && transfer.block.Cmp(q.ToBlock) <= 0 { + logs = append(logs, types.Log{ + BlockNumber: transfer.block.Uint64(), + BlockHash: common.BigToHash(transfer.block), + }) + } + } + + return logs, nil +} + +func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + balance := tc.balanceHistory[blockNumber.Uint64()] + + if tc.traceAPICalls { + tc.t.Log("BalanceAt", blockNumber, "result:", balance) + } + return balance, nil +} + +func (tc TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) *big.Int { + balance := tc.tokenBalanceHistory[token][blockNumber.Uint64()] + + if tc.traceAPICalls { + tc.t.Log("tokenBalanceAt", token, blockNumber, "result:", balance) + } + return balance +} + +func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + if tc.traceAPICalls { + tc.t.Log("HeaderByNumber", number) + } + header := &types.Header{ + Number: number, + Time: 0, + } + + return header, nil +} + +func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) { + if tc.traceAPICalls { + tc.t.Log("FullTransactionByBlockNumberAndIndex") + } + blockHash := common.BigToHash(blockNumber) + tx := &chain.FullTransaction{ + Tx: &types.Transaction{}, + TxExtraInfo: chain.TxExtraInfo{ + BlockNumber: (*hexutil.Big)(big.NewInt(0)), + BlockHash: &blockHash, + }, + } + + return tx, nil +} + +func (tc TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { + if tc.traceAPICalls { + tc.t.Log("GetBaseFeeFromBloc") + } + return "", nil +} + +func (tc TestClient) NetworkID() uint64 { + return 777333 +} + +func (tc TestClient) ToBigInt() *big.Int { + if tc.traceAPICalls { + tc.t.Log("ToBigInt") + } + return nil +} + +var ethscanAddress = common.HexToAddress("0x0000000000000000000000000000000000777333") + +func (tc TestClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { + if tc.traceAPICalls { + tc.t.Log("CodeAt", contract, blockNumber) + } + + if ethscanAddress == contract { + return []byte{1}, nil + } + + return nil, nil +} + +func (tc TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + if tc.traceAPICalls { + tc.t.Log("CallContract", call, blockNumber, call.To) + } + + if *call.To == ethscanAddress { + parsed, err := abi.JSON(strings.NewReader(ethscan.BalanceScannerABI)) + if err != nil { + return nil, err + } + method := parsed.Methods["tokensBalance"] + params := call.Data[len(method.ID):] + args, err := method.Inputs.Unpack(params) + + if err != nil { + tc.t.Log("ERROR on unpacking", err) + return nil, err + } + + tokens := args[1].([]common.Address) + balances := []*big.Int{} + for _, token := range tokens { + balances = append(balances, tc.tokenBalanceAt(token, blockNumber)) + } + results := []ethscan.BalanceScannerResult{} + for _, balance := range balances { + results = append(results, ethscan.BalanceScannerResult{ + Success: true, + Data: balance.Bytes(), + }) + } + + output, err := method.Outputs.Pack(results) + if err != nil { + tc.t.Log("ERROR on packing", err) + return nil, err + } + + return output, nil + } + + if *call.To == tokenTXXAddress { + balance := tc.tokenBalanceAt(tokenTXXAddress, blockNumber) + + parsed, err := abi.JSON(strings.NewReader(ierc20.IERC20ABI)) + if err != nil { + return nil, err + } + + method := parsed.Methods["balanceOf"] + output, err := method.Outputs.Pack(balance) + if err != nil { + tc.t.Log("ERROR on packing ERC20 balance", err) + return nil, err + } + + return output, nil + } + return nil, nil } @@ -82,66 +265,233 @@ func (tc *TestClient) prepareBalanceHistory(toBlock int) { currentNonce += change[2] } - tc.t.Log("=========================================") - tc.t.Log(tc.balanceHistory) - tc.t.Log(tc.nonceHistory) - tc.t.Log("=========================================") + if tc.printPreparedData { + tc.t.Log("========================================= ETH BALANCES") + tc.t.Log(tc.balanceHistory) + tc.t.Log(tc.nonceHistory) + tc.t.Log(tc.tokenBalanceHistory) + tc.t.Log("=========================================") + } } -func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { - balance := tc.balanceHistory[blockNumber.Uint64()] - - tc.t.Log("BalanceAt", blockNumber, "result:", balance) - return balance, nil -} - -func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - tc.t.Log("HeaderByNumber", number) - header := &types.Header{ - Number: number, - Time: 0, +func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { + transfersPerToken := map[common.Address][]testERC20Transfer{} + for _, transfer := range tc.outgoingERC20Transfers { + transfer.amount = new(big.Int).Neg(transfer.amount) + transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } - return header, nil -} - -func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) { - tc.t.Log("FullTransactionByBlockNumberAndIndex") - blockHash := common.BigToHash(blockNumber) - tx := &chain.FullTransaction{ - Tx: &types.Transaction{}, - TxExtraInfo: chain.TxExtraInfo{ - BlockNumber: (*hexutil.Big)(big.NewInt(0)), - BlockHash: &blockHash, - }, + for _, transfer := range tc.incomingERC20Transfers { + transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } - return tx, nil + tc.tokenBalanceHistory = map[common.Address]map[uint64]*big.Int{} + + for token, transfers := range transfersPerToken { + sort.Slice(transfers, func(i, j int) bool { + return transfers[i].block.Cmp(transfers[j].block) < 0 + }) + + currentBlock := uint64(0) + currentBalance := big.NewInt(0) + + tc.tokenBalanceHistory[token] = map[uint64]*big.Int{} + transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0)}) + + for _, transfer := range transfers { + for blockN := currentBlock; blockN < transfer.block.Uint64(); blockN++ { + tc.tokenBalanceHistory[token][blockN] = new(big.Int).Set(currentBalance) + } + currentBlock = transfer.block.Uint64() + currentBalance = new(big.Int).Add(currentBalance, transfer.amount) + } + } + if tc.printPreparedData { + tc.t.Log("========================================= ERC20 BALANCES") + tc.t.Log(tc.tokenBalanceHistory) + tc.t.Log("=========================================") + } } -func (tc TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) { - tc.t.Log("GetBaseFeeFromBloc") - return "", nil -} - -func (tc TestClient) NetworkID() uint64 { - return 1 -} - -func (tc TestClient) ToBigInt() *big.Int { - tc.t.Log("ToBigInt") +func (tc TestClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error { + if tc.traceAPICalls { + tc.t.Log("CallContext") + } return nil } -type findBlockCase struct { - balanceChanges [][]int - fromBlock int64 - toBlock int64 - expectedBlocksFound int +func (tc TestClient) GetWalletNotifier() func(chainId uint64, message string) { + if tc.traceAPICalls { + tc.t.Log("GetWalletNotifier") + } + return nil } -var findBlocksCommandCases = []findBlockCase{ - { +func (tc TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) { + if tc.traceAPICalls { + tc.t.Log("SetWalletNotifier") + } +} + +func (tc TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { + if tc.traceAPICalls { + tc.t.Log("EstimateGas") + } + return 0, nil +} + +func (tc TestClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { + if tc.traceAPICalls { + tc.t.Log("PendingCodeAt") + } + + return nil, nil +} + +func (tc TestClient) PendingCallContract(ctx context.Context, call ethereum.CallMsg) ([]byte, error) { + if tc.traceAPICalls { + tc.t.Log("PendingCallContract") + } + + return nil, nil +} + +func (tc TestClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + if tc.traceAPICalls { + tc.t.Log("PendingNonceAt") + } + + return 0, nil +} + +func (tc TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { + if tc.traceAPICalls { + tc.t.Log("SuggestGasPrice") + } + + return nil, nil +} + +func (tc TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + if tc.traceAPICalls { + tc.t.Log("SendTransaction") + } + + return nil +} + +func (tc TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + if tc.traceAPICalls { + tc.t.Log("SuggestGasTipCap") + } + + return nil, nil +} + +func (tc TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, b []rpc.BatchElem) error { + if tc.traceAPICalls { + tc.t.Log("BatchCallContextIgnoringLocalHandlers") + } + + return nil +} + +func (tc TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error { + if tc.traceAPICalls { + tc.t.Log("CallContextIgnoringLocalHandlers") + } + + return nil +} + +func (tc TestClient) CallRaw(data string) string { + if tc.traceAPICalls { + tc.t.Log("CallRaw") + } + + return "" +} + +func (tc TestClient) GetChainID() *big.Int { + return big.NewInt(1) +} + +func (tc TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + if tc.traceAPICalls { + tc.t.Log("SubscribeFilterLogs") + } + + return nil, nil +} + +func (tc TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + if tc.traceAPICalls { + tc.t.Log("TransactionReceipt") + } + + return nil, nil +} + +func (tc TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) { + if tc.traceAPICalls { + tc.t.Log("TransactionByHash") + } + + return nil, false, nil +} + +func (tc TestClient) BlockNumber(ctx context.Context) (uint64, error) { + if tc.traceAPICalls { + tc.t.Log("BlockNumber") + } + + return 0, nil +} +func (tc TestClient) SetIsConnected(value bool) { + if tc.traceAPICalls { + tc.t.Log("SetIsConnected") + } +} + +func (tc TestClient) GetIsConnected() bool { + if tc.traceAPICalls { + tc.t.Log("GetIsConnected") + } + + return true +} + +type testERC20Transfer struct { + block *big.Int + address common.Address + amount *big.Int +} + +type findBlockCase struct { + balanceChanges [][]int + ERC20BalanceChanges [][]int + fromBlock int64 + toBlock int64 + rangeSize int + expectedBlocksFound int + outgoingERC20Transfers []testERC20Transfer + incomingERC20Transfers []testERC20Transfer + label string +} + +func transferInEachBlock() [][]int { + res := [][]int{} + + for i := 1; i < 101; i++ { + res = append(res, []int{i, i, i}) + } + + return res +} + +func getCases() []findBlockCase { + cases := []findBlockCase{} + case1 := findBlockCase{ balanceChanges: [][]int{ {5, 1, 0}, {20, 2, 0}, @@ -149,19 +499,102 @@ var findBlocksCommandCases = []findBlockCase{ {46, 50, 0}, {75, 0, 1}, }, + outgoingERC20Transfers: []testERC20Transfer{ + {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + }, + toBlock: 100, + expectedBlocksFound: 6, + } + + case100transfers := findBlockCase{ + balanceChanges: transferInEachBlock(), + toBlock: 100, + expectedBlocksFound: 100, + } + + case3 := findBlockCase{ + balanceChanges: [][]int{ + {1, 1, 1}, + {2, 2, 2}, + {45, 1, 1}, + {46, 50, 0}, + {75, 0, 1}, + }, toBlock: 100, expectedBlocksFound: 5, - }, - { + } + case4 := findBlockCase{ + balanceChanges: [][]int{ + {20, 1, 0}, + }, + toBlock: 100, + fromBlock: 10, + expectedBlocksFound: 1, + label: "single block", + } + + case5 := findBlockCase{ balanceChanges: [][]int{}, toBlock: 100, + fromBlock: 20, expectedBlocksFound: 0, - }, + } + + case6 := findBlockCase{ + balanceChanges: [][]int{ + {20, 1, 0}, + {45, 1, 1}, + }, + toBlock: 100, + fromBlock: 30, + expectedBlocksFound: 1, + rangeSize: 20, + label: "single block in range", + } + + case7emptyHistoryWithOneERC20Transfer := findBlockCase{ + balanceChanges: [][]int{}, + toBlock: 100, + rangeSize: 20, + expectedBlocksFound: 1, + incomingERC20Transfers: []testERC20Transfer{ + {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + }, + } + + case8emptyHistoryWithERC20Transfers := findBlockCase{ + balanceChanges: [][]int{}, + toBlock: 100, + rangeSize: 20, + expectedBlocksFound: 2, + incomingERC20Transfers: []testERC20Transfer{ + // edge case when a regular scan will find transfer at 80, + // but erc20 tail scan should only find transfer at block 6 + {big.NewInt(80), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + }, + } + + cases = append(cases, case1) + cases = append(cases, case100transfers) + cases = append(cases, case3) + cases = append(cases, case4) + cases = append(cases, case5) + + cases = append(cases, case6) + cases = append(cases, case7emptyHistoryWithOneERC20Transfer) + cases = append(cases, case8emptyHistoryWithERC20Transfers) + + //cases = append([]findBlockCase{}, case8emptyHistoryWithERC20Transfers) + + return cases } -func TestFindBlocksCommand(t *testing.T) { - for _, testCase := range findBlocksCommandCases { +var tokenTXXAddress = common.HexToAddress("0x53211") +func TestFindBlocksCommand(t *testing.T) { + for idx, testCase := range getCases() { + t.Log("case #", idx) ctx := context.Background() group := async.NewGroup(ctx) @@ -171,32 +604,71 @@ func TestFindBlocksCommand(t *testing.T) { wdb := NewDB(db) tc := &TestClient{ - t: t, - balances: testCase.balanceChanges, + t: t, + balances: testCase.balanceChanges, + outgoingERC20Transfers: testCase.outgoingERC20Transfers, + incomingERC20Transfers: testCase.incomingERC20Transfers, } + //tc.traceAPICalls = true + //tc.printPreparedData = true tc.prepareBalanceHistory(100) + tc.prepareTokenBalanceHistory(100) blockChannel := make(chan []*DBHeader, 100) + rangeSize := 20 + if testCase.rangeSize != 0 { + rangeSize = testCase.rangeSize + } + client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) + client.SetClient(tc.NetworkID(), tc) + tokenManager := token.NewTokenManager(db, client, network.NewManager(db)) + tokenManager.SetTokens([]*token.Token{ + { + Address: tokenTXXAddress, + Symbol: "TXX", + Decimals: 18, + ChainID: tc.NetworkID(), + Name: "Test Token 1", + Verified: true, + }, + }) fbc := &findBlocksCommand{ - account: common.HexToAddress("0x1234"), - db: wdb, - blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, - chainClient: tc, - balanceCacher: balance.NewCache(), - feed: &event.Feed{}, - noLimit: false, - fromBlockNumber: big.NewInt(testCase.fromBlock), - toBlockNumber: big.NewInt(testCase.toBlock), - transactionManager: tm, - blocksLoadedCh: blockChannel, + account: common.HexToAddress("0x12345"), + db: wdb, + blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, + chainClient: tc, + balanceCacher: balance.NewCache(), + feed: &event.Feed{}, + noLimit: false, + fromBlockNumber: big.NewInt(testCase.fromBlock), + toBlockNumber: big.NewInt(testCase.toBlock), + transactionManager: tm, + blocksLoadedCh: blockChannel, + defaultNodeBlockChunkSize: rangeSize, + tokenManager: tokenManager, } group.Add(fbc.Command()) + foundBlocks := []*DBHeader{} select { case <-ctx.Done(): t.Log("ERROR") case <-group.WaitAsync(): close(blockChannel) - require.Equal(t, testCase.expectedBlocksFound, len(<-blockChannel)) + for { + bloks, ok := <-blockChannel + if !ok { + break + } + foundBlocks = append(foundBlocks, bloks...) + } + + numbers := []int64{} + for _, block := range foundBlocks { + numbers = append(numbers, block.Number.Int64()) + } + + sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] }) + require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers) } } } diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 39360afa1..10d2eab91 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -114,7 +114,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add // watchAccountsChanges subscribes to a feed and watches for changes in accounts list. If there are new or removed accounts // reactor will be restarted. func watchAccountsChanges(ctx context.Context, accountFeed *event.Feed, reactor *Reactor, - chainClients map[uint64]*chain.ClientWithFallback, initial []common.Address, loadAllTransfers bool) error { + chainClients map[uint64]chain.ClientInterface, initial []common.Address, loadAllTransfers bool) error { ch := make(chan accountsevent.Event, 1) // it may block if the rate of updates will be significantly higher sub := accountFeed.Subscribe(ch) diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index abe27fecd..d6ff904da 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -74,7 +74,7 @@ type Transfer struct { // ETHDownloader downloads regular eth transfers. type ETHDownloader struct { - chainClient *chain.ClientWithFallback + chainClient chain.ClientInterface accounts []common.Address signer types.Signer db *Database @@ -95,7 +95,7 @@ func (d *ETHDownloader) GetTransfersByNumber(ctx context.Context, number *big.In } // Only used by status-mobile -func getTransferByHash(ctx context.Context, client *chain.ClientWithFallback, signer types.Signer, address common.Address, hash common.Hash) (*Transfer, error) { +func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer types.Signer, address common.Address, hash common.Hash) (*Transfer, error) { transaction, _, err := client.TransactionByHash(ctx, hash) if err != nil { return nil, err @@ -143,7 +143,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc for _, address := range accounts { // During block discovery, we should have populated the DB with 1 item per Transaction containing // erc20/erc721 transfers - transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.ChainID, address, blk.Number()) + transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.NetworkID(), address, blk.Number()) if err != nil { return nil, err } @@ -162,7 +162,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc for _, tx := range blk.Transactions() { if tx.ChainId().Cmp(big.NewInt(0)) != 0 && tx.ChainId().Cmp(d.chainClient.ToBigInt()) != 0 { - log.Info("chain id mismatch", "tx hash", tx.Hash(), "tx chain id", tx.ChainId(), "expected chain id", d.chainClient.ChainID) + log.Info("chain id mismatch", "tx hash", tx.Hash(), "tx chain id", tx.ChainId(), "expected chain id", d.chainClient.NetworkID()) continue } from, err := types.Sender(d.signer, tx) diff --git a/services/wallet/transfer/iterative.go b/services/wallet/transfer/iterative.go index 2fd54c76b..8b2c14dee 100644 --- a/services/wallet/transfer/iterative.go +++ b/services/wallet/transfer/iterative.go @@ -18,7 +18,7 @@ func SetupIterativeDownloader( return nil, errors.New("to or from cannot be nil") } - log.Info("iterative downloader", "address", address, "from", from, "to", to, "size", size) + log.Debug("iterative downloader", "address", address, "from", from, "to", to, "size", size) d := &IterativeDownloader{ client: client, batchSize: size, @@ -65,7 +65,7 @@ func (d *IterativeDownloader) Next(parent context.Context) ([]*DBHeader, *big.In from = d.from } headers, err := d.downloader.GetHeadersInRange(parent, from, to) - log.Info("load erc20 transfers in range", "from", from, "to", to, "batchSize", d.batchSize) + log.Debug("load erc20 transfers in range", "from", from, "to", to, "batchSize", d.batchSize) if err != nil { log.Error("failed to get transfer in between two blocks", "from", from, "to", to, "error", err) return nil, nil, nil, err diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index 23a136737..82d8a8046 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -57,7 +57,7 @@ func NewOnDemandFetchStrategy( transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, - chainClients map[uint64]*chain.ClientWithFallback, + chainClients map[uint64]chain.ClientInterface, accounts []common.Address, balanceCacher balance.Cacher, ) *OnDemandFetchStrategy { @@ -86,11 +86,11 @@ type OnDemandFetchStrategy struct { transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker tokenManager *token.Manager - chainClients map[uint64]*chain.ClientWithFallback + chainClients map[uint64]chain.ClientInterface accounts []common.Address } -func (s *OnDemandFetchStrategy) newControlCommand(chainClient *chain.ClientWithFallback, accounts []common.Address) *controlCommand { +func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterface, accounts []common.Address) *controlCommand { signer := types.LatestSignerForChainID(chainClient.ToBigInt()) ctl := &controlCommand{ db: s.db, @@ -277,7 +277,7 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact } // Start runs reactor loop in background. -func (r *Reactor) start(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, +func (r *Reactor) start(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, loadAllTransfers bool) error { r.strategy = r.createFetchStrategy(chainClients, accounts, loadAllTransfers) @@ -291,14 +291,14 @@ func (r *Reactor) stop() { } } -func (r *Reactor) restart(chainClients map[uint64]*chain.ClientWithFallback, accounts []common.Address, +func (r *Reactor) restart(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, loadAllTransfers bool) error { r.stop() return r.start(chainClients, accounts, loadAllTransfers) } -func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithFallback, +func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterface, accounts []common.Address, loadAllTransfers bool) HistoryFetcher { if loadAllTransfers { @@ -328,9 +328,9 @@ func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, add return nil, errors.New(ReactorNotStarted) } -func getChainClientByID(clients map[uint64]*chain.ClientWithFallback, id uint64) (*chain.ClientWithFallback, error) { +func getChainClientByID(clients map[uint64]chain.ClientInterface, id uint64) (chain.ClientInterface, error) { for _, client := range clients { - if client.ChainID == id { + if client.NetworkID() == id { return client, nil } } diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index bacde2bd3..1649c029c 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -19,7 +19,7 @@ import ( func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, - chainClients map[uint64]*chain.ClientWithFallback, + chainClients map[uint64]chain.ClientInterface, accounts []common.Address, balanceCacher balance.Cacher, ) *SequentialFetchStrategy { @@ -46,12 +46,12 @@ type SequentialFetchStrategy struct { transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker tokenManager *token.Manager - chainClients map[uint64]*chain.ClientWithFallback + chainClients map[uint64]chain.ClientInterface accounts []common.Address balanceCacher balance.Cacher } -func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback, +func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface, account common.Address) async.Commander { return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed, diff --git a/services/wallet/transfer/swap_identifier.go b/services/wallet/transfer/swap_identifier.go index c05167387..4232736b3 100644 --- a/services/wallet/transfer/swap_identifier.go +++ b/services/wallet/transfer/swap_identifier.go @@ -20,7 +20,7 @@ import ( const ETHSymbol string = "ETH" const WETHSymbol string = "WETH" -func fetchUniswapV2PairInfo(ctx context.Context, client *chain.ClientWithFallback, pairAddress common.Address) (*common.Address, *common.Address, error) { +func fetchUniswapV2PairInfo(ctx context.Context, client chain.ClientInterface, pairAddress common.Address) (*common.Address, *common.Address, error) { caller, err := uniswapv2.NewUniswapv2Caller(pairAddress, client) if err != nil { return nil, nil, err @@ -43,7 +43,7 @@ func fetchUniswapV2PairInfo(ctx context.Context, client *chain.ClientWithFallbac return &token0Address, &token1Address, nil } -func fetchUniswapV3PoolInfo(ctx context.Context, client *chain.ClientWithFallback, poolAddress common.Address) (*common.Address, *common.Address, error) { +func fetchUniswapV3PoolInfo(ctx context.Context, client chain.ClientInterface, poolAddress common.Address) (*common.Address, *common.Address, error) { caller, err := uniswapv3.NewUniswapv3Caller(poolAddress, client) if err != nil { return nil, nil, err @@ -90,7 +90,7 @@ func identifyUniswapV2Asset(tokenManager *token.Manager, chainID uint64, amount0 return } -func fetchUniswapV2Info(ctx context.Context, client *chain.ClientWithFallback, tokenManager *token.Manager, log *types.Log) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { +func fetchUniswapV2Info(ctx context.Context, client chain.ClientInterface, tokenManager *token.Manager, log *types.Log) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { pairAddress, _, _, amount0In, amount1In, amount0Out, amount1Out, err := w_common.ParseUniswapV2Log(log) if err != nil { return @@ -101,7 +101,7 @@ func fetchUniswapV2Info(ctx context.Context, client *chain.ClientWithFallback, t return } - fromToken, fromAmountInt, err := identifyUniswapV2Asset(tokenManager, client.ChainID, amount0In, *token0ContractAddress, amount1In, *token1ContractAddress) + fromToken, fromAmountInt, err := identifyUniswapV2Asset(tokenManager, client.NetworkID(), amount0In, *token0ContractAddress, amount1In, *token1ContractAddress) if err != nil { // "Soft" error, allow to continue with unknown asset fromAsset = "" @@ -111,7 +111,7 @@ func fetchUniswapV2Info(ctx context.Context, client *chain.ClientWithFallback, t fromAmount = (*hexutil.Big)(fromAmountInt) } - toToken, toAmountInt, err := identifyUniswapV2Asset(tokenManager, client.ChainID, amount0Out, *token0ContractAddress, amount1Out, *token1ContractAddress) + toToken, toAmountInt, err := identifyUniswapV2Asset(tokenManager, client.NetworkID(), amount0Out, *token0ContractAddress, amount1Out, *token1ContractAddress) if err != nil { // "Soft" error, allow to continue with unknown asset toAsset = "" @@ -159,7 +159,7 @@ func identifyUniswapV3Assets(tokenManager *token.Manager, chainID uint64, amount return } -func fetchUniswapV3Info(ctx context.Context, client *chain.ClientWithFallback, tokenManager *token.Manager, log *types.Log) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { +func fetchUniswapV3Info(ctx context.Context, client chain.ClientInterface, tokenManager *token.Manager, log *types.Log) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { poolAddress, _, _, amount0, amount1, err := w_common.ParseUniswapV3Log(log) if err != nil { return @@ -170,7 +170,7 @@ func fetchUniswapV3Info(ctx context.Context, client *chain.ClientWithFallback, t return } - fromToken, fromAmountInt, toToken, toAmountInt, err := identifyUniswapV3Assets(tokenManager, client.ChainID, amount0, *token0ContractAddress, amount1, *token1ContractAddress) + fromToken, fromAmountInt, toToken, toAmountInt, err := identifyUniswapV3Assets(tokenManager, client.NetworkID(), amount0, *token0ContractAddress, amount1, *token1ContractAddress) if err != nil { // "Soft" error, allow to continue with unknown asset err = nil @@ -188,7 +188,7 @@ func fetchUniswapV3Info(ctx context.Context, client *chain.ClientWithFallback, t return } -func fetchUniswapInfo(ctx context.Context, client *chain.ClientWithFallback, tokenManager *token.Manager, log *types.Log, logType w_common.EventType) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { +func fetchUniswapInfo(ctx context.Context, client chain.ClientInterface, tokenManager *token.Manager, log *types.Log, logType w_common.EventType) (fromAsset string, fromAmount *hexutil.Big, toAsset string, toAmount *hexutil.Big, err error) { switch logType { case w_common.UniswapV2SwapEventType: return fetchUniswapV2Info(ctx, client, tokenManager, log) @@ -201,7 +201,7 @@ func fetchUniswapInfo(ctx context.Context, client *chain.ClientWithFallback, tok // Build a Swap multitransaction from a list containing one or several uniswapV2/uniswapV3 subTxs // We only care about the first and last swap to identify the input/output token and amounts -func buildUniswapSwapMultitransaction(ctx context.Context, client *chain.ClientWithFallback, tokenManager *token.Manager, transfer *Transfer) (*MultiTransaction, error) { +func buildUniswapSwapMultitransaction(ctx context.Context, client chain.ClientInterface, tokenManager *token.Manager, transfer *Transfer) (*MultiTransaction, error) { multiTransaction := MultiTransaction{ Type: MultiTransactionSwap, FromNetworkID: transfer.NetworkID,