From b6ade53603b4f08bfdcf316f914bee3944537a08 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Thu, 2 Nov 2023 18:24:23 +0100 Subject: [PATCH] feat(wallet): detect ERC1155 batch transfers. Refactored transfers loading to reduce blockchain RPC requests (getBaseFee, getTransaction, getTransactionReceipt) by reusing preloaded transaction and block fee. Split extraction of subtransaction from logs and from ETH transfer into different methods. Refactored log_parser to extract sender and receiver addresses uniformly for different transfer types. Replaced info logs with debug where needed. closes #4221 --- appdatabase/database.go | 11 +- appdatabase/database_test.go | 15 +- services/wallet/common/log_parser.go | 252 +++++++++----- services/wallet/transfer/commands.go | 40 +-- .../wallet/transfer/commands_sequential.go | 49 +-- .../transfer/commands_sequential_test.go | 90 +++-- services/wallet/transfer/database.go | 25 +- services/wallet/transfer/database_test.go | 4 +- services/wallet/transfer/downloader.go | 323 +++++++++++------- services/wallet/transfer/query.go | 75 +++- 10 files changed, 565 insertions(+), 319 deletions(-) diff --git a/appdatabase/database.go b/appdatabase/database.go index 4b24004f1..ba745797b 100644 --- a/appdatabase/database.go +++ b/appdatabase/database.go @@ -335,7 +335,14 @@ func migrateWalletJSONBlobs(sqlTx *sql.Tx) error { func extractToken(entryType string, tx *types.Transaction, l *types.Log, logValid bool) (correctType w_common.Type, tokenID *big.Int, value *big.Int, tokenAddress *common.Address) { if logValid { - correctType, tokenAddress, tokenID, value, _, _ = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx) + correctType, tokenAddress, _, _ = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx) + _, _, _, tokenIDs, values, _ := w_common.ParseTransferLog(*l) + if len(tokenIDs) > 0 { + tokenID = tokenIDs[0] + } + if len(values) > 0 { + value = values[0] + } } else { correctType = w_common.Type(entryType) value = new(big.Int).Set(tx.Value()) @@ -394,7 +401,7 @@ func migrateWalletTransferFromToAddresses(sqlTx *sql.Tx) error { if nullableTx.Valid { if nullableL.Valid { - _, tokenAddress, _, _, txFrom, txTo = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx) + _, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx) } else { txFrom = &sender txTo = tx.To() diff --git a/appdatabase/database_test.go b/appdatabase/database_test.go index fdd8d6849..bedf8140a 100644 --- a/appdatabase/database_test.go +++ b/appdatabase/database_test.go @@ -297,21 +297,24 @@ func TestMigrateWalletJsonBlobs(t *testing.T) { require.Equal(t, *sqlite.BigIntToPadded128BitsStr(tt.Value()), amount128Hex.String) require.True(t, isTokenIDNull) } else { - actualEntryType, expectedTokenAddress, expectedTokenID, expectedValue, expectedFrom, expectedTo := w_common.ExtractTokenIdentity(expectedEntryType, tl, tt) + actualEntryType, expectedTokenAddress, _, _ := w_common.ExtractTokenTransferData(expectedEntryType, tl, tt) if actualEntryType == w_common.Erc20Transfer { + expectedFrom, expectedTo, expectedValue := w_common.ParseErc20TransferLog(tl) require.True(t, amount128Hex.Valid) require.Equal(t, *sqlite.BigIntToPadded128BitsStr(expectedValue), amount128Hex.String) require.True(t, isTokenIDNull) require.Equal(t, *expectedTokenAddress, *tokenAddress) - require.Equal(t, *expectedFrom, *txFrom) - require.Equal(t, *expectedTo, *txTo) + require.Equal(t, expectedFrom, *txFrom) + require.Equal(t, expectedTo, *txTo) } else if actualEntryType == w_common.Erc721Transfer { - require.False(t, amount128Hex.Valid) + expectedFrom, expectedTo, expectedTokenID := w_common.ParseErc721TransferLog(tl) + require.True(t, amount128Hex.Valid) + require.Equal(t, *sqlite.BigIntToPadded128BitsStr(big.NewInt(1)), amount128Hex.String) require.False(t, isTokenIDNull) require.Equal(t, expectedTokenID, expectedTokenID) require.Equal(t, *expectedTokenAddress, *tokenAddress) - require.Equal(t, *expectedFrom, *txFrom) - require.Equal(t, *expectedTo, *txTo) + require.Equal(t, expectedFrom, *txFrom) + require.Equal(t, expectedTo, *txTo) } else { require.False(t, amount128Hex.Valid) require.True(t, isTokenIDNull) diff --git a/services/wallet/common/log_parser.go b/services/wallet/common/log_parser.go index b6dbc4f12..4a5dcc1a8 100644 --- a/services/wallet/common/log_parser.go +++ b/services/wallet/common/log_parser.go @@ -3,6 +3,7 @@ package common import ( + "encoding/binary" "fmt" "math/big" @@ -20,16 +21,15 @@ type EventType string const ( // Transaction types - EthTransfer Type = "eth" - Erc20Transfer Type = "erc20" - Erc721Transfer Type = "erc721" - Erc1155SingleTransfer Type = "erc1155" - Erc1155BatchTransfer Type = "erc1155" - UniswapV2Swap Type = "uniswapV2Swap" - UniswapV3Swap Type = "uniswapV3Swap" - HopBridgeFrom Type = "HopBridgeFrom" - HopBridgeTo Type = "HopBridgeTo" - unknownTransaction Type = "unknown" + EthTransfer Type = "eth" + Erc20Transfer Type = "erc20" + Erc721Transfer Type = "erc721" + Erc1155Transfer Type = "erc1155" + UniswapV2Swap Type = "uniswapV2Swap" + UniswapV3Swap Type = "uniswapV3Swap" + HopBridgeFrom Type = "HopBridgeFrom" + HopBridgeTo Type = "HopBridgeTo" + unknownTransaction Type = "unknown" // Event types WETHDepositEventType EventType = "wethDepositEvent" @@ -136,10 +136,8 @@ func EventTypeToSubtransactionType(eventType EventType) Type { return Erc20Transfer case Erc721TransferEventType: return Erc721Transfer - case Erc1155TransferSingleEventType: - return Erc1155SingleTransfer - case Erc1155TransferBatchEventType: - return Erc1155BatchTransfer + case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType: + return Erc1155Transfer case UniswapV2SwapEventType: return UniswapV2Swap case UniswapV3SwapEventType: @@ -226,16 +224,13 @@ func ParseErc20TransferLog(ethlog *types.Log) (from, to common.Address, amount * log.Warn("not enough topics for erc20 transfer", "topics", ethlog.Topics) return } - if len(ethlog.Topics[1]) != 32 { - log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1]) + var err error + from, to, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseErc20TransferLog", err) return } - if len(ethlog.Topics[2]) != 32 { - log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2]) - return - } - copy(from[:], ethlog.Topics[1][12:]) - copy(to[:], ethlog.Topics[2][12:]) + if len(ethlog.Data) != 32 { log.Warn("data is not padded to 32 byts big int", "data", ethlog.Data) return @@ -251,52 +246,150 @@ func ParseErc721TransferLog(ethlog *types.Log) (from, to common.Address, tokenID log.Warn("not enough topics for erc721 transfer", "topics", ethlog.Topics) return } - if len(ethlog.Topics[1]) != 32 { - log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1]) + + var err error + from, to, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseErc721TransferLog", err) return } - if len(ethlog.Topics[2]) != 32 { - log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2]) - return - } - if len(ethlog.Topics[3]) != 32 { - log.Warn("fourth topic is not 32 byte tokenId", "topic", ethlog.Topics[3]) - return - } - copy(from[:], ethlog.Topics[1][12:]) - copy(to[:], ethlog.Topics[2][12:]) tokenID.SetBytes(ethlog.Topics[3][:]) return } -func ParseErc1155TransferSingleLog(ethlog *types.Log) (operator, from, to common.Address, id, amount *big.Int) { - if len(ethlog.Topics) < erc1155TransferEventIndexedParameters { - log.Warn("not enough topics for erc1155 transfer single", "topics", ethlog.Topics) - return - } +func GetLogSubTxID(log types.Log) common.Hash { + // Get unique ID by using TxHash and log index + index := [4]byte{} + binary.BigEndian.PutUint32(index[:], uint32(log.Index)) + return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) +} - amount = new(big.Int) - id = new(big.Int) +func getLogSubTxIDWithTokenIDIndex(log types.Log, tokenIDIdx uint16) common.Hash { + // Get unique ID by using TxHash, log index and extra bytes (token id index for ERC1155 TransferBatch) + index := [4]byte{} + value := uint32(log.Index&0x0000FFFF) | (uint32(tokenIDIdx) << 16) // log index should not exceed uint16 max value + binary.BigEndian.PutUint32(index[:], value) + return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) +} - for i := 1; i < erc1155TransferEventIndexedParameters; i++ { +func checkTopicsLength(ethlog types.Log, startIdx, endIdx int) (err error) { + for i := startIdx; i < endIdx; i++ { if len(ethlog.Topics[i]) != common.HashLength { - log.Warn(fmt.Sprintf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[1])) + err = fmt.Errorf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[i]) + log.Error("log_parser::checkTopicsLength", err) return } } - addressIdx := common.HashLength - common.AddressLength - copy(operator[:], ethlog.Topics[1][addressIdx:]) - copy(from[:], ethlog.Topics[2][addressIdx:]) - copy(to[:], ethlog.Topics[3][addressIdx:]) + return +} - if len(ethlog.Data) != common.HashLength*2 { - log.Warn("data is not padded to 64 bytes", "data", ethlog.Data) +func getFromToAddresses(ethlog types.Log) (from, to common.Address, err error) { + eventType := GetEventType(ðlog) + addressIdx := common.HashLength - common.AddressLength + switch eventType { + case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType: + err = checkTopicsLength(ethlog, 2, 4) + if err != nil { + return + } + copy(from[:], ethlog.Topics[2][addressIdx:]) + copy(to[:], ethlog.Topics[3][addressIdx:]) + return + + case Erc20TransferEventType, Erc721TransferEventType, UniswapV2SwapEventType, UniswapV3SwapEventType, HopBridgeTransferFromL1CompletedEventType: + err = checkTopicsLength(ethlog, 1, 3) + if err != nil { + return + } + copy(from[:], ethlog.Topics[1][addressIdx:]) + copy(to[:], ethlog.Topics[2][addressIdx:]) return } - id.SetBytes(ethlog.Data[:common.HashLength]) - amount.SetBytes(ethlog.Data[common.HashLength:]) + return from, to, fmt.Errorf("unsupported event type to get from/to adddresses %s", eventType) +} +func ParseTransferLog(ethlog types.Log) (from, to common.Address, txIDs []common.Hash, tokenIDs, values []*big.Int, err error) { + eventType := GetEventType(ðlog) + + switch eventType { + case Erc20TransferEventType: + var amount *big.Int + from, to, amount = ParseErc20TransferLog(ðlog) + txIDs = append(txIDs, GetLogSubTxID(ethlog)) + values = append(values, amount) + return + case Erc721TransferEventType: + var tokenID *big.Int + from, to, tokenID = ParseErc721TransferLog(ðlog) + txIDs = append(txIDs, GetLogSubTxID(ethlog)) + tokenIDs = append(tokenIDs, tokenID) + values = append(values, big.NewInt(1)) + return + case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType: + _, from, to, tokenIDs, values, err = ParseErc1155TransferLog(ðlog, eventType) + for i := range tokenIDs { + txIDs = append(txIDs, getLogSubTxIDWithTokenIDIndex(ethlog, uint16(i))) + } + return + } + + return from, to, txIDs, tokenIDs, values, fmt.Errorf("unsupported event type in log_parser::ParseTransferLogs %s", eventType) +} + +func ParseErc1155TransferLog(ethlog *types.Log, evType EventType) (operator, from, to common.Address, ids, amounts []*big.Int, err error) { + if len(ethlog.Topics) < erc1155TransferEventIndexedParameters { + err = fmt.Errorf("not enough topics for erc1155 transfer %s, %v", "topics", ethlog.Topics) + log.Error("log_parser::ParseErc1155TransferLog", "err", err) + return + } + + err = checkTopicsLength(*ethlog, 1, erc1155TransferEventIndexedParameters) + if err != nil { + return + } + + addressIdx := common.HashLength - common.AddressLength + copy(operator[:], ethlog.Topics[1][addressIdx:]) + from, to, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseErc1155TransferLog", "err", err) + return + } + + if len(ethlog.Data) == 0 || len(ethlog.Data)%(common.HashLength*2) != 0 { + err = fmt.Errorf("data is not padded to 64 bytes %s, %v", "data", ethlog.Data) + log.Error("log_parser::ParseErc1155TransferLog", "err", err) + return + } + + if evType == Erc1155TransferSingleEventType { + ids = append(ids, new(big.Int).SetBytes(ethlog.Data[:common.HashLength])) + amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[common.HashLength:])) + log.Debug("log_parser::ParseErc1155TransferSingleLog", "ids", ids, "amounts", amounts) + } else { + // idTypeSize := new(big.Int).SetBytes(ethlog.Data[:common.HashLength]).Uint64() // Left for knowledge + // valueTypeSize := new(big.Int).SetBytes(ethlog.Data[common.HashLength : common.HashLength*2]).Uint64() // Left for knowledge + idsArraySize := new(big.Int).SetBytes(ethlog.Data[common.HashLength*2 : common.HashLength*2+common.HashLength]).Uint64() + + initialOffset := common.HashLength*2 + common.HashLength + for i := 0; i < int(idsArraySize); i++ { + ids = append(ids, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength])) + } + valuesArraySize := new(big.Int).SetBytes(ethlog.Data[initialOffset+int(idsArraySize)*common.HashLength : initialOffset+int(idsArraySize+1)*common.HashLength]).Uint64() + + if idsArraySize != valuesArraySize { + err = fmt.Errorf("ids and values sizes don't match %d, %d", idsArraySize, valuesArraySize) + log.Error("log_parser::ParseErc1155TransferBatchLog", "err", err) + return + } + + initialOffset = initialOffset + int(idsArraySize+1)*common.HashLength + for i := 0; i < int(valuesArraySize); i++ { + amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength])) + log.Debug("log_parser::ParseErc1155TransferBatchLog", "id", ids[i], "amount", amounts[i]) + } + } return } @@ -313,17 +406,11 @@ func ParseUniswapV2Log(ethlog *types.Log) (pairAddress common.Address, from comm } pairAddress = ethlog.Address - - if len(ethlog.Topics[1]) != 32 { - err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1]) + from, to, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseUniswapV2Log", err) return } - if len(ethlog.Topics[2]) != 32 { - err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2]) - return - } - copy(from[:], ethlog.Topics[1][12:]) - copy(to[:], ethlog.Topics[2][12:]) if len(ethlog.Data) != 32*4 { err = fmt.Errorf("data is not padded to 4 * 32 bytes big int %s, %v", "data", ethlog.Data) return @@ -359,17 +446,11 @@ func ParseUniswapV3Log(ethlog *types.Log) (poolAddress common.Address, sender co } poolAddress = ethlog.Address - - if len(ethlog.Topics[1]) != 32 { - err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1]) + sender, recipient, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseUniswapV3Log", err) return } - if len(ethlog.Topics[2]) != 32 { - err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2]) - return - } - copy(sender[:], ethlog.Topics[1][12:]) - copy(recipient[:], ethlog.Topics[2][12:]) if len(ethlog.Data) != 32*5 { err = fmt.Errorf("data is not padded to 5 * 32 bytes big int %s, %v", "data", ethlog.Data) return @@ -426,17 +507,11 @@ func ParseHopBridgeTransferFromL1CompletedLog(ethlog *types.Log) (recipient comm return } - if len(ethlog.Topics[1]) != 32 { - err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1]) + recipient, relayer, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseHopBridgeTransferFromL1CompletedLog", err) return } - copy(recipient[:], ethlog.Topics[1][12:]) - - if len(ethlog.Topics[2]) != 32 { - err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2]) - return - } - copy(relayer[:], ethlog.Topics[2][12:]) if len(ethlog.Data) != 32*4 { err = fmt.Errorf("data is not padded to 4 * 32 bytes big int %s, %v", "data", ethlog.Data) @@ -526,7 +601,7 @@ func GetEventSignatureHash(signature string) common.Hash { return crypto.Keccak256Hash([]byte(signature)) } -func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txTokenID *big.Int, txValue *big.Int, txFrom *common.Address, txTo *common.Address) { +func ExtractTokenTransferData(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txFrom *common.Address, txTo *common.Address) { // erc721 transfers share signature with erc20 ones, so they both used to be categorized as erc20 // by the Downloader. We fix this here since they might be mis-categorized in the db. if dbEntryType == Erc20Transfer { @@ -537,32 +612,27 @@ func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transactio } switch correctType { - case EthTransfer: - if tx != nil { - txValue = new(big.Int).Set(tx.Value()) - } case Erc20Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - from, to, value := ParseErc20TransferLog(log) - txValue = value + from, to, _ := ParseErc20TransferLog(log) txFrom = &from txTo = &to case Erc721Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - from, to, tokenID := ParseErc721TransferLog(log) - txTokenID = tokenID + from, to, _ := ParseErc721TransferLog(log) txFrom = &from txTo = &to - case Erc1155SingleTransfer: + case Erc1155Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - _, from, to, tokenID, value := ParseErc1155TransferSingleLog(log) - txTokenID = tokenID + _, from, to, _, _, err := ParseErc1155TransferLog(log, Erc1155TransferSingleEventType) // from/to extraction is the same for single and batch + if err != nil { + return + } txFrom = &from txTo = &to - txValue = value } return diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 8256c099e..c29b31366 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -116,10 +116,9 @@ type erc20HistoricalCommand struct { chainClient chain.ClientInterface feed *event.Feed - iterator *IterativeDownloader - to *big.Int - from *big.Int - + iterator *IterativeDownloader + to *big.Int + from *big.Int foundHeaders []*DBHeader } @@ -203,7 +202,8 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error { } func (c *controlCommand) Run(parent context.Context) error { - log.Info("start control command") + log.Debug("start control command") + ctx, cancel := context.WithTimeout(parent, 3*time.Second) head, err := c.chainClient.HeaderByNumber(ctx, nil) cancel() @@ -221,7 +221,7 @@ func (c *controlCommand) Run(parent context.Context) error { }) } - log.Info("current head is", "block number", head.Number) + log.Debug("current head is", "block number", head.Number) // Get last known block for each account lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.NetworkID(), c.accounts) @@ -326,7 +326,7 @@ func (c *controlCommand) Run(parent context.Context) error { BlockNumber: target, }) } - log.Info("end control command") + log.Debug("end control command") return err } @@ -390,7 +390,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { // Take blocks from cache if available and disrespect the limit // If no blocks are available in cache, take blocks from DB respecting the limit // If no limit is set, take all blocks from DB - log.Info("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums) + log.Debug("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums) startTs := time.Now() for { @@ -450,7 +450,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { } } - log.Info("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, + log.Debug("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blocks.len", len(c.blockNums), "transfers.len", len(c.fetchedTransfers), "in", time.Since(startTs)) return nil @@ -771,7 +771,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, map[common.Address][]*DBHeader, error) { - log.Info("fast indexer started") + log.Debug("fast indexer started") start := time.Now() group := async.NewGroup(ctx) @@ -804,7 +804,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b resultingFromByAddress[command.address] = command.resultingFrom headers[command.address] = command.foundHeaders } - log.Info("fast indexer finished", "in", time.Since(start)) + log.Debug("fast indexer finished", "in", time.Since(start)) return resultingFromByAddress, headers, nil } } @@ -812,7 +812,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) { - log.Info("fast indexer Erc20 started") + log.Debug("fast indexer Erc20 started") start := time.Now() group := async.NewGroup(ctx) @@ -839,7 +839,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from for _, command := range commands { headers[command.address] = command.foundHeaders } - log.Info("fast indexer Erc20 finished", "in", time.Since(start)) + log.Debug("fast indexer Erc20 finished", "in", time.Since(start)) return headers, nil } } @@ -849,7 +849,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed) error { - log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) + log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) @@ -879,7 +879,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) + log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) return nil } } @@ -899,7 +899,7 @@ func getLowestFrom(chainID uint64, to *big.Int) *big.Int { // Finds the latest range up to initialTo where the number of transactions is between 20 and 25 func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) { - log.Info("findFirstRange", "account", account, "initialTo", initialTo, "client", client) + log.Debug("findFirstRange", "account", account, "initialTo", initialTo, "client", client) from := getLowestFrom(client.NetworkID(), initialTo) to := initialTo @@ -910,7 +910,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In } firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually - log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) + log.Debug("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) if err != nil { return nil, err @@ -943,15 +943,15 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In } nonceDiff = firstNonce - fromNonce - log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) + log.Debug("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) if goal <= nonceDiff && nonceDiff <= (goal+5) { - log.Info("range found", "account", account, "from", from, "to", to) + log.Debug("range found", "account", account, "from", from, "to", to) return from, nil } } - log.Info("range found", "account", account, "from", from, "to", to) + log.Debug("range found", "account", account, "from", from, "to", to) return from, nil } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index a335348f3..51d0e0b8b 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -32,7 +32,7 @@ func (c *findNewBlocksCommand) Command() async.Command { } func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { - log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) + log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) headNum, err := getHeadBlockNumber(parent, c.chainClient) if err != nil { @@ -47,6 +47,7 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { return err // Will keep spinning forever nomatter what } + // In case no block range is in DB, skip until history blocks are fetched if blockRange != nil { c.fromBlockNumber = blockRange.LastKnown @@ -64,6 +65,8 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { _ = c.findBlocksCommand.Run(parent) } + log.Debug("end findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + return nil } @@ -436,7 +439,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache select { case <-ctx.Done(): err = ctx.Err() - log.Info("fast indexer ctx Done", "error", err) + log.Debug("fast indexer ctx Done", "error", err) return case <-group.WaitAsync(): if command.error != nil { @@ -491,7 +494,7 @@ func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *Bl for { select { case <-ctx.Done(): - log.Info("loadTransfersLoop error", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) + log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) return case dbHeaders := <-blocksLoadedCh: log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders)) @@ -523,7 +526,6 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, chainClient: chainClient, feed: feed, balanceCacher: balanceCacher, - errorsCount: 0, transactionManager: transactionManager, pendingTxManager: pendingTxManager, tokenManager: tokenManager, @@ -540,7 +542,6 @@ type loadBlocksAndTransfersCommand struct { chainClient chain.ClientInterface feed *event.Feed balanceCacher balance.Cacher - errorsCount int // nonArchivalRPCNode bool // TODO Make use of it transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker @@ -556,16 +557,28 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "account", c.account) ctx := parent + + // This wait group is used to wait for all the async commands to finish + // but fetchNewBlocksCommand, which is infinite, never finishes, can only be stopped + // by canceling the context which does not happen here, as we don't call group.Stop(). group := async.NewGroup(ctx) + // It will start loadTransfersCommand which will run until success when all transfers from DB are loaded err := c.fetchTransfersForLoadedBlocks(group) for err != nil { return err } + // Start transfers loop to load transfers for new blocks c.startTransfersLoop(ctx) - err = c.fetchHistoryBlocks(parent, group, c.blocksLoadedCh) + fromNum := big.NewInt(0) + toNum, err := getHeadBlockNumber(ctx, c.chainClient) + if err != nil { + return err + } + // This will start findBlocksCommand which will run until success when all blocks are loaded + err = c.fetchHistoryBlocks(parent, group, fromNum, toNum, c.blocksLoadedCh) for err != nil { group.Stop() group.Wait() @@ -595,20 +608,13 @@ func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh) } -func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, blocksLoadedCh chan []*DBHeader) error { +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", c.account, "omit", c.omitHistory) - headNum, err := getHeadBlockNumber(ctx, c.chainClient) - if err != nil { - // c.error = err - return err // Might need to retry a couple of times - } - if c.omitHistory { - blockRange := &BlockRange{nil, big.NewInt(0), headNum} + blockRange := &BlockRange{nil, big.NewInt(0), toNum} err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange) - log.Debug("fetchHistoryBlocks omit history", "chainID", c.chainClient.NetworkID(), "account", c.account, "headNum", headNum, "err", err) return err } @@ -619,13 +625,11 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, return err // Will keep spinning forever nomatter what } - /// first allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) - to := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded) - - log.Debug("fetchHistoryBlocks", "chainID", c.chainClient.NetworkID(), "account", c.account, "to", to, "allHistoryLoaded", allHistoryLoaded) if !allHistoryLoaded { + to := getToHistoryBlockNumber(toNum, blockRange, allHistoryLoaded) + fbc := &findBlocksCommand{ account: c.account, db: c.db, @@ -634,7 +638,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, balanceCacher: c.balanceCacher, feed: c.feed, noLimit: false, - fromBlockNumber: big.NewInt(0), + fromBlockNumber: fromNum, toBlockNumber: to, transactionManager: c.transactionManager, tokenManager: c.tokenManager, @@ -693,6 +697,11 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy return err } + if len(blocks) == 0 { + log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", c.account) + return nil + } + blocksMap := make(map[common.Address][]*big.Int) blocksMap[c.account] = blocks diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 270d3f1c2..f878e9458 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -28,7 +28,6 @@ import ( "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" - w_common "github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/params" @@ -136,8 +135,17 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([ // We do not verify addresses for now allTransfers := []testERC20Transfer{} signatures := q.Topics[0] - erc20TransferSignature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature) - erc1155TransferSingleSignature := w_common.GetEventSignatureHash(w_common.Erc1155TransferSingleEventSignature) + erc20TransferSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature) + erc1155TransferSingleSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature) + + var address common.Hash + for i := 1; i < len(q.Topics); i++ { + if len(q.Topics[i]) > 0 { + address = q.Topics[i][0] + break + } + } + if slices.Contains(signatures, erc1155TransferSingleSignature) { from := q.Topics[2] var to []common.Hash @@ -167,10 +175,24 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([ logs := []types.Log{} for _, transfer := range allTransfers { if transfer.block.Cmp(q.FromBlock) >= 0 && transfer.block.Cmp(q.ToBlock) <= 0 { - logs = append(logs, types.Log{ + log := types.Log{ BlockNumber: transfer.block.Uint64(), BlockHash: common.BigToHash(transfer.block), - }) + } + + // Use the address at least in one any(from/to) topic to trick the implementation + switch transfer.eventType { + case walletcommon.Erc20TransferEventType, walletcommon.Erc721TransferEventType: + // To detect properly ERC721, we need a different number of topics. For now we use only ERC20 for testing + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature), address, address} + case walletcommon.Erc1155TransferSingleEventType: + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature), address, address, address} + log.Data = make([]byte, 2*common.HashLength) + case walletcommon.Erc1155TransferBatchEventType: + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferBatchEventSignature), address, address, address} + } + + logs = append(logs, log) } } @@ -365,19 +387,23 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { transfersPerToken := map[common.Address][]testERC20Transfer{} for _, transfer := range tc.outgoingERC20Transfers { transfer.amount = new(big.Int).Neg(transfer.amount) + transfer.eventType = walletcommon.Erc20TransferEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.incomingERC20Transfers { + transfer.eventType = walletcommon.Erc20TransferEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.outgoingERC1155SingleTransfers { transfer.amount = new(big.Int).Neg(transfer.amount) + transfer.eventType = walletcommon.Erc1155TransferSingleEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.incomingERC1155SingleTransfers { + transfer.eventType = walletcommon.Erc1155TransferSingleEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } @@ -392,7 +418,7 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { currentBalance := big.NewInt(0) tc.tokenBalanceHistory[token] = map[uint64]*big.Int{} - transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0)}) + transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0), walletcommon.Erc20TransferEventType}) for _, transfer := range transfers { for blockN := currentBlock; blockN < transfer.block.Uint64(); blockN++ { @@ -573,9 +599,10 @@ func (tc *TestClient) GetIsConnected() bool { } type testERC20Transfer struct { - block *big.Int - address common.Address - amount *big.Int + block *big.Int + address common.Address + amount *big.Int + eventType walletcommon.EventType } type findBlockCase struct { @@ -614,7 +641,7 @@ func getCases() []findBlockCase { {75, 0, 1}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, toBlock: 100, expectedBlocksFound: 6, @@ -682,7 +709,7 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 1, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, } @@ -694,8 +721,8 @@ func getCases() []findBlockCase { incomingERC20Transfers: []testERC20Transfer{ // edge case when a regular scan will find transfer at 80, // but erc20 tail scan should only find transfer at block 6 - {big.NewInt(80), tokenTXXAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -711,8 +738,8 @@ func getCases() []findBlockCase { // thus only 2 blocks found expectedBlocksFound: 2, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(7), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -741,8 +768,8 @@ func getCases() []findBlockCase { // thus only 2 blocks found expectedBlocksFound: 2, incomingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(7), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -758,8 +785,8 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 15, // 3 for each range @@ -774,10 +801,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(63), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(63), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 6, // 3 for each range, 0 for tail check becauseERC20ScanByBalance returns no ranges @@ -792,10 +819,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(61), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(61), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 9, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges @@ -811,10 +838,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 2, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(85), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(85), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(88), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(88), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 3, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges @@ -827,7 +854,7 @@ func getCases() []findBlockCase { {75, 0, 1}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXXAddress, big.NewInt(4)}, + {big.NewInt(80), tokenTXXAddress, big.NewInt(4), walletcommon.Erc20TransferEventType}, }, toBlock: 100, rangeSize: 20, @@ -885,8 +912,8 @@ func TestFindBlocksCommand(t *testing.T) { incomingERC1155SingleTransfers: testCase.incomingERC1155SingleTransfers, callsCounter: map[string]int{}, } - //tc.traceAPICalls = true - //tc.printPreparedData = true + // tc.traceAPICalls = true + // tc.printPreparedData = true tc.prepareBalanceHistory(100) tc.prepareTokenBalanceHistory(100) blockChannel := make(chan []*DBHeader, 100) @@ -1046,7 +1073,6 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { chainClient: tc, feed: &event.Feed{}, balanceCacher: balance.NewCacherWithTTL(5 * time.Minute), - errorsCount: 0, transactionManager: tm, pendingTxManager: tracker, tokenManager: tokenManager, @@ -1060,7 +1086,11 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { ctx := context.Background() group := async.NewGroup(ctx) - err = cmd.fetchHistoryBlocks(ctx, group, blockChannel) + + fromNum := big.NewInt(0) + toNum, err := getHeadBlockNumber(ctx, cmd.chainClient) + require.NoError(t, err) + err = cmd.fetchHistoryBlocks(ctx, group, fromNum, toNum, blockChannel) require.NoError(t, err) select { diff --git a/services/wallet/transfer/database.go b/services/wallet/transfer/database.go index 5090ed14c..3484627b5 100644 --- a/services/wallet/transfer/database.go +++ b/services/wallet/transfer/database.go @@ -244,7 +244,7 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities [] query := newTransfersQuery() for _, identity := range identities { subQuery := newSubQuery() - subQuery = subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionHash(identity.Hash).FilterAddress(identity.Address) + subQuery = subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionID(identity.Hash).FilterAddress(identity.Address) query.addSubQuery(subQuery, OrSeparator) } rows, err := db.client.QueryContext(ctx, query.String(), query.Args()...) @@ -255,8 +255,8 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities [] return query.TransferScan(rows) } -func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []PreloadedTransaction, err error) { - query := newTransfersQuery(). +func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []*PreloadedTransaction, err error) { + query := newTransfersQueryForPreloadedTransactions(). FilterNetwork(chainID). FilterAddress(address). FilterBlockNumber(blockNumber). @@ -354,8 +354,8 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco } insertTx, err := creator.Prepare(`INSERT OR IGNORE - INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index) - VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?)`) + INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index, token_id, amount_padded128hex) + VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?, ?, ?)`) if err != nil { return err } @@ -383,9 +383,11 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco continue } - _, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, w_common.Erc20Transfer, &JSONBlob{transaction.Log}, logIndex) + tokenID := (*bigint.SQLBigIntBytes)(transaction.TokenID) + txValue := sqlite.BigIntToPadded128BitsStr(transaction.Value) + _, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue) if err != nil { - log.Error("error saving Erc20transfer", "err", err) + log.Error("error saving token transfer", "err", err) return err } } @@ -428,7 +430,14 @@ func updateOrInsertTransfers(chainID uint64, creator statementCreator, transfers var txTo *common.Address if t.Transaction != nil { if t.Log != nil { - _, tokenAddress, tokenID, txValue, txFrom, txTo = w_common.ExtractTokenIdentity(t.Type, t.Log, t.Transaction) + _, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(t.Type, t.Log, t.Transaction) + tokenID = t.TokenID + // Zero tokenID can be used for ERC721 and ERC1155 transfers but when serialzed/deserialized it becomes nil + // as 0 value of big.Int bytes is nil. + if tokenID == nil && (t.Type == w_common.Erc721Transfer || t.Type == w_common.Erc1155Transfer) { + tokenID = big.NewInt(0) + } + txValue = t.TokenValue } else { txValue = new(big.Int).Set(t.Transaction.Value()) txFrom = &t.From diff --git a/services/wallet/transfer/database_test.go b/services/wallet/transfer/database_test.go index 365cca8a0..bc904ffa7 100644 --- a/services/wallet/transfer/database_test.go +++ b/services/wallet/transfer/database_test.go @@ -121,7 +121,7 @@ func TestDBReorgTransfers(t *testing.T) { } require.NoError(t, db.ProcessBlocks(777, original.Address, original.Number, lastBlock, []*DBHeader{original})) require.NoError(t, db.ProcessTransfers(777, []Transfer{ - {w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID}, + {w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID}, }, []*DBHeader{})) nonce = int64(0) lastBlock = &Block{ @@ -131,7 +131,7 @@ func TestDBReorgTransfers(t *testing.T) { } require.NoError(t, db.ProcessBlocks(777, replaced.Address, replaced.Number, lastBlock, []*DBHeader{replaced})) require.NoError(t, db.ProcessTransfers(777, []Transfer{ - {w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID}, + {w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID}, }, []*DBHeader{original})) all, err := db.GetTransfers(777, big.NewInt(0), nil) diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go index bf55da7e3..a1dc9136d 100644 --- a/services/wallet/transfer/downloader.go +++ b/services/wallet/transfer/downloader.go @@ -2,7 +2,6 @@ package transfer import ( "context" - "encoding/binary" "errors" "math/big" "time" @@ -11,20 +10,12 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" - "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/rpc/chain" w_common "github.com/status-im/status-go/services/wallet/common" ) -func getLogSubTxID(log types.Log) common.Hash { - // Get unique ID by using TxHash and log index - index := [4]byte{} - binary.BigEndian.PutUint32(index[:], uint32(log.Index)) - return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:]) -} - var ( zero = big.NewInt(0) one = big.NewInt(1) @@ -32,26 +23,23 @@ var ( ) // Partial transaction info obtained by ERC20Downloader. -// A PreloadedTransaction represents a Transaction which contains one or more -// ERC20/ERC721 transfer events. -// To be converted into one or many Transfer objects post-indexing. +// A PreloadedTransaction represents a Transaction which contains one +// ERC20/ERC721/ERC1155 transfer event. +// To be converted into one Transfer object post-indexing. type PreloadedTransaction struct { - NetworkID uint64 - Type w_common.Type `json:"type"` - ID common.Hash `json:"-"` - Address common.Address `json:"address"` - BlockNumber *big.Int `json:"blockNumber"` - BlockHash common.Hash `json:"blockhash"` - Loaded bool - // From is derived from tx signature in order to offload this computation from UI component. - From common.Address `json:"from"` + Type w_common.Type `json:"type"` + ID common.Hash `json:"-"` + Address common.Address `json:"address"` // Log that was used to generate preloaded transaction. - Log *types.Log `json:"log"` - BaseGasFees string + Log *types.Log `json:"log"` + TokenID *big.Int `json:"tokenId"` + Value *big.Int `json:"value"` } // Transfer stores information about transfer. // A Transfer represents a plain ETH transfer or some token activity inside a Transaction +// Since ERC1155 transfers can contain multiple tokens, a single Transfer represents a single token transfer, +// that means ERC1155 batch transfers will be represented by multiple Transfer objects. type Transfer struct { Type w_common.Type `json:"type"` ID common.Hash `json:"-"` @@ -66,13 +54,17 @@ type Transfer struct { From common.Address `json:"from"` Receipt *types.Receipt `json:"receipt"` // Log that was used to generate erc20 transfer. Nil for eth transfer. - Log *types.Log `json:"log"` + Log *types.Log `json:"log"` + // TokenID is the id of the transferred token. Nil for eth transfer. + TokenID *big.Int `json:"tokenId"` + // TokenValue is the value of the token transfer. Nil for eth transfer. + TokenValue *big.Int `json:"tokenValue"` BaseGasFees string // Internal field that is used to track multi-transaction transfers. MultiTransactionID MultiTransactionIDType `json:"multi_transaction_id"` } -// ETHDownloader downloads regular eth transfers. +// ETHDownloader downloads regular eth transfers and tokens transfers. type ETHDownloader struct { chainClient chain.ClientInterface accounts []common.Address @@ -137,12 +129,70 @@ func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer return transfer, nil } -func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) { +func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) ([]Transfer, error) { startTs := time.Now() + rst := make([]Transfer, 0, len(blk.Transactions())) + + receiptsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Receipt) + txsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Transaction) + + addReceiptToCache := func(address common.Address, txHash common.Hash, receipt *types.Receipt) { + if receiptsByAddressAndTxHash[address] == nil { + receiptsByAddressAndTxHash[address] = make(map[common.Hash]*types.Receipt) + } + receiptsByAddressAndTxHash[address][txHash] = receipt + } + + addTxToCache := func(address common.Address, txHash common.Hash, tx *types.Transaction) { + if txsByAddressAndTxHash[address] == nil { + txsByAddressAndTxHash[address] = make(map[common.Hash]*types.Transaction) + } + txsByAddressAndTxHash[address][txHash] = tx + } + + getReceiptFromCache := func(address common.Address, txHash common.Hash) *types.Receipt { + if receiptsByAddressAndTxHash[address] == nil { + return nil + } + return receiptsByAddressAndTxHash[address][txHash] + } + + getTxFromCache := func(address common.Address, txHash common.Hash) *types.Transaction { + if txsByAddressAndTxHash[address] == nil { + return nil + } + return txsByAddressAndTxHash[address][txHash] + } + + getReceipt := func(address common.Address, txHash common.Hash) (receipt *types.Receipt, err error) { + receipt = getReceiptFromCache(address, txHash) + if receipt == nil { + receipt, err = d.fetchTransactionReceipt(ctx, txHash) + if err != nil { + return nil, err + } + addReceiptToCache(address, txHash, receipt) + } + return receipt, nil + } + + getTx := func(address common.Address, txHash common.Hash) (tx *types.Transaction, err error) { + tx = getTxFromCache(address, txHash) + if tx == nil { + tx, err = d.fetchTransaction(ctx, txHash) + if err != nil { + return nil, err + } + addTxToCache(address, txHash, tx) + } + return tx, nil + } + for _, address := range accounts { - // During block discovery, we should have populated the DB with 1 item per Transaction containing - // erc20/erc721 transfers + // During block discovery, we should have populated the DB with 1 item per transfer log containing + // erc20/erc721/erc1155 transfers. + // ID is a hash of the tx hash and the log index. log_index is unique per ERC20/721 tx, but not per ERC1155 tx. transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.NetworkID(), address, blk.Number()) if err != nil { return nil, err @@ -150,10 +200,22 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc areSubTxsCheckedForTxHash := make(map[common.Hash]bool) + log.Debug("getTransfersInBlock", "block", blk.Number(), "transactionsToLoad", len(transactionsToLoad)) + for _, t := range transactionsToLoad { - subtransactions, err := d.subTransactionsFromTransactionHash(ctx, t.Log.TxHash, address) + receipt, err := getReceipt(address, t.Log.TxHash) if err != nil { - log.Error("can't fetch subTxs for erc20/erc721 transfer", "error", err) + return nil, err + } + + tx, err := getTx(address, t.Log.TxHash) + if err != nil { + return nil, err + } + + subtransactions, err := d.subTransactionsFromPreloaded(t, tx, receipt, blk) + if err != nil { + log.Error("can't fetch subTxs for erc20/erc721/erc1155 transfer", "error", err) return nil, err } rst = append(rst, subtransactions...) @@ -188,12 +250,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc } if isPlainTransfer || mustCheckSubTxs { - receipt, err := d.chainClient.TransactionReceipt(ctx, tx.Hash()) - if err != nil { - return nil, err - } - - baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(blk.Number()) + receipt, err := getReceipt(address, tx.Hash()) if err != nil { return nil, err } @@ -201,7 +258,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc // Since we've already got the receipt, check for subTxs of // interest in case we haven't already. if !areSubTxsCheckedForTxHash[tx.Hash()] { - subtransactions, err := d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address) + subtransactions, err := d.subTransactionsFromTransactionData(address, from, tx, receipt, blk) if err != nil { log.Error("can't fetch subTxs for eth transfer", "error", err) return nil, err @@ -224,7 +281,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc From: from, Receipt: receipt, Log: nil, - BaseGasFees: baseGasFee, + BaseGasFees: blk.BaseFee().String(), MultiTransactionID: NoMultiTransactionID}) } } @@ -284,44 +341,40 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co } func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signature, d.signatureErc1155Single}, {}, {d.paddedAddress(address)}} + return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {d.paddedAddress(address)}} } -func (d *ERC20TransfersDownloader) inboundTopicsERC1155Single(address common.Address) [][]common.Hash { - return [][]common.Hash{{d.signatureErc1155Single}, {}, {}, {d.paddedAddress(address)}} +func (d *ERC20TransfersDownloader) inboundTopicsERC1155(address common.Address) [][]common.Hash { + return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, {d.paddedAddress(address)}} } -func (d *ETHDownloader) subTransactionsFromTransactionHash(parent context.Context, txHash common.Hash, address common.Address) ([]Transfer, error) { +func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) { ctx, cancel := context.WithTimeout(parent, 3*time.Second) - tx, _, err := d.chainClient.TransactionByHash(ctx, txHash) - cancel() - if err != nil { - return nil, err - } - - ctx, cancel = context.WithTimeout(parent, 3*time.Second) receipt, err := d.chainClient.TransactionReceipt(ctx, txHash) cancel() if err != nil { return nil, err } + return receipt, nil +} - ctx, cancel = context.WithTimeout(parent, 3*time.Second) - blk, err := d.chainClient.BlockByHash(ctx, receipt.BlockHash) +func (d *ETHDownloader) fetchTransaction(parent context.Context, txHash common.Hash) (*types.Transaction, error) { + ctx, cancel := context.WithTimeout(parent, 3*time.Second) + tx, _, err := d.chainClient.TransactionByHash(ctx, txHash) // TODO Save on requests by checking in the DB first cancel() if err != nil { return nil, err } - - baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(receipt.BlockNumber) - if err != nil { - return nil, err - } - - return d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address) + return tx, nil } -func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction, receipt *types.Receipt, blk *types.Block, baseGasFee string, address common.Address) ([]Transfer, error) { +func (d *ETHDownloader) subTransactionsFromPreloaded(preloadedTx *PreloadedTransaction, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) { + log.Debug("subTransactionsFromPreloaded start", "txHash", tx.Hash().Hex(), "address", preloadedTx.Address, "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value) + address := preloadedTx.Address + txLog := preloadedTx.Log + + rst := make([]Transfer, 0, 1) + from, err := types.Sender(d.signer, tx) if err != nil { if err == core.ErrTxTypeNotSupported { @@ -330,47 +383,63 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction return nil, err } - rst := make([]Transfer, 0, len(receipt.Logs)) - for _, log := range receipt.Logs { - eventType := w_common.GetEventType(log) - // Only add ERC20/ERC721/ERC1155 transfers from/to the given account - // Other types of events get always added - mustAppend := false - switch eventType { - case w_common.Erc20TransferEventType: - trFrom, trTo, _ := w_common.ParseErc20TransferLog(log) - if trFrom == address || trTo == address { - mustAppend = true - } - case w_common.Erc721TransferEventType: - trFrom, trTo, _ := w_common.ParseErc721TransferLog(log) - if trFrom == address || trTo == address { - mustAppend = true - } - case w_common.Erc1155TransferSingleEventType: - _, trFrom, trTo, _, _ := w_common.ParseErc1155TransferSingleLog(log) - if trFrom == address || trTo == address { - mustAppend = true - } - case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType: - mustAppend = true - case w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType: - mustAppend = true - case w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType: - mustAppend = true + eventType := w_common.GetEventType(preloadedTx.Log) + // Only add ERC20/ERC721/ERC1155 transfers from/to the given account + // from/to matching is already handled by getLogs filter + switch eventType { + case w_common.Erc20TransferEventType, + w_common.Erc721TransferEventType, + w_common.Erc1155TransferSingleEventType, w_common.Erc1155TransferBatchEventType: + log.Debug("subTransactionsFromPreloaded transfer", "eventType", eventType, "logIdx", txLog.Index, "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value, "baseFee", blk.BaseFee().String()) + + transfer := Transfer{ + Type: w_common.EventTypeToSubtransactionType(eventType), + ID: preloadedTx.ID, + Address: address, + BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber), + BlockHash: txLog.BlockHash, + Loaded: true, + NetworkID: d.signer.ChainID().Uint64(), + From: from, + Log: txLog, + TokenID: preloadedTx.TokenID, + TokenValue: preloadedTx.Value, + BaseGasFees: blk.BaseFee().String(), + Transaction: tx, + Receipt: receipt, + Timestamp: blk.Time(), + MultiTransactionID: NoMultiTransactionID, } - if mustAppend { + + rst = append(rst, transfer) + } + + log.Debug("subTransactionsFromPreloaded end", "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value) + return rst, nil +} + +func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common.Address, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) { + log.Debug("subTransactionsFromTransactionData start", "txHash", tx.Hash().Hex(), "address", address) + + rst := make([]Transfer, 0, 1) + + for _, txLog := range receipt.Logs { + eventType := w_common.GetEventType(txLog) + switch eventType { + case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType, + w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType, + w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType: transfer := Transfer{ Type: w_common.EventTypeToSubtransactionType(eventType), - ID: getLogSubTxID(*log), + ID: w_common.GetLogSubTxID(*txLog), Address: address, - BlockNumber: new(big.Int).SetUint64(log.BlockNumber), - BlockHash: log.BlockHash, + BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber), + BlockHash: txLog.BlockHash, Loaded: true, NetworkID: d.signer.ChainID().Uint64(), From: from, - Log: log, - BaseGasFees: baseGasFee, + Log: txLog, + BaseGasFees: blk.BaseFee().String(), Transaction: tx, Receipt: receipt, Timestamp: blk.Time(), @@ -381,11 +450,13 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction } } + log.Debug("subTransactionsFromTransactionData end", "txHash", tx.Hash().Hex(), "address", address.Hex()) return rst, nil } func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) { concurrent := NewConcurrentDownloader(parent, NoThreadLimit) + for i := range logs { l := logs[i] @@ -393,40 +464,48 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [ continue } - id := getLogSubTxID(l) - baseGasFee, err := d.client.GetBaseFeeFromBlock(new(big.Int).SetUint64(l.BlockNumber)) + from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l) if err != nil { - return nil, err + log.Error("failed to parse transfer log", "log", l, "address", address, "error", err) + continue } - logType := w_common.EventTypeToSubtransactionType(w_common.GetEventType(&l)) - log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "id", id) - - if logType != w_common.Erc1155SingleTransfer && logType != w_common.Erc1155BatchTransfer { - logType = w_common.Erc20Transfer + // Double check provider returned the correct log + if from != address && to != address { + log.Error("from/to address mismatch", "log", l, "address", address) + continue } - header := &DBHeader{ - Number: big.NewInt(int64(l.BlockNumber)), - Hash: l.BlockHash, - PreloadedTransactions: []*PreloadedTransaction{{ - Address: address, - BlockNumber: big.NewInt(int64(l.BlockNumber)), - BlockHash: l.BlockHash, - ID: id, - From: address, - Loaded: false, - Type: logType, - Log: &l, - BaseGasFees: baseGasFee, - }}, - Loaded: false, - } + eventType := w_common.GetEventType(&l) + logType := w_common.EventTypeToSubtransactionType(eventType) - concurrent.Add(func(ctx context.Context) error { - concurrent.PushHeader(header) - return nil - }) + for i, txID := range txIDs { + log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "txID", txID) + + // For ERC20 there is no tokenID, so we use nil + var tokenID *big.Int + if len(tokenIDs) > i { + tokenID = tokenIDs[i] + } + + header := &DBHeader{ + Number: big.NewInt(int64(l.BlockNumber)), + Hash: l.BlockHash, + PreloadedTransactions: []*PreloadedTransaction{{ + ID: txID, + Type: logType, + Log: &l, + TokenID: tokenID, + Value: values[i], + }}, + Loaded: false, + } + + concurrent.Add(func(ctx context.Context) error { + concurrent.PushHeader(header) + return nil + }) + } } select { case <-concurrent.WaitAsync(): @@ -478,7 +557,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{ FromBlock: from, ToBlock: to, - Topics: d.inboundTopicsERC1155Single(address), + Topics: d.inboundTopicsERC1155(address), }) if err != nil { return nil, err diff --git a/services/wallet/transfer/query.go b/services/wallet/transfer/query.go index 8cf425ae1..39c1072cd 100644 --- a/services/wallet/transfer/query.go +++ b/services/wallet/transfer/query.go @@ -3,6 +3,7 @@ package transfer import ( "bytes" "database/sql" + "fmt" "math/big" "github.com/ethereum/go-ethereum/common" @@ -10,7 +11,8 @@ import ( "github.com/status-im/status-go/services/wallet/bigint" ) -const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) FROM transfers" +const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) %s FROM transfers" +const preloadedTransfersQuery = "SELECT hash, type, address, log, token_id, amount_padded128hex FROM transfers" type transfersQuery struct { buf *bytes.Buffer @@ -21,7 +23,14 @@ type transfersQuery struct { func newTransfersQuery() *transfersQuery { newQuery := newEmptyQuery() - newQuery.buf.WriteString(baseTransfersQuery) + transfersQueryString := fmt.Sprintf(baseTransfersQuery, "") + newQuery.buf.WriteString(transfersQueryString) + return newQuery +} + +func newTransfersQueryForPreloadedTransactions() *transfersQuery { + newQuery := newEmptyQuery() + newQuery.buf.WriteString(preloadedTransfersQuery) return newQuery } @@ -110,13 +119,20 @@ func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery { return q } -func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery { +func (q *transfersQuery) FilterTransactionID(hash common.Hash) *transfersQuery { q.addWhereSeparator(AndSeparator) q.buf.WriteString(" hash = ?") q.args = append(q.args, hash) return q } +func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery { + q.addWhereSeparator(AndSeparator) + q.buf.WriteString(" tx_hash = ?") + q.args = append(q.args, hash) + return q +} + func (q *transfersQuery) FilterBlockHash(blockHash common.Hash) *transfersQuery { q.addWhereSeparator(AndSeparator) q.buf.WriteString(" blk_hash = ?") @@ -167,25 +183,48 @@ func (q *transfersQuery) TransferScan(rows *sql.Rows) (rst []Transfer, err error return rst, nil } -func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []PreloadedTransaction, err error) { - transfers, err := q.TransferScan(rows) - if err != nil { - return +func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []*PreloadedTransaction, err error) { + transfers := make([]Transfer, 0) + for rows.Next() { + transfer := Transfer{ + Log: &types.Log{}, + } + tokenValue := sql.NullString{} + tokenID := sql.RawBytes{} + err = rows.Scan( + &transfer.ID, &transfer.Type, + &transfer.Address, + &JSONBlob{transfer.Log}, + &tokenID, &tokenValue) + + if len(tokenID) > 0 { + transfer.TokenID = new(big.Int).SetBytes(tokenID) + } + + if tokenValue.Valid { + var ok bool + transfer.TokenValue, ok = new(big.Int).SetString(tokenValue.String, 16) + if !ok { + panic("failed to parse token value") + } + } + + if err != nil { + return nil, err + } + transfers = append(transfers, transfer) } - rst = make([]PreloadedTransaction, 0, len(transfers)) + rst = make([]*PreloadedTransaction, 0, len(transfers)) for _, transfer := range transfers { - preloadedTransaction := PreloadedTransaction{ - ID: transfer.ID, - Type: transfer.Type, - BlockHash: transfer.BlockHash, - BlockNumber: transfer.BlockNumber, - Address: transfer.Address, - From: transfer.From, - Log: transfer.Log, - NetworkID: transfer.NetworkID, - BaseGasFees: transfer.BaseGasFees, + preloadedTransaction := &PreloadedTransaction{ + ID: transfer.ID, + Type: transfer.Type, + Address: transfer.Address, + Log: transfer.Log, + TokenID: transfer.TokenID, + Value: transfer.TokenValue, } rst = append(rst, preloadedTransaction)