package transfer import ( "context" "errors" "math/big" "time" "go.uber.org/zap" "golang.org/x/exp/slices" // since 1.21, this is in the standard library "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/status-im/status-go/logutils" "github.com/status-im/status-go/rpc/chain" w_common "github.com/status-im/status-go/services/wallet/common" ) var ( zero = big.NewInt(0) one = big.NewInt(1) two = big.NewInt(2) ) // Partial transaction info obtained by ERC20Downloader. // A PreloadedTransaction represents a Transaction which contains one // ERC20/ERC721/ERC1155 transfer event. // To be converted into one Transfer object post-indexing. type PreloadedTransaction struct { Type w_common.Type `json:"type"` ID common.Hash `json:"-"` Address common.Address `json:"address"` // Log that was used to generate preloaded transaction. Log *types.Log `json:"log"` TokenID *big.Int `json:"tokenId"` Value *big.Int `json:"value"` } // Transfer stores information about transfer. // A Transfer represents a plain ETH transfer or some token activity inside a Transaction // Since ERC1155 transfers can contain multiple tokens, a single Transfer represents a single token transfer, // that means ERC1155 batch transfers will be represented by multiple Transfer objects. type Transfer struct { Type w_common.Type `json:"type"` ID common.Hash `json:"-"` Address common.Address `json:"address"` BlockNumber *big.Int `json:"blockNumber"` BlockHash common.Hash `json:"blockhash"` Timestamp uint64 `json:"timestamp"` Transaction *types.Transaction `json:"transaction"` Loaded bool NetworkID uint64 // From is derived from tx signature in order to offload this computation from UI component. From common.Address `json:"from"` Receipt *types.Receipt `json:"receipt"` // Log that was used to generate erc20 transfer. Nil for eth transfer. Log *types.Log `json:"log"` // TokenID is the id of the transferred token. Nil for eth transfer. TokenID *big.Int `json:"tokenId"` // TokenValue is the value of the token transfer. Nil for eth transfer. TokenValue *big.Int `json:"tokenValue"` BaseGasFees string // Internal field that is used to track multi-transaction transfers. MultiTransactionID w_common.MultiTransactionIDType `json:"multi_transaction_id"` } // ETHDownloader downloads regular eth transfers and tokens transfers. type ETHDownloader struct { chainClient chain.ClientInterface accounts []common.Address signer types.Signer db *Database } var errLogsDownloaderStuck = errors.New("logs downloader stuck") func (d *ETHDownloader) GetTransfersByNumber(ctx context.Context, number *big.Int) ([]Transfer, error) { blk, err := d.chainClient.BlockByNumber(ctx, number) if err != nil { return nil, err } rst, err := d.getTransfersInBlock(ctx, blk, d.accounts) if err != nil { return nil, err } return rst, err } // Only used by status-mobile func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer types.Signer, address common.Address, hash common.Hash) (*Transfer, error) { transaction, _, err := client.TransactionByHash(ctx, hash) if err != nil { return nil, err } receipt, err := client.TransactionReceipt(ctx, hash) if err != nil { return nil, err } eventType, transactionLog := w_common.GetFirstEvent(receipt.Logs) transactionType := w_common.EventTypeToSubtransactionType(eventType) from, err := types.Sender(signer, transaction) if err != nil { return nil, err } baseGasFee, err := client.GetBaseFeeFromBlock(ctx, big.NewInt(int64(transactionLog.BlockNumber))) if err != nil { return nil, err } transfer := &Transfer{ Type: transactionType, ID: hash, Address: address, BlockNumber: receipt.BlockNumber, BlockHash: receipt.BlockHash, Timestamp: uint64(time.Now().Unix()), Transaction: transaction, From: from, Receipt: receipt, Log: transactionLog, BaseGasFees: baseGasFee, } return transfer, nil } func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) ([]Transfer, error) { startTs := time.Now() rst := make([]Transfer, 0, len(blk.Transactions())) receiptsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Receipt) txsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Transaction) addReceiptToCache := func(address common.Address, txHash common.Hash, receipt *types.Receipt) { if receiptsByAddressAndTxHash[address] == nil { receiptsByAddressAndTxHash[address] = make(map[common.Hash]*types.Receipt) } receiptsByAddressAndTxHash[address][txHash] = receipt } addTxToCache := func(address common.Address, txHash common.Hash, tx *types.Transaction) { if txsByAddressAndTxHash[address] == nil { txsByAddressAndTxHash[address] = make(map[common.Hash]*types.Transaction) } txsByAddressAndTxHash[address][txHash] = tx } getReceiptFromCache := func(address common.Address, txHash common.Hash) *types.Receipt { if receiptsByAddressAndTxHash[address] == nil { return nil } return receiptsByAddressAndTxHash[address][txHash] } getTxFromCache := func(address common.Address, txHash common.Hash) *types.Transaction { if txsByAddressAndTxHash[address] == nil { return nil } return txsByAddressAndTxHash[address][txHash] } getReceipt := func(address common.Address, txHash common.Hash) (receipt *types.Receipt, err error) { receipt = getReceiptFromCache(address, txHash) if receipt == nil { receipt, err = d.fetchTransactionReceipt(ctx, txHash) if err != nil { return nil, err } addReceiptToCache(address, txHash, receipt) } return receipt, nil } getTx := func(address common.Address, txHash common.Hash) (tx *types.Transaction, err error) { tx = getTxFromCache(address, txHash) if tx == nil { tx, err = d.fetchTransaction(ctx, txHash) if err != nil { return nil, err } addTxToCache(address, txHash, tx) } return tx, nil } for _, address := range accounts { // During block discovery, we should have populated the DB with 1 item per transfer log containing // erc20/erc721/erc1155 transfers. // ID is a hash of the tx hash and the log index. log_index is unique per ERC20/721 tx, but not per ERC1155 tx. transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.NetworkID(), address, blk.Number()) if err != nil { return nil, err } areSubTxsCheckedForTxHash := make(map[common.Hash]bool) logutils.ZapLogger().Debug("getTransfersInBlock", zap.Stringer("block", blk.Number()), zap.Int("transactionsToLoad", len(transactionsToLoad)), ) for _, t := range transactionsToLoad { receipt, err := getReceipt(address, t.Log.TxHash) if err != nil { return nil, err } tx, err := getTx(address, t.Log.TxHash) if err != nil { return nil, err } subtransactions, err := d.subTransactionsFromPreloaded(t, tx, receipt, blk) if err != nil { logutils.ZapLogger().Error("can't fetch subTxs for erc20/erc721/erc1155 transfer", zap.Error(err)) return nil, err } rst = append(rst, subtransactions...) areSubTxsCheckedForTxHash[t.Log.TxHash] = true } for _, tx := range blk.Transactions() { // Skip dummy blob transactions, as they are not supported by us if tx.Type() == types.BlobTxType { continue } if tx.ChainId().Cmp(big.NewInt(0)) != 0 && tx.ChainId().Cmp(d.chainClient.ToBigInt()) != 0 { logutils.ZapLogger().Info("chain id mismatch", zap.Stringer("tx hash", tx.Hash()), zap.Stringer("tx chain id", tx.ChainId()), zap.Uint64("expected chain id", d.chainClient.NetworkID()), ) continue } from, err := types.Sender(d.signer, tx) if err != nil { if err == core.ErrTxTypeNotSupported { logutils.ZapLogger().Error("Tx Type not supported", zap.Stringer("tx chain id", tx.ChainId()), zap.Uint8("type", tx.Type()), zap.Error(err), ) continue } return nil, err } isPlainTransfer := from == address || (tx.To() != nil && *tx.To() == address) mustCheckSubTxs := false if !isPlainTransfer { // We might miss some subTransactions of interest for some transaction types. We need to check if we // find the address in the transaction data. switch tx.Type() { case types.DynamicFeeTxType, types.OptimismDepositTxType, types.ArbitrumDepositTxType, types.ArbitrumRetryTxType: mustCheckSubTxs = !areSubTxsCheckedForTxHash[tx.Hash()] && w_common.TxDataContainsAddress(tx.Type(), tx.Data(), address) } } if isPlainTransfer || mustCheckSubTxs { receipt, err := getReceipt(address, tx.Hash()) if err != nil { return nil, err } // Since we've already got the receipt, check for subTxs of // interest in case we haven't already. if !areSubTxsCheckedForTxHash[tx.Hash()] { subtransactions, err := d.subTransactionsFromTransactionData(address, from, tx, receipt, blk) if err != nil { logutils.ZapLogger().Error("can't fetch subTxs for eth transfer", zap.Error(err)) return nil, err } rst = append(rst, subtransactions...) areSubTxsCheckedForTxHash[tx.Hash()] = true } // If it's a plain ETH transfer, add it to the list if isPlainTransfer { rst = append(rst, Transfer{ Type: w_common.EthTransfer, NetworkID: tx.ChainId().Uint64(), ID: tx.Hash(), Address: address, BlockNumber: blk.Number(), BlockHash: receipt.BlockHash, Timestamp: blk.Time(), Transaction: tx, From: from, Receipt: receipt, Log: nil, BaseGasFees: blk.BaseFee().String(), MultiTransactionID: w_common.NoMultiTransactionID}) } } } } logutils.ZapLogger().Debug("getTransfersInBlock found", zap.Stringer("block", blk.Number()), zap.Int("len", len(rst)), zap.Duration("time", time.Since(startTs)), ) // TODO(dshulyak) test that balance difference was covered by transactions return rst, nil } // NewERC20TransfersDownloader returns new instance. func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common.Address, signer types.Signer, incomingOnly bool) *ERC20TransfersDownloader { signature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature) return &ERC20TransfersDownloader{ client: client, accounts: accounts, signature: signature, incomingOnly: incomingOnly, signatureErc1155Single: w_common.GetEventSignatureHash(w_common.Erc1155TransferSingleEventSignature), signatureErc1155Batch: w_common.GetEventSignatureHash(w_common.Erc1155TransferBatchEventSignature), signer: signer, } } // ERC20TransfersDownloader is a downloader for erc20 and erc721 tokens transfers. // Since both transaction types share the same signature, both will be assigned // type Erc20Transfer. Until the downloader gets refactored and a migration of the // database gets implemented, differentiation between erc20 and erc721 will handled // in the controller. type ERC20TransfersDownloader struct { client chain.ClientInterface accounts []common.Address incomingOnly bool // hash of the Transfer event signature signature common.Hash signatureErc1155Single common.Hash signatureErc1155Batch common.Hash // signer is used to derive tx sender from tx signature signer types.Signer } func topicFromAddressSlice(addresses []common.Address) []common.Hash { rst := make([]common.Hash, len(addresses)) for i, address := range addresses { rst[i] = common.BytesToHash(address.Bytes()) } return rst } func (d *ERC20TransfersDownloader) inboundTopics(addresses []common.Address) [][]common.Hash { return [][]common.Hash{{d.signature}, {}, topicFromAddressSlice(addresses)} } func (d *ERC20TransfersDownloader) outboundTopics(addresses []common.Address) [][]common.Hash { return [][]common.Hash{{d.signature}, topicFromAddressSlice(addresses), {}} } func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(addresses []common.Address) [][]common.Hash { return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, topicFromAddressSlice(addresses)} } func (d *ERC20TransfersDownloader) inboundTopicsERC1155(addresses []common.Address) [][]common.Hash { return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, topicFromAddressSlice(addresses)} } func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) receipt, err := d.chainClient.TransactionReceipt(ctx, txHash) cancel() if err != nil { return nil, err } return receipt, nil } func (d *ETHDownloader) fetchTransaction(parent context.Context, txHash common.Hash) (*types.Transaction, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) tx, _, err := d.chainClient.TransactionByHash(ctx, txHash) // TODO Save on requests by checking in the DB first cancel() if err != nil { return nil, err } return tx, nil } func (d *ETHDownloader) subTransactionsFromPreloaded(preloadedTx *PreloadedTransaction, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) { logutils.ZapLogger().Debug("subTransactionsFromPreloaded start", zap.Stringer("txHash", tx.Hash()), zap.Stringer("address", preloadedTx.Address), zap.Stringer("tokenID", preloadedTx.TokenID), zap.Stringer("value", preloadedTx.Value), ) address := preloadedTx.Address txLog := preloadedTx.Log rst := make([]Transfer, 0, 1) from, err := types.Sender(d.signer, tx) if err != nil { if err == core.ErrTxTypeNotSupported { return nil, nil } return nil, err } eventType := w_common.GetEventType(preloadedTx.Log) // Only add ERC20/ERC721/ERC1155 transfers from/to the given account // from/to matching is already handled by getLogs filter switch eventType { case w_common.Erc20TransferEventType, w_common.Erc721TransferEventType, w_common.Erc1155TransferSingleEventType, w_common.Erc1155TransferBatchEventType: logutils.ZapLogger().Debug("subTransactionsFromPreloaded transfer", zap.String("eventType", string(eventType)), zap.Uint("logIdx", txLog.Index), zap.Stringer("txHash", tx.Hash()), zap.Stringer("address", address), zap.Stringer("tokenID", preloadedTx.TokenID), zap.Stringer("value", preloadedTx.Value), zap.Stringer("baseFee", blk.BaseFee()), ) transfer := Transfer{ Type: w_common.EventTypeToSubtransactionType(eventType), ID: preloadedTx.ID, Address: address, BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber), BlockHash: txLog.BlockHash, Loaded: true, NetworkID: d.signer.ChainID().Uint64(), From: from, Log: txLog, TokenID: preloadedTx.TokenID, TokenValue: preloadedTx.Value, BaseGasFees: blk.BaseFee().String(), Transaction: tx, Receipt: receipt, Timestamp: blk.Time(), MultiTransactionID: w_common.NoMultiTransactionID, } rst = append(rst, transfer) } logutils.ZapLogger().Debug("subTransactionsFromPreloaded end", zap.Stringer("txHash", tx.Hash()), zap.Stringer("address", address), zap.Stringer("tokenID", preloadedTx.TokenID), zap.Stringer("value", preloadedTx.Value), ) return rst, nil } func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common.Address, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) { logutils.ZapLogger().Debug("subTransactionsFromTransactionData start", zap.Stringer("txHash", tx.Hash()), zap.Stringer("address", address), ) rst := make([]Transfer, 0, 1) for _, txLog := range receipt.Logs { eventType := w_common.GetEventType(txLog) switch eventType { case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType, w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType, w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType: transfer := Transfer{ Type: w_common.EventTypeToSubtransactionType(eventType), ID: w_common.GetLogSubTxID(*txLog), Address: address, BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber), BlockHash: txLog.BlockHash, Loaded: true, NetworkID: d.signer.ChainID().Uint64(), From: from, Log: txLog, BaseGasFees: blk.BaseFee().String(), Transaction: tx, Receipt: receipt, Timestamp: blk.Time(), MultiTransactionID: w_common.NoMultiTransactionID, } rst = append(rst, transfer) } } logutils.ZapLogger().Debug("subTransactionsFromTransactionData end", zap.Stringer("txHash", tx.Hash()), zap.Stringer("address", address), ) return rst, nil } func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log) ([]*DBHeader, error) { concurrent := NewConcurrentDownloader(parent, NoThreadLimit) for i := range logs { l := logs[i] if l.Removed { continue } var address common.Address from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l) if err != nil { logutils.ZapLogger().Error("failed to parse transfer log", zap.Any("log", l), zap.Stringers("address", d.accounts), zap.Error(err), ) continue } // Double check provider returned the correct log if slices.Contains(d.accounts, from) { address = from } else if slices.Contains(d.accounts, to) { address = to } else { logutils.ZapLogger().Error("from/to address mismatch", zap.Any("log", l), zap.Stringers("addresses", d.accounts), ) continue } eventType := w_common.GetEventType(&l) logType := w_common.EventTypeToSubtransactionType(eventType) for i, txID := range txIDs { logutils.ZapLogger().Debug("block from logs", zap.Uint64("block", l.BlockNumber), zap.Any("log", l), zap.String("logType", string(logType)), zap.Stringer("txID", txID), ) // For ERC20 there is no tokenID, so we use nil var tokenID *big.Int if len(tokenIDs) > i { tokenID = tokenIDs[i] } header := &DBHeader{ Number: big.NewInt(int64(l.BlockNumber)), Hash: l.BlockHash, Address: address, PreloadedTransactions: []*PreloadedTransaction{{ ID: txID, Type: logType, Log: &l, TokenID: tokenID, Value: values[i], }}, Loaded: false, } concurrent.Add(func(ctx context.Context) error { concurrent.PushHeader(header) return nil }) } } select { case <-concurrent.WaitAsync(): case <-parent.Done(): return nil, errLogsDownloaderStuck } return concurrent.GetHeaders(), concurrent.Error() } // GetHeadersInRange returns transfers between two blocks. // time to get logs for 100000 blocks = 1.144686979s. with 249 events in the result set. func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, from, to *big.Int) ([]*DBHeader, error) { start := time.Now() logutils.ZapLogger().Debug("get erc20 transfers in range start", zap.Uint64("chainID", d.client.NetworkID()), zap.Stringer("from", from), zap.Stringer("to", to), zap.Stringers("accounts", d.accounts), ) // TODO #16062: Figure out real root cause of invalid range if from != nil && to != nil && from.Cmp(to) > 0 { logutils.ZapLogger().Error("invalid range", zap.Uint64("chainID", d.client.NetworkID()), zap.Stringer("from", from), zap.Stringer("to", to), zap.Stringers("accounts", d.accounts), ) return nil, errors.New("invalid range") } headers := []*DBHeader{} ctx := context.Background() var err error outbound := []types.Log{} var inboundOrMixed []types.Log // inbound ERC20 or outbound ERC1155 share the same signature for our purposes if !d.incomingOnly { outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, Topics: d.outboundTopics(d.accounts), }) if err != nil { return nil, err } inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, Topics: d.inboundERC20OutboundERC1155Topics(d.accounts), }) if err != nil { return nil, err } } else { inboundOrMixed, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, Topics: d.inboundTopics(d.accounts), }) if err != nil { return nil, err } } inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, Topics: d.inboundTopicsERC1155(d.accounts), }) if err != nil { return nil, err } logs := concatLogs(outbound, inboundOrMixed, inbound1155) if len(logs) == 0 { logutils.ZapLogger().Debug("no logs found for account") return nil, nil } rst, err := d.blocksFromLogs(parent, logs) if err != nil { return nil, err } if len(rst) == 0 { logutils.ZapLogger().Warn("no headers found in logs for account", zap.Uint64("chainID", d.client.NetworkID()), zap.Stringers("addresses", d.accounts), zap.Stringer("from", from), zap.Stringer("to", to), ) } else { headers = append(headers, rst...) logutils.ZapLogger().Debug("found erc20 transfers for account", zap.Uint64("chainID", d.client.NetworkID()), zap.Stringers("addresses", d.accounts), zap.Stringer("from", from), zap.Stringer("to", to), zap.Int("headers", len(headers)), ) } logutils.ZapLogger().Debug("get erc20 transfers in range end", zap.Uint64("chainID", d.client.NetworkID()), zap.Stringer("from", from), zap.Stringer("to", to), zap.Int("headers", len(headers)), zap.Stringers("accounts", d.accounts), zap.Duration("took", time.Since(start)), ) return headers, nil } func concatLogs(slices ...[]types.Log) []types.Log { var totalLen int for _, s := range slices { totalLen += len(s) } tmp := make([]types.Log, totalLen) var i int for _, s := range slices { i += copy(tmp[i:], s) } return tmp }