diff --git a/appdatabase/database.go b/appdatabase/database.go
index 4b24004f1..ba745797b 100644
--- a/appdatabase/database.go
+++ b/appdatabase/database.go
@@ -335,7 +335,14 @@ func migrateWalletJSONBlobs(sqlTx *sql.Tx) error {
 
 func extractToken(entryType string, tx *types.Transaction, l *types.Log, logValid bool) (correctType w_common.Type, tokenID *big.Int, value *big.Int, tokenAddress *common.Address) {
 	if logValid {
-		correctType, tokenAddress, tokenID, value, _, _ = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx)
+		correctType, tokenAddress, _, _ = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx)
+		_, _, _, tokenIDs, values, _ := w_common.ParseTransferLog(*l)
+		if len(tokenIDs) > 0 {
+			tokenID = tokenIDs[0]
+		}
+		if len(values) > 0 {
+			value = values[0]
+		}
 	} else {
 		correctType = w_common.Type(entryType)
 		value = new(big.Int).Set(tx.Value())
@@ -394,7 +401,7 @@ func migrateWalletTransferFromToAddresses(sqlTx *sql.Tx) error {
 
 		if nullableTx.Valid {
 			if nullableL.Valid {
-				_, tokenAddress, _, _, txFrom, txTo = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx)
+				_, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx)
 			} else {
 				txFrom = &sender
 				txTo = tx.To()
diff --git a/appdatabase/database_test.go b/appdatabase/database_test.go
index fdd8d6849..bedf8140a 100644
--- a/appdatabase/database_test.go
+++ b/appdatabase/database_test.go
@@ -297,21 +297,24 @@ func TestMigrateWalletJsonBlobs(t *testing.T) {
 			require.Equal(t, *sqlite.BigIntToPadded128BitsStr(tt.Value()), amount128Hex.String)
 			require.True(t, isTokenIDNull)
 		} else {
-			actualEntryType, expectedTokenAddress, expectedTokenID, expectedValue, expectedFrom, expectedTo := w_common.ExtractTokenIdentity(expectedEntryType, tl, tt)
+			actualEntryType, expectedTokenAddress, _, _ := w_common.ExtractTokenTransferData(expectedEntryType, tl, tt)
 			if actualEntryType == w_common.Erc20Transfer {
+				expectedFrom, expectedTo, expectedValue := w_common.ParseErc20TransferLog(tl)
 				require.True(t, amount128Hex.Valid)
 				require.Equal(t, *sqlite.BigIntToPadded128BitsStr(expectedValue), amount128Hex.String)
 				require.True(t, isTokenIDNull)
 				require.Equal(t, *expectedTokenAddress, *tokenAddress)
-				require.Equal(t, *expectedFrom, *txFrom)
-				require.Equal(t, *expectedTo, *txTo)
+				require.Equal(t, expectedFrom, *txFrom)
+				require.Equal(t, expectedTo, *txTo)
 			} else if actualEntryType == w_common.Erc721Transfer {
-				require.False(t, amount128Hex.Valid)
+				expectedFrom, expectedTo, expectedTokenID := w_common.ParseErc721TransferLog(tl)
+				require.True(t, amount128Hex.Valid)
+				require.Equal(t, *sqlite.BigIntToPadded128BitsStr(big.NewInt(1)), amount128Hex.String)
 				require.False(t, isTokenIDNull)
 				require.Equal(t, expectedTokenID, expectedTokenID)
 				require.Equal(t, *expectedTokenAddress, *tokenAddress)
-				require.Equal(t, *expectedFrom, *txFrom)
-				require.Equal(t, *expectedTo, *txTo)
+				require.Equal(t, expectedFrom, *txFrom)
+				require.Equal(t, expectedTo, *txTo)
 			} else {
 				require.False(t, amount128Hex.Valid)
 				require.True(t, isTokenIDNull)
diff --git a/services/wallet/common/log_parser.go b/services/wallet/common/log_parser.go
index b6dbc4f12..4a5dcc1a8 100644
--- a/services/wallet/common/log_parser.go
+++ b/services/wallet/common/log_parser.go
@@ -3,6 +3,7 @@
 package common
 
 import (
+	"encoding/binary"
 	"fmt"
 	"math/big"
 
@@ -20,16 +21,15 @@ type EventType string
 
 const (
 	// Transaction types
-	EthTransfer           Type = "eth"
-	Erc20Transfer         Type = "erc20"
-	Erc721Transfer        Type = "erc721"
-	Erc1155SingleTransfer Type = "erc1155"
-	Erc1155BatchTransfer  Type = "erc1155"
-	UniswapV2Swap         Type = "uniswapV2Swap"
-	UniswapV3Swap         Type = "uniswapV3Swap"
-	HopBridgeFrom         Type = "HopBridgeFrom"
-	HopBridgeTo           Type = "HopBridgeTo"
-	unknownTransaction    Type = "unknown"
+	EthTransfer        Type = "eth"
+	Erc20Transfer      Type = "erc20"
+	Erc721Transfer     Type = "erc721"
+	Erc1155Transfer    Type = "erc1155"
+	UniswapV2Swap      Type = "uniswapV2Swap"
+	UniswapV3Swap      Type = "uniswapV3Swap"
+	HopBridgeFrom      Type = "HopBridgeFrom"
+	HopBridgeTo        Type = "HopBridgeTo"
+	unknownTransaction Type = "unknown"
 
 	// Event types
 	WETHDepositEventType EventType = "wethDepositEvent"
@@ -136,10 +136,8 @@ func EventTypeToSubtransactionType(eventType EventType) Type {
 		return Erc20Transfer
 	case Erc721TransferEventType:
 		return Erc721Transfer
-	case Erc1155TransferSingleEventType:
-		return Erc1155SingleTransfer
-	case Erc1155TransferBatchEventType:
-		return Erc1155BatchTransfer
+	case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
+		return Erc1155Transfer
 	case UniswapV2SwapEventType:
 		return UniswapV2Swap
 	case UniswapV3SwapEventType:
@@ -226,16 +224,13 @@ func ParseErc20TransferLog(ethlog *types.Log) (from, to common.Address, amount *
 		log.Warn("not enough topics for erc20 transfer", "topics", ethlog.Topics)
 		return
 	}
-	if len(ethlog.Topics[1]) != 32 {
-		log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1])
+	var err error
+	from, to, err = getFromToAddresses(*ethlog)
+	if err != nil {
+		log.Error("log_parser::ParseErc20TransferLog", err)
 		return
 	}
-	if len(ethlog.Topics[2]) != 32 {
-		log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2])
-		return
-	}
-	copy(from[:], ethlog.Topics[1][12:])
-	copy(to[:], ethlog.Topics[2][12:])
+
 	if len(ethlog.Data) != 32 {
 		log.Warn("data is not padded to 32 byts big int", "data", ethlog.Data)
 		return
@@ -251,52 +246,150 @@ func ParseErc721TransferLog(ethlog *types.Log) (from, to common.Address, tokenID
 		log.Warn("not enough topics for erc721 transfer", "topics", ethlog.Topics)
 		return
 	}
-	if len(ethlog.Topics[1]) != 32 {
-		log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1])
+
+	var err error
+	from, to, err = getFromToAddresses(*ethlog)
+	if err != nil {
+		log.Error("log_parser::ParseErc721TransferLog", err)
 		return
 	}
-	if len(ethlog.Topics[2]) != 32 {
-		log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2])
-		return
-	}
-	if len(ethlog.Topics[3]) != 32 {
-		log.Warn("fourth topic is not 32 byte tokenId", "topic", ethlog.Topics[3])
-		return
-	}
-	copy(from[:], ethlog.Topics[1][12:])
-	copy(to[:], ethlog.Topics[2][12:])
 	tokenID.SetBytes(ethlog.Topics[3][:])
 	return
 }
 
-func ParseErc1155TransferSingleLog(ethlog *types.Log) (operator, from, to common.Address, id, amount *big.Int) {
-	if len(ethlog.Topics) < erc1155TransferEventIndexedParameters {
-		log.Warn("not enough topics for erc1155 transfer single", "topics", ethlog.Topics)
-		return
-	}
+func GetLogSubTxID(log types.Log) common.Hash {
+	// Get unique ID by using TxHash and log index
+	index := [4]byte{}
+	binary.BigEndian.PutUint32(index[:], uint32(log.Index))
+	return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
+}
 
-	amount = new(big.Int)
-	id = new(big.Int)
+func getLogSubTxIDWithTokenIDIndex(log types.Log, tokenIDIdx uint16) common.Hash {
+	// Get unique ID by using TxHash, log index and extra bytes (token id index for ERC1155 TransferBatch)
+	index := [4]byte{}
+	value := uint32(log.Index&0x0000FFFF) | (uint32(tokenIDIdx) << 16) // log index should not exceed uint16 max value
+	binary.BigEndian.PutUint32(index[:], value)
+	return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
+}
 
-	for i := 1; i < erc1155TransferEventIndexedParameters; i++ {
+func checkTopicsLength(ethlog types.Log, startIdx, endIdx int) (err error) {
+	for i := startIdx; i < endIdx; i++ {
 		if len(ethlog.Topics[i]) != common.HashLength {
-			log.Warn(fmt.Sprintf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[1]))
+			err = fmt.Errorf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[i])
+			log.Error("log_parser::checkTopicsLength", err)
 			return
 		}
 	}
-	addressIdx := common.HashLength - common.AddressLength
-	copy(operator[:], ethlog.Topics[1][addressIdx:])
-	copy(from[:], ethlog.Topics[2][addressIdx:])
-	copy(to[:], ethlog.Topics[3][addressIdx:])
+	return
+}
 
-	if len(ethlog.Data) != common.HashLength*2 {
-		log.Warn("data is not padded to 64 bytes", "data", ethlog.Data)
+func getFromToAddresses(ethlog types.Log) (from, to common.Address, err error) {
+	eventType := GetEventType(&ethlog)
+	addressIdx := common.HashLength - common.AddressLength
+	switch eventType {
+	case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
+		err = checkTopicsLength(ethlog, 2, 4)
+		if err != nil {
+			return
+		}
+		copy(from[:], ethlog.Topics[2][addressIdx:])
+		copy(to[:], ethlog.Topics[3][addressIdx:])
+		return
+
+	case Erc20TransferEventType, Erc721TransferEventType, UniswapV2SwapEventType, UniswapV3SwapEventType, HopBridgeTransferFromL1CompletedEventType:
+		err = checkTopicsLength(ethlog, 1, 3)
+		if err != nil {
+			return
+		}
+		copy(from[:], ethlog.Topics[1][addressIdx:])
+		copy(to[:], ethlog.Topics[2][addressIdx:])
 		return
 	}
-	id.SetBytes(ethlog.Data[:common.HashLength])
-	amount.SetBytes(ethlog.Data[common.HashLength:])
 
+	return from, to, fmt.Errorf("unsupported event type to get from/to adddresses %s", eventType)
+}
+
+func ParseTransferLog(ethlog types.Log) (from, to common.Address, txIDs []common.Hash, tokenIDs, values []*big.Int, err error) {
+	eventType := GetEventType(&ethlog)
+
+	switch eventType {
+	case Erc20TransferEventType:
+		var amount *big.Int
+		from, to, amount = ParseErc20TransferLog(&ethlog)
+		txIDs = append(txIDs, GetLogSubTxID(ethlog))
+		values = append(values, amount)
+		return
+	case Erc721TransferEventType:
+		var tokenID *big.Int
+		from, to, tokenID = ParseErc721TransferLog(&ethlog)
+		txIDs = append(txIDs, GetLogSubTxID(ethlog))
+		tokenIDs = append(tokenIDs, tokenID)
+		values = append(values, big.NewInt(1))
+		return
+	case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
+		_, from, to, tokenIDs, values, err = ParseErc1155TransferLog(&ethlog, eventType)
+		for i := range tokenIDs {
+			txIDs = append(txIDs, getLogSubTxIDWithTokenIDIndex(ethlog, uint16(i)))
+		}
+		return
+	}
+
+	return from, to, txIDs, tokenIDs, values, fmt.Errorf("unsupported event type in log_parser::ParseTransferLogs %s", eventType)
+}
+
+func ParseErc1155TransferLog(ethlog *types.Log, evType EventType) (operator, from, to common.Address, ids, amounts []*big.Int, err error) {
+	if len(ethlog.Topics) < erc1155TransferEventIndexedParameters {
+		err = fmt.Errorf("not enough topics for erc1155 transfer %s, %v", "topics", ethlog.Topics)
+		log.Error("log_parser::ParseErc1155TransferLog", "err", err)
+		return
+	}
+
+	err = checkTopicsLength(*ethlog, 1, erc1155TransferEventIndexedParameters)
+	if err != nil {
+		return
+	}
+
+	addressIdx := common.HashLength - common.AddressLength
+	copy(operator[:], ethlog.Topics[1][addressIdx:])
+	from, to, err = getFromToAddresses(*ethlog)
+	if err != nil {
+		log.Error("log_parser::ParseErc1155TransferLog", "err", err)
+		return
+	}
+
+	if len(ethlog.Data) == 0 || len(ethlog.Data)%(common.HashLength*2) != 0 {
+		err = fmt.Errorf("data is not padded to 64 bytes %s, %v", "data", ethlog.Data)
+		log.Error("log_parser::ParseErc1155TransferLog", "err", err)
+		return
+	}
+
+	if evType == Erc1155TransferSingleEventType {
+		ids = append(ids, new(big.Int).SetBytes(ethlog.Data[:common.HashLength]))
+		amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[common.HashLength:]))
+		log.Debug("log_parser::ParseErc1155TransferSingleLog", "ids", ids, "amounts", amounts)
+	} else {
+		// idTypeSize := new(big.Int).SetBytes(ethlog.Data[:common.HashLength]).Uint64() // Left for knowledge
+		// valueTypeSize := new(big.Int).SetBytes(ethlog.Data[common.HashLength : common.HashLength*2]).Uint64() // Left for knowledge
+		idsArraySize := new(big.Int).SetBytes(ethlog.Data[common.HashLength*2 : common.HashLength*2+common.HashLength]).Uint64()
+
+		initialOffset := common.HashLength*2 + common.HashLength
+		for i := 0; i < int(idsArraySize); i++ {
+			ids = append(ids, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength]))
+		}
+		valuesArraySize := new(big.Int).SetBytes(ethlog.Data[initialOffset+int(idsArraySize)*common.HashLength : initialOffset+int(idsArraySize+1)*common.HashLength]).Uint64()
+
+		if idsArraySize != valuesArraySize {
+			err = fmt.Errorf("ids and values sizes don't match %d, %d", idsArraySize, valuesArraySize)
+			log.Error("log_parser::ParseErc1155TransferBatchLog", "err", err)
+			return
+		}

+		initialOffset = initialOffset + int(idsArraySize+1)*common.HashLength
+		for i := 0; i < int(valuesArraySize); i++ {
+			amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength]))
+			log.Debug("log_parser::ParseErc1155TransferBatchLog", "id", ids[i], "amount", amounts[i])
+		}
+	}
 	return
 }
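[Editor's note — illustration, not part of the patch.] ParseErc1155TransferLog walks the TransferBatch data at fixed word offsets; the two commented-out "Left for knowledge" words are, in the canonical ABI encoding of (uint256[] ids, uint256[] values), the head offsets of the two dynamic arrays. The sketch below builds such a payload by hand (batch ids = [7, 9], values = [100, 200]) and reads it back with the same offsets the parser uses; only the standard library is required.

package main

import (
	"fmt"
	"math/big"
)

// word returns one 32-byte big-endian ABI word for a small integer.
func word(v int64) []byte {
	return new(big.Int).SetInt64(v).FillBytes(make([]byte, 32))
}

func main() {
	var data []byte
	data = append(data, word(0x40)...) // word 0: offset of ids   (the commented-out "idTypeSize")
	data = append(data, word(0xa0)...) // word 1: offset of values (the commented-out "valueTypeSize")
	data = append(data, word(2)...)    // word 2: len(ids)  -> idsArraySize, read at data[64:96]
	data = append(data, word(7)...)    // word 3: ids[0]
	data = append(data, word(9)...)    // word 4: ids[1]
	data = append(data, word(2)...)    // word 5: len(values) -> valuesArraySize
	data = append(data, word(100)...)  // word 6: values[0]
	data = append(data, word(200)...)  // word 7: values[1]

	idsLen := new(big.Int).SetBytes(data[64:96]).Uint64() // same slice the parser reads
	fmt.Println(idsLen,
		new(big.Int).SetBytes(data[96:128]),  // ids[0] = 7
		new(big.Int).SetBytes(data[128:160])) // ids[1] = 9
}

This only works because the parser assumes the canonical encoding produced by solidity's event emitter (offsets 0x40 and 0x40+(1+len(ids))*0x20); a log with a non-standard layout would decode incorrectly.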
"data", ethlog.Data) return @@ -426,17 +507,11 @@ func ParseHopBridgeTransferFromL1CompletedLog(ethlog *types.Log) (recipient comm return } - if len(ethlog.Topics[1]) != 32 { - err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1]) + recipient, relayer, err = getFromToAddresses(*ethlog) + if err != nil { + log.Error("log_parser::ParseHopBridgeTransferFromL1CompletedLog", err) return } - copy(recipient[:], ethlog.Topics[1][12:]) - - if len(ethlog.Topics[2]) != 32 { - err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2]) - return - } - copy(relayer[:], ethlog.Topics[2][12:]) if len(ethlog.Data) != 32*4 { err = fmt.Errorf("data is not padded to 4 * 32 bytes big int %s, %v", "data", ethlog.Data) @@ -526,7 +601,7 @@ func GetEventSignatureHash(signature string) common.Hash { return crypto.Keccak256Hash([]byte(signature)) } -func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txTokenID *big.Int, txValue *big.Int, txFrom *common.Address, txTo *common.Address) { +func ExtractTokenTransferData(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txFrom *common.Address, txTo *common.Address) { // erc721 transfers share signature with erc20 ones, so they both used to be categorized as erc20 // by the Downloader. We fix this here since they might be mis-categorized in the db. if dbEntryType == Erc20Transfer { @@ -537,32 +612,27 @@ func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transactio } switch correctType { - case EthTransfer: - if tx != nil { - txValue = new(big.Int).Set(tx.Value()) - } case Erc20Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - from, to, value := ParseErc20TransferLog(log) - txValue = value + from, to, _ := ParseErc20TransferLog(log) txFrom = &from txTo = &to case Erc721Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - from, to, tokenID := ParseErc721TransferLog(log) - txTokenID = tokenID + from, to, _ := ParseErc721TransferLog(log) txFrom = &from txTo = &to - case Erc1155SingleTransfer: + case Erc1155Transfer: tokenAddress = new(common.Address) *tokenAddress = log.Address - _, from, to, tokenID, value := ParseErc1155TransferSingleLog(log) - txTokenID = tokenID + _, from, to, _, _, err := ParseErc1155TransferLog(log, Erc1155TransferSingleEventType) // from/to extraction is the same for single and batch + if err != nil { + return + } txFrom = &from txTo = &to - txValue = value } return diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 8256c099e..c29b31366 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -116,10 +116,9 @@ type erc20HistoricalCommand struct { chainClient chain.ClientInterface feed *event.Feed - iterator *IterativeDownloader - to *big.Int - from *big.Int - + iterator *IterativeDownloader + to *big.Int + from *big.Int foundHeaders []*DBHeader } @@ -203,7 +202,8 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error { } func (c *controlCommand) Run(parent context.Context) error { - log.Info("start control command") + log.Debug("start control command") + ctx, cancel := context.WithTimeout(parent, 3*time.Second) head, err := c.chainClient.HeaderByNumber(ctx, nil) cancel() @@ -221,7 +221,7 @@ func (c *controlCommand) Run(parent context.Context) error { }) } - 
log.Info("current head is", "block number", head.Number) + log.Debug("current head is", "block number", head.Number) // Get last known block for each account lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.NetworkID(), c.accounts) @@ -326,7 +326,7 @@ func (c *controlCommand) Run(parent context.Context) error { BlockNumber: target, }) } - log.Info("end control command") + log.Debug("end control command") return err } @@ -390,7 +390,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { // Take blocks from cache if available and disrespect the limit // If no blocks are available in cache, take blocks from DB respecting the limit // If no limit is set, take all blocks from DB - log.Info("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums) + log.Debug("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums) startTs := time.Now() for { @@ -450,7 +450,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) { } } - log.Info("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, + log.Debug("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blocks.len", len(c.blockNums), "transfers.len", len(c.fetchedTransfers), "in", time.Since(startTs)) return nil @@ -771,7 +771,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, map[common.Address][]*DBHeader, error) { - log.Info("fast indexer started") + log.Debug("fast indexer started") start := time.Now() group := async.NewGroup(ctx) @@ -804,7 +804,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b resultingFromByAddress[command.address] = command.resultingFrom headers[command.address] = command.foundHeaders } - log.Info("fast indexer finished", "in", time.Since(start)) + log.Debug("fast indexer finished", "in", time.Since(start)) return resultingFromByAddress, headers, nil } } @@ -812,7 +812,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b // run fast indexing for every accont up to canonical chain head minus safety depth. // every account will run it from last synced header. 
func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) { - log.Info("fast indexer Erc20 started") + log.Debug("fast indexer Erc20 started") start := time.Now() group := async.NewGroup(ctx) @@ -839,7 +839,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from for _, command := range commands { headers[command.address] = command.foundHeaders } - log.Info("fast indexer Erc20 finished", "in", time.Since(start)) + log.Debug("fast indexer Erc20 finished", "in", time.Since(start)) return headers, nil } } @@ -849,7 +849,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, feed *event.Feed) error { - log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) + log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount) start := time.Now() group := async.NewGroup(ctx) @@ -879,7 +879,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo case <-ctx.Done(): return ctx.Err() case <-group.WaitAsync(): - log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) + log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID()) return nil } } @@ -899,7 +899,7 @@ func getLowestFrom(chainID uint64, to *big.Int) *big.Int { // Finds the latest range up to initialTo where the number of transactions is between 20 and 25 func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) { - log.Info("findFirstRange", "account", account, "initialTo", initialTo, "client", client) + log.Debug("findFirstRange", "account", account, "initialTo", initialTo, "client", client) from := getLowestFrom(client.NetworkID(), initialTo) to := initialTo @@ -910,7 +910,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In } firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually - log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) + log.Debug("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to) if err != nil { return nil, err @@ -943,15 +943,15 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In } nonceDiff = firstNonce - fromNonce - log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) + log.Debug("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce) if goal <= nonceDiff && nonceDiff <= (goal+5) { - log.Info("range found", "account", account, "from", from, "to", to) + log.Debug("range found", "account", account, "from", from, "to", to) return from, nil } } - log.Info("range found", "account", account, "from", from, "to", to) + log.Debug("range found", "account", account, "from", from, "to", to) return from, nil } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index a335348f3..51d0e0b8b 100644 --- a/services/wallet/transfer/commands_sequential.go +++ 
b/services/wallet/transfer/commands_sequential.go @@ -32,7 +32,7 @@ func (c *findNewBlocksCommand) Command() async.Command { } func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { - log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit) + log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) headNum, err := getHeadBlockNumber(parent, c.chainClient) if err != nil { @@ -47,6 +47,7 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { return err // Will keep spinning forever nomatter what } + // In case no block range is in DB, skip until history blocks are fetched if blockRange != nil { c.fromBlockNumber = blockRange.LastKnown @@ -64,6 +65,8 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) { _ = c.findBlocksCommand.Run(parent) } + log.Debug("end findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber) + return nil } @@ -436,7 +439,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache select { case <-ctx.Done(): err = ctx.Err() - log.Info("fast indexer ctx Done", "error", err) + log.Debug("fast indexer ctx Done", "error", err) return case <-group.WaitAsync(): if command.error != nil { @@ -491,7 +494,7 @@ func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *Bl for { select { case <-ctx.Done(): - log.Info("loadTransfersLoop error", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) + log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err()) return case dbHeaders := <-blocksLoadedCh: log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders)) @@ -523,7 +526,6 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, chainClient: chainClient, feed: feed, balanceCacher: balanceCacher, - errorsCount: 0, transactionManager: transactionManager, pendingTxManager: pendingTxManager, tokenManager: tokenManager, @@ -540,7 +542,6 @@ type loadBlocksAndTransfersCommand struct { chainClient chain.ClientInterface feed *event.Feed balanceCacher balance.Cacher - errorsCount int // nonArchivalRPCNode bool // TODO Make use of it transactionManager *TransactionManager pendingTxManager *transactions.PendingTxTracker @@ -556,16 +557,28 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "account", c.account) ctx := parent + + // This wait group is used to wait for all the async commands to finish + // but fetchNewBlocksCommand, which is infinite, never finishes, can only be stopped + // by canceling the context which does not happen here, as we don't call group.Stop(). 
group := async.NewGroup(ctx) + // It will start loadTransfersCommand which will run until success when all transfers from DB are loaded err := c.fetchTransfersForLoadedBlocks(group) for err != nil { return err } + // Start transfers loop to load transfers for new blocks c.startTransfersLoop(ctx) - err = c.fetchHistoryBlocks(parent, group, c.blocksLoadedCh) + fromNum := big.NewInt(0) + toNum, err := getHeadBlockNumber(ctx, c.chainClient) + if err != nil { + return err + } + // This will start findBlocksCommand which will run until success when all blocks are loaded + err = c.fetchHistoryBlocks(parent, group, fromNum, toNum, c.blocksLoadedCh) for err != nil { group.Stop() group.Wait() @@ -595,20 +608,13 @@ func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh) } -func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, blocksLoadedCh chan []*DBHeader) error { +func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error { log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", c.account, "omit", c.omitHistory) - headNum, err := getHeadBlockNumber(ctx, c.chainClient) - if err != nil { - // c.error = err - return err // Might need to retry a couple of times - } - if c.omitHistory { - blockRange := &BlockRange{nil, big.NewInt(0), headNum} + blockRange := &BlockRange{nil, big.NewInt(0), toNum} err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange) - log.Debug("fetchHistoryBlocks omit history", "chainID", c.chainClient.NetworkID(), "account", c.account, "headNum", headNum, "err", err) return err } @@ -619,13 +625,11 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, return err // Will keep spinning forever nomatter what } - /// first allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange) - to := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded) - - log.Debug("fetchHistoryBlocks", "chainID", c.chainClient.NetworkID(), "account", c.account, "to", to, "allHistoryLoaded", allHistoryLoaded) if !allHistoryLoaded { + to := getToHistoryBlockNumber(toNum, blockRange, allHistoryLoaded) + fbc := &findBlocksCommand{ account: c.account, db: c.db, @@ -634,7 +638,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, balanceCacher: c.balanceCacher, feed: c.feed, noLimit: false, - fromBlockNumber: big.NewInt(0), + fromBlockNumber: fromNum, toBlockNumber: to, transactionManager: c.transactionManager, tokenManager: c.tokenManager, @@ -693,6 +697,11 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy return err } + if len(blocks) == 0 { + log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", c.account) + return nil + } + blocksMap := make(map[common.Address][]*big.Int) blocksMap[c.account] = blocks diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 270d3f1c2..f878e9458 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -28,7 +28,6 @@ import ( "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" - w_common 
"github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/params" @@ -136,8 +135,17 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([ // We do not verify addresses for now allTransfers := []testERC20Transfer{} signatures := q.Topics[0] - erc20TransferSignature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature) - erc1155TransferSingleSignature := w_common.GetEventSignatureHash(w_common.Erc1155TransferSingleEventSignature) + erc20TransferSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature) + erc1155TransferSingleSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature) + + var address common.Hash + for i := 1; i < len(q.Topics); i++ { + if len(q.Topics[i]) > 0 { + address = q.Topics[i][0] + break + } + } + if slices.Contains(signatures, erc1155TransferSingleSignature) { from := q.Topics[2] var to []common.Hash @@ -167,10 +175,24 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([ logs := []types.Log{} for _, transfer := range allTransfers { if transfer.block.Cmp(q.FromBlock) >= 0 && transfer.block.Cmp(q.ToBlock) <= 0 { - logs = append(logs, types.Log{ + log := types.Log{ BlockNumber: transfer.block.Uint64(), BlockHash: common.BigToHash(transfer.block), - }) + } + + // Use the address at least in one any(from/to) topic to trick the implementation + switch transfer.eventType { + case walletcommon.Erc20TransferEventType, walletcommon.Erc721TransferEventType: + // To detect properly ERC721, we need a different number of topics. For now we use only ERC20 for testing + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature), address, address} + case walletcommon.Erc1155TransferSingleEventType: + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature), address, address, address} + log.Data = make([]byte, 2*common.HashLength) + case walletcommon.Erc1155TransferBatchEventType: + log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferBatchEventSignature), address, address, address} + } + + logs = append(logs, log) } } @@ -365,19 +387,23 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { transfersPerToken := map[common.Address][]testERC20Transfer{} for _, transfer := range tc.outgoingERC20Transfers { transfer.amount = new(big.Int).Neg(transfer.amount) + transfer.eventType = walletcommon.Erc20TransferEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.incomingERC20Transfers { + transfer.eventType = walletcommon.Erc20TransferEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.outgoingERC1155SingleTransfers { transfer.amount = new(big.Int).Neg(transfer.amount) + transfer.eventType = walletcommon.Erc1155TransferSingleEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } for _, transfer := range tc.incomingERC1155SingleTransfers { + transfer.eventType = walletcommon.Erc1155TransferSingleEventType transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer) } @@ -392,7 +418,7 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) { currentBalance := big.NewInt(0) 
tc.tokenBalanceHistory[token] = map[uint64]*big.Int{} - transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0)}) + transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0), walletcommon.Erc20TransferEventType}) for _, transfer := range transfers { for blockN := currentBlock; blockN < transfer.block.Uint64(); blockN++ { @@ -573,9 +599,10 @@ func (tc *TestClient) GetIsConnected() bool { } type testERC20Transfer struct { - block *big.Int - address common.Address - amount *big.Int + block *big.Int + address common.Address + amount *big.Int + eventType walletcommon.EventType } type findBlockCase struct { @@ -614,7 +641,7 @@ func getCases() []findBlockCase { {75, 0, 1}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, toBlock: 100, expectedBlocksFound: 6, @@ -682,7 +709,7 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 1, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, } @@ -694,8 +721,8 @@ func getCases() []findBlockCase { incomingERC20Transfers: []testERC20Transfer{ // edge case when a regular scan will find transfer at 80, // but erc20 tail scan should only find transfer at block 6 - {big.NewInt(80), tokenTXXAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -711,8 +738,8 @@ func getCases() []findBlockCase { // thus only 2 blocks found expectedBlocksFound: 2, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(7), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -741,8 +768,8 @@ func getCases() []findBlockCase { // thus only 2 blocks found expectedBlocksFound: 2, incomingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(7), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 5, @@ -758,8 +785,8 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, - {big.NewInt(6), tokenTXXAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, + {big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 15, // 3 for each range @@ -774,10 +801,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(80), 
tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(63), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(63), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 6, // 3 for each range, 0 for tail check becauseERC20ScanByBalance returns no ranges @@ -792,10 +819,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 3, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(61), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(61), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 9, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges @@ -811,10 +838,10 @@ func getCases() []findBlockCase { rangeSize: 20, expectedBlocksFound: 2, outgoingERC1155SingleTransfers: []testERC20Transfer{ - {big.NewInt(85), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(85), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType}, }, incomingERC20Transfers: []testERC20Transfer{ - {big.NewInt(88), tokenTXYAddress, big.NewInt(1)}, + {big.NewInt(88), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}, }, expectedCalls: map[string]int{ "FilterLogs": 3, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges @@ -827,7 +854,7 @@ func getCases() []findBlockCase { {75, 0, 1}, }, outgoingERC20Transfers: []testERC20Transfer{ - {big.NewInt(80), tokenTXXAddress, big.NewInt(4)}, + {big.NewInt(80), tokenTXXAddress, big.NewInt(4), walletcommon.Erc20TransferEventType}, }, toBlock: 100, rangeSize: 20, @@ -885,8 +912,8 @@ func TestFindBlocksCommand(t *testing.T) { incomingERC1155SingleTransfers: testCase.incomingERC1155SingleTransfers, callsCounter: map[string]int{}, } - //tc.traceAPICalls = true - //tc.printPreparedData = true + // tc.traceAPICalls = true + // tc.printPreparedData = true tc.prepareBalanceHistory(100) tc.prepareTokenBalanceHistory(100) blockChannel := make(chan []*DBHeader, 100) @@ -1046,7 +1073,6 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { chainClient: tc, feed: &event.Feed{}, balanceCacher: balance.NewCacherWithTTL(5 * time.Minute), - errorsCount: 0, transactionManager: tm, pendingTxManager: tracker, tokenManager: tokenManager, @@ -1060,7 +1086,11 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) { ctx := context.Background() group := async.NewGroup(ctx) - err = cmd.fetchHistoryBlocks(ctx, group, blockChannel) + + fromNum := big.NewInt(0) + toNum, err := getHeadBlockNumber(ctx, cmd.chainClient) + require.NoError(t, err) + err = cmd.fetchHistoryBlocks(ctx, group, fromNum, toNum, blockChannel) require.NoError(t, err) select { diff --git a/services/wallet/transfer/database.go b/services/wallet/transfer/database.go index 5090ed14c..3484627b5 100644 --- a/services/wallet/transfer/database.go +++ b/services/wallet/transfer/database.go @@ -244,7 +244,7 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities [] query := newTransfersQuery() for _, identity := range identities { subQuery := newSubQuery() - subQuery = 
subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionHash(identity.Hash).FilterAddress(identity.Address) + subQuery = subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionID(identity.Hash).FilterAddress(identity.Address) query.addSubQuery(subQuery, OrSeparator) } rows, err := db.client.QueryContext(ctx, query.String(), query.Args()...) @@ -255,8 +255,8 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities [] return query.TransferScan(rows) } -func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []PreloadedTransaction, err error) { - query := newTransfersQuery(). +func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []*PreloadedTransaction, err error) { + query := newTransfersQueryForPreloadedTransactions(). FilterNetwork(chainID). FilterAddress(address). FilterBlockNumber(blockNumber). @@ -354,8 +354,8 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco } insertTx, err := creator.Prepare(`INSERT OR IGNORE - INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index) - VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?)`) + INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index, token_id, amount_padded128hex) + VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?, ?, ?)`) if err != nil { return err } @@ -383,9 +383,11 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco continue } - _, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, w_common.Erc20Transfer, &JSONBlob{transaction.Log}, logIndex) + tokenID := (*bigint.SQLBigIntBytes)(transaction.TokenID) + txValue := sqlite.BigIntToPadded128BitsStr(transaction.Value) + _, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue) if err != nil { - log.Error("error saving Erc20transfer", "err", err) + log.Error("error saving token transfer", "err", err) return err } } @@ -428,7 +430,14 @@ func updateOrInsertTransfers(chainID uint64, creator statementCreator, transfers var txTo *common.Address if t.Transaction != nil { if t.Log != nil { - _, tokenAddress, tokenID, txValue, txFrom, txTo = w_common.ExtractTokenIdentity(t.Type, t.Log, t.Transaction) + _, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(t.Type, t.Log, t.Transaction) + tokenID = t.TokenID + // Zero tokenID can be used for ERC721 and ERC1155 transfers but when serialzed/deserialized it becomes nil + // as 0 value of big.Int bytes is nil. 
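[Editor's note — illustration, not part of the patch.] The comment above about zero token IDs rests on a property of math/big: the byte representation of a zero big.Int is empty, so a zero token id stored as raw bytes is indistinguishable from "no token id" unless the reader rebuilds it, which is what the big.NewInt(0) fallback does. A minimal standalone check:

package main

import (
	"fmt"
	"math/big"
)

func main() {
	stored := big.NewInt(0).Bytes() // what a bytes-based column would persist for token id 0
	fmt.Println(len(stored))        // 0 – nothing left to distinguish it from NULL

	restored := new(big.Int).SetBytes(stored)
	fmt.Println(restored) // 0, but only because the reader explicitly rebuilds the value
}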
diff --git a/services/wallet/transfer/database_test.go b/services/wallet/transfer/database_test.go
index 365cca8a0..bc904ffa7 100644
--- a/services/wallet/transfer/database_test.go
+++ b/services/wallet/transfer/database_test.go
@@ -121,7 +121,7 @@ func TestDBReorgTransfers(t *testing.T) {
 	}
 	require.NoError(t, db.ProcessBlocks(777, original.Address, original.Number, lastBlock, []*DBHeader{original}))
 	require.NoError(t, db.ProcessTransfers(777, []Transfer{
-		{w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID},
+		{w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
 	}, []*DBHeader{}))
 	nonce = int64(0)
 	lastBlock = &Block{
@@ -131,7 +131,7 @@ func TestDBReorgTransfers(t *testing.T) {
 	}
 	require.NoError(t, db.ProcessBlocks(777, replaced.Address, replaced.Number, lastBlock, []*DBHeader{replaced}))
 	require.NoError(t, db.ProcessTransfers(777, []Transfer{
-		{w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID},
+		{w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
 	}, []*DBHeader{original}))
 
 	all, err := db.GetTransfers(777, big.NewInt(0), nil)
diff --git a/services/wallet/transfer/downloader.go b/services/wallet/transfer/downloader.go
index bf55da7e3..a1dc9136d 100644
--- a/services/wallet/transfer/downloader.go
+++ b/services/wallet/transfer/downloader.go
@@ -2,7 +2,6 @@ package transfer
 
 import (
 	"context"
-	"encoding/binary"
 	"errors"
 	"math/big"
 	"time"
@@ -11,20 +10,12 @@ import (
 	"github.com/ethereum/go-ethereum/common"
 	"github.com/ethereum/go-ethereum/core"
 	"github.com/ethereum/go-ethereum/core/types"
-	"github.com/ethereum/go-ethereum/crypto"
 	"github.com/ethereum/go-ethereum/log"
 
 	"github.com/status-im/status-go/rpc/chain"
 	w_common "github.com/status-im/status-go/services/wallet/common"
 )
 
-func getLogSubTxID(log types.Log) common.Hash {
-	// Get unique ID by using TxHash and log index
-	index := [4]byte{}
-	binary.BigEndian.PutUint32(index[:], uint32(log.Index))
-	return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
-}
-
 var (
 	zero = big.NewInt(0)
 	one  = big.NewInt(1)
 )
 
 // Partial transaction info obtained by ERC20Downloader.
-// A PreloadedTransaction represents a Transaction which contains one or more
-// ERC20/ERC721 transfer events.
-// To be converted into one or many Transfer objects post-indexing.
+// A PreloadedTransaction represents a Transaction which contains one
+// ERC20/ERC721/ERC1155 transfer event.
+// To be converted into one Transfer object post-indexing.
 type PreloadedTransaction struct {
-	NetworkID   uint64
-	Type        w_common.Type  `json:"type"`
-	ID          common.Hash    `json:"-"`
-	Address     common.Address `json:"address"`
-	BlockNumber *big.Int       `json:"blockNumber"`
-	BlockHash   common.Hash    `json:"blockhash"`
-	Loaded      bool
-	// From is derived from tx signature in order to offload this computation from UI component.
-	From common.Address `json:"from"`
+	Type    w_common.Type  `json:"type"`
+	ID      common.Hash    `json:"-"`
+	Address common.Address `json:"address"`
 	// Log that was used to generate preloaded transaction.
-	Log         *types.Log `json:"log"`
-	BaseGasFees string
+	Log     *types.Log `json:"log"`
+	TokenID *big.Int   `json:"tokenId"`
+	Value   *big.Int   `json:"value"`
 }
 
 // Transfer stores information about transfer.
 // A Transfer represents a plain ETH transfer or some token activity inside a Transaction
+// Since ERC1155 transfers can contain multiple tokens, a single Transfer represents a single token transfer,
+// that means ERC1155 batch transfers will be represented by multiple Transfer objects.
 type Transfer struct {
 	Type        w_common.Type  `json:"type"`
 	ID          common.Hash    `json:"-"`
@@ -66,13 +54,17 @@ type Transfer struct {
 	From    common.Address `json:"from"`
 	Receipt *types.Receipt `json:"receipt"`
 	// Log that was used to generate erc20 transfer. Nil for eth transfer.
-	Log *types.Log `json:"log"`
+	Log *types.Log `json:"log"`
+	// TokenID is the id of the transferred token. Nil for eth transfer.
+	TokenID *big.Int `json:"tokenId"`
+	// TokenValue is the value of the token transfer. Nil for eth transfer.
+	TokenValue *big.Int `json:"tokenValue"`
 	BaseGasFees string
 	// Internal field that is used to track multi-transaction transfers.
 	MultiTransactionID MultiTransactionIDType `json:"multi_transaction_id"`
 }
 
-// ETHDownloader downloads regular eth transfers.
+// ETHDownloader downloads regular eth transfers and tokens transfers.
 type ETHDownloader struct {
 	chainClient chain.ClientInterface
 	accounts    []common.Address
@@ -137,12 +129,70 @@ func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer
 	return transfer, nil
 }
 
-func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
+func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) ([]Transfer, error) {
 	startTs := time.Now()
 
+	rst := make([]Transfer, 0, len(blk.Transactions()))
+
+	receiptsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Receipt)
+	txsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Transaction)
+
+	addReceiptToCache := func(address common.Address, txHash common.Hash, receipt *types.Receipt) {
+		if receiptsByAddressAndTxHash[address] == nil {
+			receiptsByAddressAndTxHash[address] = make(map[common.Hash]*types.Receipt)
+		}
+		receiptsByAddressAndTxHash[address][txHash] = receipt
+	}
+
+	addTxToCache := func(address common.Address, txHash common.Hash, tx *types.Transaction) {
+		if txsByAddressAndTxHash[address] == nil {
+			txsByAddressAndTxHash[address] = make(map[common.Hash]*types.Transaction)
+		}
+		txsByAddressAndTxHash[address][txHash] = tx
+	}
+
+	getReceiptFromCache := func(address common.Address, txHash common.Hash) *types.Receipt {
+		if receiptsByAddressAndTxHash[address] == nil {
+			return nil
+		}
+		return receiptsByAddressAndTxHash[address][txHash]
+	}
+
+	getTxFromCache := func(address common.Address, txHash common.Hash) *types.Transaction {
+		if txsByAddressAndTxHash[address] == nil {
+			return nil
+		}
+		return txsByAddressAndTxHash[address][txHash]
+	}
+
+	getReceipt := func(address common.Address, txHash common.Hash) (receipt *types.Receipt, err error) {
+		receipt = getReceiptFromCache(address, txHash)
+		if receipt == nil {
+			receipt, err = d.fetchTransactionReceipt(ctx, txHash)
+			if err != nil {
+				return nil, err
+			}
+			addReceiptToCache(address, txHash, receipt)
+		}
+		return receipt, nil
+	}
+
+	getTx := func(address common.Address, txHash common.Hash) (tx *types.Transaction, err error) {
+		tx = getTxFromCache(address, txHash)
+		if tx == nil {
+			tx, err = d.fetchTransaction(ctx, txHash)
+			if err != nil {
+				return nil, err
+			}
+			addTxToCache(address, txHash, tx)
+		}
+		return tx, nil
+	}
+
 	for _, address := range accounts {
-		// During block discovery, we should have populated the DB with 1 item per Transaction containing
-		// erc20/erc721 transfers
+		// During block discovery, we should have populated the DB with 1 item per transfer log containing
+		// erc20/erc721/erc1155 transfers.
+		// ID is a hash of the tx hash and the log index. log_index is unique per ERC20/721 tx, but not per ERC1155 tx.
 		transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.NetworkID(), address, blk.Number())
 		if err != nil {
 			return nil, err
 		}
@@ -150,10 +200,22 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
 		areSubTxsCheckedForTxHash := make(map[common.Hash]bool)
 
+		log.Debug("getTransfersInBlock", "block", blk.Number(), "transactionsToLoad", len(transactionsToLoad))
+
 		for _, t := range transactionsToLoad {
-			subtransactions, err := d.subTransactionsFromTransactionHash(ctx, t.Log.TxHash, address)
+			receipt, err := getReceipt(address, t.Log.TxHash)
 			if err != nil {
-				log.Error("can't fetch subTxs for erc20/erc721 transfer", "error", err)
+				return nil, err
+			}
+
+			tx, err := getTx(address, t.Log.TxHash)
+			if err != nil {
+				return nil, err
+			}
+
+			subtransactions, err := d.subTransactionsFromPreloaded(t, tx, receipt, blk)
+			if err != nil {
+				log.Error("can't fetch subTxs for erc20/erc721/erc1155 transfer", "error", err)
 				return nil, err
 			}
 			rst = append(rst, subtransactions...)
@@ -188,12 +250,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
 			}
 
 			if isPlainTransfer || mustCheckSubTxs {
-				receipt, err := d.chainClient.TransactionReceipt(ctx, tx.Hash())
-				if err != nil {
-					return nil, err
-				}
-
-				baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(blk.Number())
+				receipt, err := getReceipt(address, tx.Hash())
 				if err != nil {
 					return nil, err
 				}
 
 				// Since we've already got the receipt, check for subTxs of
 				// interest in case we haven't already.
 				if !areSubTxsCheckedForTxHash[tx.Hash()] {
-					subtransactions, err := d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address)
+					subtransactions, err := d.subTransactionsFromTransactionData(address, from, tx, receipt, blk)
 					if err != nil {
 						log.Error("can't fetch subTxs for eth transfer", "error", err)
 						return nil, err
@@ -224,7 +281,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
 						From:               from,
 						Receipt:            receipt,
 						Log:                nil,
-						BaseGasFees:        baseGasFee,
+						BaseGasFees:        blk.BaseFee().String(),
 						MultiTransactionID: NoMultiTransactionID})
 				}
 			}
@@ -284,44 +341,40 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co
 }
 
 func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(address common.Address) [][]common.Hash {
-	return [][]common.Hash{{d.signature, d.signatureErc1155Single}, {}, {d.paddedAddress(address)}}
+	return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {d.paddedAddress(address)}}
 }
 
-func (d *ERC20TransfersDownloader) inboundTopicsERC1155Single(address common.Address) [][]common.Hash {
-	return [][]common.Hash{{d.signatureErc1155Single}, {}, {}, {d.paddedAddress(address)}}
+func (d *ERC20TransfersDownloader) inboundTopicsERC1155(address common.Address) [][]common.Hash {
+	return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, {d.paddedAddress(address)}}
 }
 
-func (d *ETHDownloader) subTransactionsFromTransactionHash(parent context.Context, txHash common.Hash, address common.Address) ([]Transfer, error) {
+func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) {
 	ctx, cancel := context.WithTimeout(parent, 3*time.Second)
-	tx, _, err := d.chainClient.TransactionByHash(ctx, txHash)
-	cancel()
-	if err != nil {
-		return nil, err
-	}
-
-	ctx, cancel = context.WithTimeout(parent, 3*time.Second)
 	receipt, err := d.chainClient.TransactionReceipt(ctx, txHash)
 	cancel()
 	if err != nil {
 		return nil, err
 	}
+	return receipt, nil
+}
 
-	ctx, cancel = context.WithTimeout(parent, 3*time.Second)
-	blk, err := d.chainClient.BlockByHash(ctx, receipt.BlockHash)
+func (d *ETHDownloader) fetchTransaction(parent context.Context, txHash common.Hash) (*types.Transaction, error) {
+	ctx, cancel := context.WithTimeout(parent, 3*time.Second)
+	tx, _, err := d.chainClient.TransactionByHash(ctx, txHash) // TODO Save on requests by checking in the DB first
 	cancel()
 	if err != nil {
 		return nil, err
 	}
-
-	baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(receipt.BlockNumber)
-	if err != nil {
-		return nil, err
-	}
-
-	return d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address)
+	return tx, nil
 }
 
-func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction, receipt *types.Receipt, blk *types.Block, baseGasFee string, address common.Address) ([]Transfer, error) {
+func (d *ETHDownloader) subTransactionsFromPreloaded(preloadedTx *PreloadedTransaction, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) {
+	log.Debug("subTransactionsFromPreloaded start", "txHash", tx.Hash().Hex(), "address", preloadedTx.Address, "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value)
+	address := preloadedTx.Address
+	txLog := preloadedTx.Log
+
+	rst := make([]Transfer, 0, 1)
+
 	from, err := types.Sender(d.signer, tx)
 	if err != nil {
 		if err == core.ErrTxTypeNotSupported {
@@ -330,47 +383,63 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction
 		return nil, err
 	}
 
-	rst := make([]Transfer, 0, len(receipt.Logs))
-	for _, log := range receipt.Logs {
-		eventType := w_common.GetEventType(log)
-		// Only add ERC20/ERC721/ERC1155 transfers from/to the given account
-		// Other types of events get always added
-		mustAppend := false
-		switch eventType {
-		case w_common.Erc20TransferEventType:
-			trFrom, trTo, _ := w_common.ParseErc20TransferLog(log)
-			if trFrom == address || trTo == address {
-				mustAppend = true
-			}
-		case w_common.Erc721TransferEventType:
-			trFrom, trTo, _ := w_common.ParseErc721TransferLog(log)
-			if trFrom == address || trTo == address {
-				mustAppend = true
-			}
-		case w_common.Erc1155TransferSingleEventType:
-			_, trFrom, trTo, _, _ := w_common.ParseErc1155TransferSingleLog(log)
-			if trFrom == address || trTo == address {
-				mustAppend = true
-			}
-		case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType:
-			mustAppend = true
-		case w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType:
-			mustAppend = true
-		case w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType:
-			mustAppend = true
+	eventType := w_common.GetEventType(preloadedTx.Log)
+	// Only add ERC20/ERC721/ERC1155 transfers from/to the given account
+	// from/to matching is already handled by getLogs filter
+	switch eventType {
+	case w_common.Erc20TransferEventType,
+		w_common.Erc721TransferEventType,
+		w_common.Erc1155TransferSingleEventType, w_common.Erc1155TransferBatchEventType:
+		log.Debug("subTransactionsFromPreloaded transfer", "eventType", eventType, "logIdx", txLog.Index, "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value, "baseFee", blk.BaseFee().String())
+
+		transfer := Transfer{
+			Type:               w_common.EventTypeToSubtransactionType(eventType),
+			ID:                 preloadedTx.ID,
+			Address:            address,
+			BlockNumber:        new(big.Int).SetUint64(txLog.BlockNumber),
+			BlockHash:          txLog.BlockHash,
+			Loaded:             true,
+			NetworkID:          d.signer.ChainID().Uint64(),
+			From:               from,
+			Log:                txLog,
+			TokenID:            preloadedTx.TokenID,
+			TokenValue:         preloadedTx.Value,
+			BaseGasFees:        blk.BaseFee().String(),
+			Transaction:        tx,
+			Receipt:            receipt,
+			Timestamp:          blk.Time(),
+			MultiTransactionID: NoMultiTransactionID,
 		}
-		if mustAppend {
+
+		rst = append(rst, transfer)
+	}
+
+	log.Debug("subTransactionsFromPreloaded end", "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value)
+	return rst, nil
+}
+
+func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common.Address, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) {
+	log.Debug("subTransactionsFromTransactionData start", "txHash", tx.Hash().Hex(), "address", address)
+
+	rst := make([]Transfer, 0, 1)
+
+	for _, txLog := range receipt.Logs {
+		eventType := w_common.GetEventType(txLog)
+		switch eventType {
+		case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType,
+			w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType,
+			w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType:
 			transfer := Transfer{
 				Type:               w_common.EventTypeToSubtransactionType(eventType),
-				ID:                 getLogSubTxID(*log),
+				ID:                 w_common.GetLogSubTxID(*txLog),
 				Address:            address,
-				BlockNumber:        new(big.Int).SetUint64(log.BlockNumber),
-				BlockHash:          log.BlockHash,
+				BlockNumber:        new(big.Int).SetUint64(txLog.BlockNumber),
+				BlockHash:          txLog.BlockHash,
 				Loaded:             true,
 				NetworkID:          d.signer.ChainID().Uint64(),
 				From:               from,
-				Log:                log,
-				BaseGasFees:        baseGasFee,
+				Log:                txLog,
+				BaseGasFees:        blk.BaseFee().String(),
 				Transaction:        tx,
 				Receipt:            receipt,
 				Timestamp:          blk.Time(),
@@ -381,11 +450,13 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction
 		}
 	}
 
+	log.Debug("subTransactionsFromTransactionData end", "txHash", tx.Hash().Hex(), "address", address.Hex())
 	return rst, nil
 }
 
 func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) {
 	concurrent := NewConcurrentDownloader(parent, NoThreadLimit)
+
 	for i := range logs {
 		l := logs[i]
 
@@ -393,40 +464,48 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
 			continue
 		}
 
-		id := getLogSubTxID(l)
-		baseGasFee, err := d.client.GetBaseFeeFromBlock(new(big.Int).SetUint64(l.BlockNumber))
+		from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l)
 		if err != nil {
-			return nil, err
+			log.Error("failed to parse transfer log", "log", l, "address", address, "error", err)
+			continue
 		}
 
-		logType := w_common.EventTypeToSubtransactionType(w_common.GetEventType(&l))
-		log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "id", id)
-
-		if logType != w_common.Erc1155SingleTransfer && logType != w_common.Erc1155BatchTransfer {
-			logType = w_common.Erc20Transfer
+		// Double check provider returned the correct log
+		if from != address && to != address {
+			log.Error("from/to address mismatch", "log", l, "address", address)
+			continue
 		}
 
-		header := &DBHeader{
-			Number: big.NewInt(int64(l.BlockNumber)),
-			Hash:   l.BlockHash,
-			PreloadedTransactions: []*PreloadedTransaction{{
-				Address:     address,
-				BlockNumber: big.NewInt(int64(l.BlockNumber)),
-				BlockHash:   l.BlockHash,
-				ID:          id,
-				From:        address,
-				Loaded:      false,
-				Type:        logType,
-				Log:         &l,
-				BaseGasFees: baseGasFee,
-			}},
-			Loaded: false,
-		}
+		eventType := w_common.GetEventType(&l)
+		logType := w_common.EventTypeToSubtransactionType(eventType)
 
-		concurrent.Add(func(ctx context.Context) error {
-			concurrent.PushHeader(header)
-			return nil
-		})
+		for i, txID := range txIDs {
+			log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "txID", txID)
+
+			// For ERC20 there is no tokenID, so we use nil
+			var tokenID *big.Int
+			if len(tokenIDs) > i {
+				tokenID = tokenIDs[i]
+			}
+
+			header := &DBHeader{
+				Number: big.NewInt(int64(l.BlockNumber)),
+				Hash:   l.BlockHash,
+				PreloadedTransactions: []*PreloadedTransaction{{
+					ID:      txID,
+					Type:    logType,
+					Log:     &l,
+					TokenID: tokenID,
+					Value:   values[i],
+				}},
+				Loaded: false,
+			}
+
+			concurrent.Add(func(ctx context.Context) error {
+				concurrent.PushHeader(header)
+				return nil
+			})
+		}
 	}
 	select {
 	case <-concurrent.WaitAsync():
@@ -478,7 +557,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
 	inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
 		FromBlock: from,
 		ToBlock:   to,
-		Topics:    d.inboundTopicsERC1155Single(address),
+		Topics:    d.inboundTopicsERC1155(address),
 	})
 	if err != nil {
 		return nil, err
"github.com/ethereum/go-ethereum/common" @@ -10,7 +11,8 @@ import ( "github.com/status-im/status-go/services/wallet/bigint" ) -const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) FROM transfers" +const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) %s FROM transfers" +const preloadedTransfersQuery = "SELECT hash, type, address, log, token_id, amount_padded128hex FROM transfers" type transfersQuery struct { buf *bytes.Buffer @@ -21,7 +23,14 @@ type transfersQuery struct { func newTransfersQuery() *transfersQuery { newQuery := newEmptyQuery() - newQuery.buf.WriteString(baseTransfersQuery) + transfersQueryString := fmt.Sprintf(baseTransfersQuery, "") + newQuery.buf.WriteString(transfersQueryString) + return newQuery +} + +func newTransfersQueryForPreloadedTransactions() *transfersQuery { + newQuery := newEmptyQuery() + newQuery.buf.WriteString(preloadedTransfersQuery) return newQuery } @@ -110,13 +119,20 @@ func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery { return q } -func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery { +func (q *transfersQuery) FilterTransactionID(hash common.Hash) *transfersQuery { q.addWhereSeparator(AndSeparator) q.buf.WriteString(" hash = ?") q.args = append(q.args, hash) return q } +func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery { + q.addWhereSeparator(AndSeparator) + q.buf.WriteString(" tx_hash = ?") + q.args = append(q.args, hash) + return q +} + func (q *transfersQuery) FilterBlockHash(blockHash common.Hash) *transfersQuery { q.addWhereSeparator(AndSeparator) q.buf.WriteString(" blk_hash = ?") @@ -167,25 +183,48 @@ func (q *transfersQuery) TransferScan(rows *sql.Rows) (rst []Transfer, err error return rst, nil } -func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []PreloadedTransaction, err error) { - transfers, err := q.TransferScan(rows) - if err != nil { - return +func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []*PreloadedTransaction, err error) { + transfers := make([]Transfer, 0) + for rows.Next() { + transfer := Transfer{ + Log: &types.Log{}, + } + tokenValue := sql.NullString{} + tokenID := sql.RawBytes{} + err = rows.Scan( + &transfer.ID, &transfer.Type, + &transfer.Address, + &JSONBlob{transfer.Log}, + &tokenID, &tokenValue) + + if len(tokenID) > 0 { + transfer.TokenID = new(big.Int).SetBytes(tokenID) + } + + if tokenValue.Valid { + var ok bool + transfer.TokenValue, ok = new(big.Int).SetString(tokenValue.String, 16) + if !ok { + panic("failed to parse token value") + } + } + + if err != nil { + return nil, err + } + transfers = append(transfers, transfer) } - rst = make([]PreloadedTransaction, 0, len(transfers)) + rst = make([]*PreloadedTransaction, 0, len(transfers)) for _, transfer := range transfers { - preloadedTransaction := PreloadedTransaction{ - ID: transfer.ID, - Type: transfer.Type, - BlockHash: transfer.BlockHash, - BlockNumber: transfer.BlockNumber, - Address: transfer.Address, - From: transfer.From, - Log: transfer.Log, - NetworkID: transfer.NetworkID, - BaseGasFees: transfer.BaseGasFees, + preloadedTransaction := &PreloadedTransaction{ + ID: transfer.ID, + Type: transfer.Type, + Address: transfer.Address, + Log: transfer.Log, + TokenID: transfer.TokenID, + Value: 
transfer.TokenValue, } rst = append(rst, preloadedTransaction)