feat(wallet): detect ERC1155 batch transfers.

Refactored transfers loading to reduce blockchain RPC requests (getBaseFee, getTransaction,
getTransactionReceipt) by reusing preloaded transaction and block fee.
Split extraction of subtransaction from logs and from ETH transfer into
different methods.
Refactored log_parser to extract sender and receiver addresses
uniformly for different transfer types.
Replaced info logs with debug where needed.

closes #4221
This commit is contained in:
Ivan Belyakov 2023-11-02 18:24:23 +01:00 committed by IvanBelyakoff
parent d04f99d56d
commit b6ade53603
10 changed files with 565 additions and 319 deletions

View File

@ -335,7 +335,14 @@ func migrateWalletJSONBlobs(sqlTx *sql.Tx) error {
func extractToken(entryType string, tx *types.Transaction, l *types.Log, logValid bool) (correctType w_common.Type, tokenID *big.Int, value *big.Int, tokenAddress *common.Address) {
if logValid {
correctType, tokenAddress, tokenID, value, _, _ = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx)
correctType, tokenAddress, _, _ = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx)
_, _, _, tokenIDs, values, _ := w_common.ParseTransferLog(*l)
if len(tokenIDs) > 0 {
tokenID = tokenIDs[0]
}
if len(values) > 0 {
value = values[0]
}
} else {
correctType = w_common.Type(entryType)
value = new(big.Int).Set(tx.Value())
@ -394,7 +401,7 @@ func migrateWalletTransferFromToAddresses(sqlTx *sql.Tx) error {
if nullableTx.Valid {
if nullableL.Valid {
_, tokenAddress, _, _, txFrom, txTo = w_common.ExtractTokenIdentity(w_common.Type(entryType), l, tx)
_, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(w_common.Type(entryType), l, tx)
} else {
txFrom = &sender
txTo = tx.To()

View File

@ -297,21 +297,24 @@ func TestMigrateWalletJsonBlobs(t *testing.T) {
require.Equal(t, *sqlite.BigIntToPadded128BitsStr(tt.Value()), amount128Hex.String)
require.True(t, isTokenIDNull)
} else {
actualEntryType, expectedTokenAddress, expectedTokenID, expectedValue, expectedFrom, expectedTo := w_common.ExtractTokenIdentity(expectedEntryType, tl, tt)
actualEntryType, expectedTokenAddress, _, _ := w_common.ExtractTokenTransferData(expectedEntryType, tl, tt)
if actualEntryType == w_common.Erc20Transfer {
expectedFrom, expectedTo, expectedValue := w_common.ParseErc20TransferLog(tl)
require.True(t, amount128Hex.Valid)
require.Equal(t, *sqlite.BigIntToPadded128BitsStr(expectedValue), amount128Hex.String)
require.True(t, isTokenIDNull)
require.Equal(t, *expectedTokenAddress, *tokenAddress)
require.Equal(t, *expectedFrom, *txFrom)
require.Equal(t, *expectedTo, *txTo)
require.Equal(t, expectedFrom, *txFrom)
require.Equal(t, expectedTo, *txTo)
} else if actualEntryType == w_common.Erc721Transfer {
require.False(t, amount128Hex.Valid)
expectedFrom, expectedTo, expectedTokenID := w_common.ParseErc721TransferLog(tl)
require.True(t, amount128Hex.Valid)
require.Equal(t, *sqlite.BigIntToPadded128BitsStr(big.NewInt(1)), amount128Hex.String)
require.False(t, isTokenIDNull)
require.Equal(t, expectedTokenID, expectedTokenID)
require.Equal(t, *expectedTokenAddress, *tokenAddress)
require.Equal(t, *expectedFrom, *txFrom)
require.Equal(t, *expectedTo, *txTo)
require.Equal(t, expectedFrom, *txFrom)
require.Equal(t, expectedTo, *txTo)
} else {
require.False(t, amount128Hex.Valid)
require.True(t, isTokenIDNull)

View File

@ -3,6 +3,7 @@
package common
import (
"encoding/binary"
"fmt"
"math/big"
@ -23,8 +24,7 @@ const (
EthTransfer Type = "eth"
Erc20Transfer Type = "erc20"
Erc721Transfer Type = "erc721"
Erc1155SingleTransfer Type = "erc1155"
Erc1155BatchTransfer Type = "erc1155"
Erc1155Transfer Type = "erc1155"
UniswapV2Swap Type = "uniswapV2Swap"
UniswapV3Swap Type = "uniswapV3Swap"
HopBridgeFrom Type = "HopBridgeFrom"
@ -136,10 +136,8 @@ func EventTypeToSubtransactionType(eventType EventType) Type {
return Erc20Transfer
case Erc721TransferEventType:
return Erc721Transfer
case Erc1155TransferSingleEventType:
return Erc1155SingleTransfer
case Erc1155TransferBatchEventType:
return Erc1155BatchTransfer
case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
return Erc1155Transfer
case UniswapV2SwapEventType:
return UniswapV2Swap
case UniswapV3SwapEventType:
@ -226,16 +224,13 @@ func ParseErc20TransferLog(ethlog *types.Log) (from, to common.Address, amount *
log.Warn("not enough topics for erc20 transfer", "topics", ethlog.Topics)
return
}
if len(ethlog.Topics[1]) != 32 {
log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1])
var err error
from, to, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseErc20TransferLog", err)
return
}
if len(ethlog.Topics[2]) != 32 {
log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2])
return
}
copy(from[:], ethlog.Topics[1][12:])
copy(to[:], ethlog.Topics[2][12:])
if len(ethlog.Data) != 32 {
log.Warn("data is not padded to 32 byts big int", "data", ethlog.Data)
return
@ -251,52 +246,150 @@ func ParseErc721TransferLog(ethlog *types.Log) (from, to common.Address, tokenID
log.Warn("not enough topics for erc721 transfer", "topics", ethlog.Topics)
return
}
if len(ethlog.Topics[1]) != 32 {
log.Warn("second topic is not padded to 32 byte address", "topic", ethlog.Topics[1])
var err error
from, to, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseErc721TransferLog", err)
return
}
if len(ethlog.Topics[2]) != 32 {
log.Warn("third topic is not padded to 32 byte address", "topic", ethlog.Topics[2])
return
}
if len(ethlog.Topics[3]) != 32 {
log.Warn("fourth topic is not 32 byte tokenId", "topic", ethlog.Topics[3])
return
}
copy(from[:], ethlog.Topics[1][12:])
copy(to[:], ethlog.Topics[2][12:])
tokenID.SetBytes(ethlog.Topics[3][:])
return
}
func ParseErc1155TransferSingleLog(ethlog *types.Log) (operator, from, to common.Address, id, amount *big.Int) {
if len(ethlog.Topics) < erc1155TransferEventIndexedParameters {
log.Warn("not enough topics for erc1155 transfer single", "topics", ethlog.Topics)
return
}
func GetLogSubTxID(log types.Log) common.Hash {
// Get unique ID by using TxHash and log index
index := [4]byte{}
binary.BigEndian.PutUint32(index[:], uint32(log.Index))
return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
}
amount = new(big.Int)
id = new(big.Int)
func getLogSubTxIDWithTokenIDIndex(log types.Log, tokenIDIdx uint16) common.Hash {
// Get unique ID by using TxHash, log index and extra bytes (token id index for ERC1155 TransferBatch)
index := [4]byte{}
value := uint32(log.Index&0x0000FFFF) | (uint32(tokenIDIdx) << 16) // log index should not exceed uint16 max value
binary.BigEndian.PutUint32(index[:], value)
return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
}
for i := 1; i < erc1155TransferEventIndexedParameters; i++ {
func checkTopicsLength(ethlog types.Log, startIdx, endIdx int) (err error) {
for i := startIdx; i < endIdx; i++ {
if len(ethlog.Topics[i]) != common.HashLength {
log.Warn(fmt.Sprintf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[1]))
err = fmt.Errorf("topic %d is not padded to %d byte address, topic=%s", i, common.HashLength, ethlog.Topics[i])
log.Error("log_parser::checkTopicsLength", err)
return
}
}
return
}
func getFromToAddresses(ethlog types.Log) (from, to common.Address, err error) {
eventType := GetEventType(&ethlog)
addressIdx := common.HashLength - common.AddressLength
copy(operator[:], ethlog.Topics[1][addressIdx:])
switch eventType {
case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
err = checkTopicsLength(ethlog, 2, 4)
if err != nil {
return
}
copy(from[:], ethlog.Topics[2][addressIdx:])
copy(to[:], ethlog.Topics[3][addressIdx:])
return
if len(ethlog.Data) != common.HashLength*2 {
log.Warn("data is not padded to 64 bytes", "data", ethlog.Data)
case Erc20TransferEventType, Erc721TransferEventType, UniswapV2SwapEventType, UniswapV3SwapEventType, HopBridgeTransferFromL1CompletedEventType:
err = checkTopicsLength(ethlog, 1, 3)
if err != nil {
return
}
copy(from[:], ethlog.Topics[1][addressIdx:])
copy(to[:], ethlog.Topics[2][addressIdx:])
return
}
id.SetBytes(ethlog.Data[:common.HashLength])
amount.SetBytes(ethlog.Data[common.HashLength:])
return from, to, fmt.Errorf("unsupported event type to get from/to adddresses %s", eventType)
}
func ParseTransferLog(ethlog types.Log) (from, to common.Address, txIDs []common.Hash, tokenIDs, values []*big.Int, err error) {
eventType := GetEventType(&ethlog)
switch eventType {
case Erc20TransferEventType:
var amount *big.Int
from, to, amount = ParseErc20TransferLog(&ethlog)
txIDs = append(txIDs, GetLogSubTxID(ethlog))
values = append(values, amount)
return
case Erc721TransferEventType:
var tokenID *big.Int
from, to, tokenID = ParseErc721TransferLog(&ethlog)
txIDs = append(txIDs, GetLogSubTxID(ethlog))
tokenIDs = append(tokenIDs, tokenID)
values = append(values, big.NewInt(1))
return
case Erc1155TransferSingleEventType, Erc1155TransferBatchEventType:
_, from, to, tokenIDs, values, err = ParseErc1155TransferLog(&ethlog, eventType)
for i := range tokenIDs {
txIDs = append(txIDs, getLogSubTxIDWithTokenIDIndex(ethlog, uint16(i)))
}
return
}
return from, to, txIDs, tokenIDs, values, fmt.Errorf("unsupported event type in log_parser::ParseTransferLogs %s", eventType)
}
func ParseErc1155TransferLog(ethlog *types.Log, evType EventType) (operator, from, to common.Address, ids, amounts []*big.Int, err error) {
if len(ethlog.Topics) < erc1155TransferEventIndexedParameters {
err = fmt.Errorf("not enough topics for erc1155 transfer %s, %v", "topics", ethlog.Topics)
log.Error("log_parser::ParseErc1155TransferLog", "err", err)
return
}
err = checkTopicsLength(*ethlog, 1, erc1155TransferEventIndexedParameters)
if err != nil {
return
}
addressIdx := common.HashLength - common.AddressLength
copy(operator[:], ethlog.Topics[1][addressIdx:])
from, to, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseErc1155TransferLog", "err", err)
return
}
if len(ethlog.Data) == 0 || len(ethlog.Data)%(common.HashLength*2) != 0 {
err = fmt.Errorf("data is not padded to 64 bytes %s, %v", "data", ethlog.Data)
log.Error("log_parser::ParseErc1155TransferLog", "err", err)
return
}
if evType == Erc1155TransferSingleEventType {
ids = append(ids, new(big.Int).SetBytes(ethlog.Data[:common.HashLength]))
amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[common.HashLength:]))
log.Debug("log_parser::ParseErc1155TransferSingleLog", "ids", ids, "amounts", amounts)
} else {
// idTypeSize := new(big.Int).SetBytes(ethlog.Data[:common.HashLength]).Uint64() // Left for knowledge
// valueTypeSize := new(big.Int).SetBytes(ethlog.Data[common.HashLength : common.HashLength*2]).Uint64() // Left for knowledge
idsArraySize := new(big.Int).SetBytes(ethlog.Data[common.HashLength*2 : common.HashLength*2+common.HashLength]).Uint64()
initialOffset := common.HashLength*2 + common.HashLength
for i := 0; i < int(idsArraySize); i++ {
ids = append(ids, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength]))
}
valuesArraySize := new(big.Int).SetBytes(ethlog.Data[initialOffset+int(idsArraySize)*common.HashLength : initialOffset+int(idsArraySize+1)*common.HashLength]).Uint64()
if idsArraySize != valuesArraySize {
err = fmt.Errorf("ids and values sizes don't match %d, %d", idsArraySize, valuesArraySize)
log.Error("log_parser::ParseErc1155TransferBatchLog", "err", err)
return
}
initialOffset = initialOffset + int(idsArraySize+1)*common.HashLength
for i := 0; i < int(valuesArraySize); i++ {
amounts = append(amounts, new(big.Int).SetBytes(ethlog.Data[initialOffset+i*common.HashLength:initialOffset+(i+1)*common.HashLength]))
log.Debug("log_parser::ParseErc1155TransferBatchLog", "id", ids[i], "amount", amounts[i])
}
}
return
}
@ -313,17 +406,11 @@ func ParseUniswapV2Log(ethlog *types.Log) (pairAddress common.Address, from comm
}
pairAddress = ethlog.Address
if len(ethlog.Topics[1]) != 32 {
err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1])
from, to, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseUniswapV2Log", err)
return
}
if len(ethlog.Topics[2]) != 32 {
err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2])
return
}
copy(from[:], ethlog.Topics[1][12:])
copy(to[:], ethlog.Topics[2][12:])
if len(ethlog.Data) != 32*4 {
err = fmt.Errorf("data is not padded to 4 * 32 bytes big int %s, %v", "data", ethlog.Data)
return
@ -359,17 +446,11 @@ func ParseUniswapV3Log(ethlog *types.Log) (poolAddress common.Address, sender co
}
poolAddress = ethlog.Address
if len(ethlog.Topics[1]) != 32 {
err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1])
sender, recipient, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseUniswapV3Log", err)
return
}
if len(ethlog.Topics[2]) != 32 {
err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2])
return
}
copy(sender[:], ethlog.Topics[1][12:])
copy(recipient[:], ethlog.Topics[2][12:])
if len(ethlog.Data) != 32*5 {
err = fmt.Errorf("data is not padded to 5 * 32 bytes big int %s, %v", "data", ethlog.Data)
return
@ -426,17 +507,11 @@ func ParseHopBridgeTransferFromL1CompletedLog(ethlog *types.Log) (recipient comm
return
}
if len(ethlog.Topics[1]) != 32 {
err = fmt.Errorf("second topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[1])
recipient, relayer, err = getFromToAddresses(*ethlog)
if err != nil {
log.Error("log_parser::ParseHopBridgeTransferFromL1CompletedLog", err)
return
}
copy(recipient[:], ethlog.Topics[1][12:])
if len(ethlog.Topics[2]) != 32 {
err = fmt.Errorf("third topic is not padded to 32 byte address %s, %v", "topic", ethlog.Topics[2])
return
}
copy(relayer[:], ethlog.Topics[2][12:])
if len(ethlog.Data) != 32*4 {
err = fmt.Errorf("data is not padded to 4 * 32 bytes big int %s, %v", "data", ethlog.Data)
@ -526,7 +601,7 @@ func GetEventSignatureHash(signature string) common.Hash {
return crypto.Keccak256Hash([]byte(signature))
}
func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txTokenID *big.Int, txValue *big.Int, txFrom *common.Address, txTo *common.Address) {
func ExtractTokenTransferData(dbEntryType Type, log *types.Log, tx *types.Transaction) (correctType Type, tokenAddress *common.Address, txFrom *common.Address, txTo *common.Address) {
// erc721 transfers share signature with erc20 ones, so they both used to be categorized as erc20
// by the Downloader. We fix this here since they might be mis-categorized in the db.
if dbEntryType == Erc20Transfer {
@ -537,32 +612,27 @@ func ExtractTokenIdentity(dbEntryType Type, log *types.Log, tx *types.Transactio
}
switch correctType {
case EthTransfer:
if tx != nil {
txValue = new(big.Int).Set(tx.Value())
}
case Erc20Transfer:
tokenAddress = new(common.Address)
*tokenAddress = log.Address
from, to, value := ParseErc20TransferLog(log)
txValue = value
from, to, _ := ParseErc20TransferLog(log)
txFrom = &from
txTo = &to
case Erc721Transfer:
tokenAddress = new(common.Address)
*tokenAddress = log.Address
from, to, tokenID := ParseErc721TransferLog(log)
txTokenID = tokenID
from, to, _ := ParseErc721TransferLog(log)
txFrom = &from
txTo = &to
case Erc1155SingleTransfer:
case Erc1155Transfer:
tokenAddress = new(common.Address)
*tokenAddress = log.Address
_, from, to, tokenID, value := ParseErc1155TransferSingleLog(log)
txTokenID = tokenID
_, from, to, _, _, err := ParseErc1155TransferLog(log, Erc1155TransferSingleEventType) // from/to extraction is the same for single and batch
if err != nil {
return
}
txFrom = &from
txTo = &to
txValue = value
}
return

View File

@ -119,7 +119,6 @@ type erc20HistoricalCommand struct {
iterator *IterativeDownloader
to *big.Int
from *big.Int
foundHeaders []*DBHeader
}
@ -203,7 +202,8 @@ func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error {
}
func (c *controlCommand) Run(parent context.Context) error {
log.Info("start control command")
log.Debug("start control command")
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
head, err := c.chainClient.HeaderByNumber(ctx, nil)
cancel()
@ -221,7 +221,7 @@ func (c *controlCommand) Run(parent context.Context) error {
})
}
log.Info("current head is", "block number", head.Number)
log.Debug("current head is", "block number", head.Number)
// Get last known block for each account
lastKnownEthBlocks, accountsWithoutHistory, err := c.blockDAO.GetLastKnownBlockByAddresses(c.chainClient.NetworkID(), c.accounts)
@ -326,7 +326,7 @@ func (c *controlCommand) Run(parent context.Context) error {
BlockNumber: target,
})
}
log.Info("end control command")
log.Debug("end control command")
return err
}
@ -390,7 +390,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
// Take blocks from cache if available and disrespect the limit
// If no blocks are available in cache, take blocks from DB respecting the limit
// If no limit is set, take all blocks from DB
log.Info("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums)
log.Debug("start transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address, "blockNums", c.blockNums)
startTs := time.Now()
for {
@ -450,7 +450,7 @@ func (c *transfersCommand) Run(ctx context.Context) (err error) {
}
}
log.Info("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address,
log.Debug("end transfersCommand", "chain", c.chainClient.NetworkID(), "address", c.address,
"blocks.len", len(c.blockNums), "transfers.len", len(c.fetchedTransfers), "in", time.Since(startTs))
return nil
@ -771,7 +771,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b
fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int,
map[common.Address][]*DBHeader, error) {
log.Info("fast indexer started")
log.Debug("fast indexer started")
start := time.Now()
group := async.NewGroup(ctx)
@ -804,7 +804,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b
resultingFromByAddress[command.address] = command.resultingFrom
headers[command.address] = command.foundHeaders
}
log.Info("fast indexer finished", "in", time.Since(start))
log.Debug("fast indexer finished", "in", time.Since(start))
return resultingFromByAddress, headers, nil
}
}
@ -812,7 +812,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher b
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, fromByAddress map[common.Address]*big.Int, toByAddress map[common.Address]*big.Int) (map[common.Address][]*DBHeader, error) {
log.Info("fast indexer Erc20 started")
log.Debug("fast indexer Erc20 started")
start := time.Now()
group := async.NewGroup(ctx)
@ -839,7 +839,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
for _, command := range commands {
headers[command.address] = command.foundHeaders
}
log.Info("fast indexer Erc20 finished", "in", time.Since(start))
log.Debug("fast indexer Erc20 finished", "in", time.Since(start))
return headers, nil
}
}
@ -849,7 +849,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, feed *event.Feed) error {
log.Info("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount)
log.Debug("loadTransfers start", "accounts", accounts, "chain", chainClient.NetworkID(), "limit", blocksLimitPerAccount)
start := time.Now()
group := async.NewGroup(ctx)
@ -879,7 +879,7 @@ func loadTransfers(ctx context.Context, accounts []common.Address, blockDAO *Blo
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Info("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID())
log.Debug("loadTransfers finished for account", "in", time.Since(start), "chain", chainClient.NetworkID())
return nil
}
}
@ -899,7 +899,7 @@ func getLowestFrom(chainID uint64, to *big.Int) *big.Int {
// Finds the latest range up to initialTo where the number of transactions is between 20 and 25
func findFirstRange(c context.Context, account common.Address, initialTo *big.Int, client chain.ClientInterface) (*big.Int, error) {
log.Info("findFirstRange", "account", account, "initialTo", initialTo, "client", client)
log.Debug("findFirstRange", "account", account, "initialTo", initialTo, "client", client)
from := getLowestFrom(client.NetworkID(), initialTo)
to := initialTo
@ -910,7 +910,7 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
}
firstNonce, err := client.NonceAt(c, account, to) // this is the latest nonce actually
log.Info("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
log.Debug("find range with 20 <= len(tx) <= 25", "account", account, "firstNonce", firstNonce, "from", from, "to", to)
if err != nil {
return nil, err
@ -943,15 +943,15 @@ func findFirstRange(c context.Context, account common.Address, initialTo *big.In
}
nonceDiff = firstNonce - fromNonce
log.Info("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce)
log.Debug("next nonce", "from", from, "n", fromNonce, "diff", firstNonce-fromNonce)
if goal <= nonceDiff && nonceDiff <= (goal+5) {
log.Info("range found", "account", account, "from", from, "to", to)
log.Debug("range found", "account", account, "from", from, "to", to)
return from, nil
}
}
log.Info("range found", "account", account, "from", from, "to", to)
log.Debug("range found", "account", account, "from", from, "to", to)
return from, nil
}

View File

@ -32,7 +32,7 @@ func (c *findNewBlocksCommand) Command() async.Command {
}
func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit)
log.Debug("start findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
headNum, err := getHeadBlockNumber(parent, c.chainClient)
if err != nil {
@ -47,6 +47,7 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
return err // Will keep spinning forever nomatter what
}
// In case no block range is in DB, skip until history blocks are fetched
if blockRange != nil {
c.fromBlockNumber = blockRange.LastKnown
@ -64,6 +65,8 @@ func (c *findNewBlocksCommand) Run(parent context.Context) (err error) {
_ = c.findBlocksCommand.Run(parent)
}
log.Debug("end findNewBlocksCommand", "account", c.account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", c.fromBlockNumber, "to", c.toBlockNumber)
return nil
}
@ -436,7 +439,7 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache
select {
case <-ctx.Done():
err = ctx.Err()
log.Info("fast indexer ctx Done", "error", err)
log.Debug("fast indexer ctx Done", "error", err)
return
case <-group.WaitAsync():
if command.error != nil {
@ -491,7 +494,7 @@ func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *Bl
for {
select {
case <-ctx.Done():
log.Info("loadTransfersLoop error", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err())
log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "account", account, "error", ctx.Err())
return
case dbHeaders := <-blocksLoadedCh:
log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "account", account, "headers", len(dbHeaders))
@ -523,7 +526,6 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database,
chainClient: chainClient,
feed: feed,
balanceCacher: balanceCacher,
errorsCount: 0,
transactionManager: transactionManager,
pendingTxManager: pendingTxManager,
tokenManager: tokenManager,
@ -540,7 +542,6 @@ type loadBlocksAndTransfersCommand struct {
chainClient chain.ClientInterface
feed *event.Feed
balanceCacher balance.Cacher
errorsCount int
// nonArchivalRPCNode bool // TODO Make use of it
transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker
@ -556,16 +557,28 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "account", c.account)
ctx := parent
// This wait group is used to wait for all the async commands to finish
// but fetchNewBlocksCommand, which is infinite, never finishes, can only be stopped
// by canceling the context which does not happen here, as we don't call group.Stop().
group := async.NewGroup(ctx)
// It will start loadTransfersCommand which will run until success when all transfers from DB are loaded
err := c.fetchTransfersForLoadedBlocks(group)
for err != nil {
return err
}
// Start transfers loop to load transfers for new blocks
c.startTransfersLoop(ctx)
err = c.fetchHistoryBlocks(parent, group, c.blocksLoadedCh)
fromNum := big.NewInt(0)
toNum, err := getHeadBlockNumber(ctx, c.chainClient)
if err != nil {
return err
}
// This will start findBlocksCommand which will run until success when all blocks are loaded
err = c.fetchHistoryBlocks(parent, group, fromNum, toNum, c.blocksLoadedCh)
for err != nil {
group.Stop()
group.Wait()
@ -595,20 +608,13 @@ func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context)
c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh)
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, blocksLoadedCh chan []*DBHeader) error {
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context, group *async.Group, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) error {
log.Debug("fetchHistoryBlocks start", "chainID", c.chainClient.NetworkID(), "account", c.account, "omit", c.omitHistory)
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
if err != nil {
// c.error = err
return err // Might need to retry a couple of times
}
if c.omitHistory {
blockRange := &BlockRange{nil, big.NewInt(0), headNum}
blockRange := &BlockRange{nil, big.NewInt(0), toNum}
err := c.blockRangeDAO.upsertRange(c.chainClient.NetworkID(), c.account, blockRange)
log.Debug("fetchHistoryBlocks omit history", "chainID", c.chainClient.NetworkID(), "account", c.account, "headNum", headNum, "err", err)
return err
}
@ -619,13 +625,11 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
return err // Will keep spinning forever nomatter what
}
/// first
allHistoryLoaded := areAllHistoryBlocksLoaded(blockRange)
to := getToHistoryBlockNumber(headNum, blockRange, allHistoryLoaded)
log.Debug("fetchHistoryBlocks", "chainID", c.chainClient.NetworkID(), "account", c.account, "to", to, "allHistoryLoaded", allHistoryLoaded)
if !allHistoryLoaded {
to := getToHistoryBlockNumber(toNum, blockRange, allHistoryLoaded)
fbc := &findBlocksCommand{
account: c.account,
db: c.db,
@ -634,7 +638,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
balanceCacher: c.balanceCacher,
feed: c.feed,
noLimit: false,
fromBlockNumber: big.NewInt(0),
fromBlockNumber: fromNum,
toBlockNumber: to,
transactionManager: c.transactionManager,
tokenManager: c.tokenManager,
@ -693,6 +697,11 @@ func (c *loadBlocksAndTransfersCommand) fetchTransfersForLoadedBlocks(group *asy
return err
}
if len(blocks) == 0 {
log.Debug("fetchTransfers no blocks to load", "chainID", c.chainClient.NetworkID(), "account", c.account)
return nil
}
blocksMap := make(map[common.Address][]*big.Int)
blocksMap[c.account] = blocks

View File

@ -28,7 +28,6 @@ import (
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/params"
@ -136,8 +135,17 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([
// We do not verify addresses for now
allTransfers := []testERC20Transfer{}
signatures := q.Topics[0]
erc20TransferSignature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature)
erc1155TransferSingleSignature := w_common.GetEventSignatureHash(w_common.Erc1155TransferSingleEventSignature)
erc20TransferSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature)
erc1155TransferSingleSignature := walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature)
var address common.Hash
for i := 1; i < len(q.Topics); i++ {
if len(q.Topics[i]) > 0 {
address = q.Topics[i][0]
break
}
}
if slices.Contains(signatures, erc1155TransferSingleSignature) {
from := q.Topics[2]
var to []common.Hash
@ -167,10 +175,24 @@ func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([
logs := []types.Log{}
for _, transfer := range allTransfers {
if transfer.block.Cmp(q.FromBlock) >= 0 && transfer.block.Cmp(q.ToBlock) <= 0 {
logs = append(logs, types.Log{
log := types.Log{
BlockNumber: transfer.block.Uint64(),
BlockHash: common.BigToHash(transfer.block),
})
}
// Use the address at least in one any(from/to) topic to trick the implementation
switch transfer.eventType {
case walletcommon.Erc20TransferEventType, walletcommon.Erc721TransferEventType:
// To detect properly ERC721, we need a different number of topics. For now we use only ERC20 for testing
log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc20_721TransferEventSignature), address, address}
case walletcommon.Erc1155TransferSingleEventType:
log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferSingleEventSignature), address, address, address}
log.Data = make([]byte, 2*common.HashLength)
case walletcommon.Erc1155TransferBatchEventType:
log.Topics = []common.Hash{walletcommon.GetEventSignatureHash(walletcommon.Erc1155TransferBatchEventSignature), address, address, address}
}
logs = append(logs, log)
}
}
@ -365,19 +387,23 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) {
transfersPerToken := map[common.Address][]testERC20Transfer{}
for _, transfer := range tc.outgoingERC20Transfers {
transfer.amount = new(big.Int).Neg(transfer.amount)
transfer.eventType = walletcommon.Erc20TransferEventType
transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer)
}
for _, transfer := range tc.incomingERC20Transfers {
transfer.eventType = walletcommon.Erc20TransferEventType
transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer)
}
for _, transfer := range tc.outgoingERC1155SingleTransfers {
transfer.amount = new(big.Int).Neg(transfer.amount)
transfer.eventType = walletcommon.Erc1155TransferSingleEventType
transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer)
}
for _, transfer := range tc.incomingERC1155SingleTransfers {
transfer.eventType = walletcommon.Erc1155TransferSingleEventType
transfersPerToken[transfer.address] = append(transfersPerToken[transfer.address], transfer)
}
@ -392,7 +418,7 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) {
currentBalance := big.NewInt(0)
tc.tokenBalanceHistory[token] = map[uint64]*big.Int{}
transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0)})
transfers = append(transfers, testERC20Transfer{big.NewInt(int64(toBlock + 1)), token, big.NewInt(0), walletcommon.Erc20TransferEventType})
for _, transfer := range transfers {
for blockN := currentBlock; blockN < transfer.block.Uint64(); blockN++ {
@ -576,6 +602,7 @@ type testERC20Transfer struct {
block *big.Int
address common.Address
amount *big.Int
eventType walletcommon.EventType
}
type findBlockCase struct {
@ -614,7 +641,7 @@ func getCases() []findBlockCase {
{75, 0, 1},
},
outgoingERC20Transfers: []testERC20Transfer{
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
toBlock: 100,
expectedBlocksFound: 6,
@ -682,7 +709,7 @@ func getCases() []findBlockCase {
rangeSize: 20,
expectedBlocksFound: 1,
incomingERC20Transfers: []testERC20Transfer{
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
}
@ -694,8 +721,8 @@ func getCases() []findBlockCase {
incomingERC20Transfers: []testERC20Transfer{
// edge case when a regular scan will find transfer at 80,
// but erc20 tail scan should only find transfer at block 6
{big.NewInt(80), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(80), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 5,
@ -711,8 +738,8 @@ func getCases() []findBlockCase {
// thus only 2 blocks found
expectedBlocksFound: 2,
incomingERC20Transfers: []testERC20Transfer{
{big.NewInt(7), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 5,
@ -741,8 +768,8 @@ func getCases() []findBlockCase {
// thus only 2 blocks found
expectedBlocksFound: 2,
incomingERC1155SingleTransfers: []testERC20Transfer{
{big.NewInt(7), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(7), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 5,
@ -758,8 +785,8 @@ func getCases() []findBlockCase {
rangeSize: 20,
expectedBlocksFound: 3,
outgoingERC1155SingleTransfers: []testERC20Transfer{
{big.NewInt(80), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 15, // 3 for each range
@ -774,10 +801,10 @@ func getCases() []findBlockCase {
rangeSize: 20,
expectedBlocksFound: 3,
outgoingERC1155SingleTransfers: []testERC20Transfer{
{big.NewInt(80), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
},
outgoingERC20Transfers: []testERC20Transfer{
{big.NewInt(63), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(63), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 6, // 3 for each range, 0 for tail check becauseERC20ScanByBalance returns no ranges
@ -792,10 +819,10 @@ func getCases() []findBlockCase {
rangeSize: 20,
expectedBlocksFound: 3,
outgoingERC1155SingleTransfers: []testERC20Transfer{
{big.NewInt(80), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(80), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
},
outgoingERC20Transfers: []testERC20Transfer{
{big.NewInt(61), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(61), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 9, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges
@ -811,10 +838,10 @@ func getCases() []findBlockCase {
rangeSize: 20,
expectedBlocksFound: 2,
outgoingERC1155SingleTransfers: []testERC20Transfer{
{big.NewInt(85), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(85), tokenTXYAddress, big.NewInt(1), walletcommon.Erc1155TransferSingleEventType},
},
incomingERC20Transfers: []testERC20Transfer{
{big.NewInt(88), tokenTXYAddress, big.NewInt(1)},
{big.NewInt(88), tokenTXYAddress, big.NewInt(1), walletcommon.Erc20TransferEventType},
},
expectedCalls: map[string]int{
"FilterLogs": 3, // 3 for each range of [40-100], 0 for tail check because ERC20ScanByBalance returns no ranges
@ -827,7 +854,7 @@ func getCases() []findBlockCase {
{75, 0, 1},
},
outgoingERC20Transfers: []testERC20Transfer{
{big.NewInt(80), tokenTXXAddress, big.NewInt(4)},
{big.NewInt(80), tokenTXXAddress, big.NewInt(4), walletcommon.Erc20TransferEventType},
},
toBlock: 100,
rangeSize: 20,
@ -885,8 +912,8 @@ func TestFindBlocksCommand(t *testing.T) {
incomingERC1155SingleTransfers: testCase.incomingERC1155SingleTransfers,
callsCounter: map[string]int{},
}
//tc.traceAPICalls = true
//tc.printPreparedData = true
// tc.traceAPICalls = true
// tc.printPreparedData = true
tc.prepareBalanceHistory(100)
tc.prepareTokenBalanceHistory(100)
blockChannel := make(chan []*DBHeader, 100)
@ -1046,7 +1073,6 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
chainClient: tc,
feed: &event.Feed{},
balanceCacher: balance.NewCacherWithTTL(5 * time.Minute),
errorsCount: 0,
transactionManager: tm,
pendingTxManager: tracker,
tokenManager: tokenManager,
@ -1060,7 +1086,11 @@ func TestFetchTransfersForLoadedBlocks(t *testing.T) {
ctx := context.Background()
group := async.NewGroup(ctx)
err = cmd.fetchHistoryBlocks(ctx, group, blockChannel)
fromNum := big.NewInt(0)
toNum, err := getHeadBlockNumber(ctx, cmd.chainClient)
require.NoError(t, err)
err = cmd.fetchHistoryBlocks(ctx, group, fromNum, toNum, blockChannel)
require.NoError(t, err)
select {

View File

@ -244,7 +244,7 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities []
query := newTransfersQuery()
for _, identity := range identities {
subQuery := newSubQuery()
subQuery = subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionHash(identity.Hash).FilterAddress(identity.Address)
subQuery = subQuery.FilterNetwork(uint64(identity.ChainID)).FilterTransactionID(identity.Hash).FilterAddress(identity.Address)
query.addSubQuery(subQuery, OrSeparator)
}
rows, err := db.client.QueryContext(ctx, query.String(), query.Args()...)
@ -255,8 +255,8 @@ func (db *Database) GetTransfersForIdentities(ctx context.Context, identities []
return query.TransferScan(rows)
}
func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []PreloadedTransaction, err error) {
query := newTransfersQuery().
func (db *Database) GetTransactionsToLoad(chainID uint64, address common.Address, blockNumber *big.Int) (rst []*PreloadedTransaction, err error) {
query := newTransfersQueryForPreloadedTransactions().
FilterNetwork(chainID).
FilterAddress(address).
FilterBlockNumber(blockNumber).
@ -354,8 +354,8 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco
}
insertTx, err := creator.Prepare(`INSERT OR IGNORE
INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?)`)
INTO transfers (network_id, address, sender, hash, blk_number, blk_hash, type, timestamp, log, loaded, log_index, token_id, amount_padded128hex)
VALUES (?, ?, ?, ?, ?, ?, ?, 0, ?, 0, ?, ?, ?)`)
if err != nil {
return err
}
@ -383,9 +383,11 @@ func insertBlocksWithTransactions(chainID uint64, creator statementCreator, acco
continue
}
_, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, w_common.Erc20Transfer, &JSONBlob{transaction.Log}, logIndex)
tokenID := (*bigint.SQLBigIntBytes)(transaction.TokenID)
txValue := sqlite.BigIntToPadded128BitsStr(transaction.Value)
_, err = insertTx.Exec(chainID, account, account, transaction.ID, (*bigint.SQLBigInt)(header.Number), header.Hash, transaction.Type, &JSONBlob{transaction.Log}, logIndex, tokenID, txValue)
if err != nil {
log.Error("error saving Erc20transfer", "err", err)
log.Error("error saving token transfer", "err", err)
return err
}
}
@ -428,7 +430,14 @@ func updateOrInsertTransfers(chainID uint64, creator statementCreator, transfers
var txTo *common.Address
if t.Transaction != nil {
if t.Log != nil {
_, tokenAddress, tokenID, txValue, txFrom, txTo = w_common.ExtractTokenIdentity(t.Type, t.Log, t.Transaction)
_, tokenAddress, txFrom, txTo = w_common.ExtractTokenTransferData(t.Type, t.Log, t.Transaction)
tokenID = t.TokenID
// Zero tokenID can be used for ERC721 and ERC1155 transfers but when serialzed/deserialized it becomes nil
// as 0 value of big.Int bytes is nil.
if tokenID == nil && (t.Type == w_common.Erc721Transfer || t.Type == w_common.Erc1155Transfer) {
tokenID = big.NewInt(0)
}
txValue = t.TokenValue
} else {
txValue = new(big.Int).Set(t.Transaction.Value())
txFrom = &t.From

View File

@ -121,7 +121,7 @@ func TestDBReorgTransfers(t *testing.T) {
}
require.NoError(t, db.ProcessBlocks(777, original.Address, original.Number, lastBlock, []*DBHeader{original}))
require.NoError(t, db.ProcessTransfers(777, []Transfer{
{w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID},
{w_common.EthTransfer, common.Hash{1}, *originalTX.To(), original.Number, original.Hash, 100, originalTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
}, []*DBHeader{}))
nonce = int64(0)
lastBlock = &Block{
@ -131,7 +131,7 @@ func TestDBReorgTransfers(t *testing.T) {
}
require.NoError(t, db.ProcessBlocks(777, replaced.Address, replaced.Number, lastBlock, []*DBHeader{replaced}))
require.NoError(t, db.ProcessTransfers(777, []Transfer{
{w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, "2100", NoMultiTransactionID},
{w_common.EthTransfer, common.Hash{2}, *replacedTX.To(), replaced.Number, replaced.Hash, 100, replacedTX, true, 1777, common.Address{1}, rcpt, nil, nil, nil, "2100", NoMultiTransactionID},
}, []*DBHeader{original}))
all, err := db.GetTransfers(777, big.NewInt(0), nil)

View File

@ -2,7 +2,6 @@ package transfer
import (
"context"
"encoding/binary"
"errors"
"math/big"
"time"
@ -11,20 +10,12 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain"
w_common "github.com/status-im/status-go/services/wallet/common"
)
func getLogSubTxID(log types.Log) common.Hash {
// Get unique ID by using TxHash and log index
index := [4]byte{}
binary.BigEndian.PutUint32(index[:], uint32(log.Index))
return crypto.Keccak256Hash(log.TxHash.Bytes(), index[:])
}
var (
zero = big.NewInt(0)
one = big.NewInt(1)
@ -32,26 +23,23 @@ var (
)
// Partial transaction info obtained by ERC20Downloader.
// A PreloadedTransaction represents a Transaction which contains one or more
// ERC20/ERC721 transfer events.
// To be converted into one or many Transfer objects post-indexing.
// A PreloadedTransaction represents a Transaction which contains one
// ERC20/ERC721/ERC1155 transfer event.
// To be converted into one Transfer object post-indexing.
type PreloadedTransaction struct {
NetworkID uint64
Type w_common.Type `json:"type"`
ID common.Hash `json:"-"`
Address common.Address `json:"address"`
BlockNumber *big.Int `json:"blockNumber"`
BlockHash common.Hash `json:"blockhash"`
Loaded bool
// From is derived from tx signature in order to offload this computation from UI component.
From common.Address `json:"from"`
// Log that was used to generate preloaded transaction.
Log *types.Log `json:"log"`
BaseGasFees string
TokenID *big.Int `json:"tokenId"`
Value *big.Int `json:"value"`
}
// Transfer stores information about transfer.
// A Transfer represents a plain ETH transfer or some token activity inside a Transaction
// Since ERC1155 transfers can contain multiple tokens, a single Transfer represents a single token transfer,
// that means ERC1155 batch transfers will be represented by multiple Transfer objects.
type Transfer struct {
Type w_common.Type `json:"type"`
ID common.Hash `json:"-"`
@ -67,12 +55,16 @@ type Transfer struct {
Receipt *types.Receipt `json:"receipt"`
// Log that was used to generate erc20 transfer. Nil for eth transfer.
Log *types.Log `json:"log"`
// TokenID is the id of the transferred token. Nil for eth transfer.
TokenID *big.Int `json:"tokenId"`
// TokenValue is the value of the token transfer. Nil for eth transfer.
TokenValue *big.Int `json:"tokenValue"`
BaseGasFees string
// Internal field that is used to track multi-transaction transfers.
MultiTransactionID MultiTransactionIDType `json:"multi_transaction_id"`
}
// ETHDownloader downloads regular eth transfers.
// ETHDownloader downloads regular eth transfers and tokens transfers.
type ETHDownloader struct {
chainClient chain.ClientInterface
accounts []common.Address
@ -137,12 +129,70 @@ func getTransferByHash(ctx context.Context, client chain.ClientInterface, signer
return transfer, nil
}
func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) (rst []Transfer, err error) {
func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Block, accounts []common.Address) ([]Transfer, error) {
startTs := time.Now()
rst := make([]Transfer, 0, len(blk.Transactions()))
receiptsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Receipt)
txsByAddressAndTxHash := make(map[common.Address]map[common.Hash]*types.Transaction)
addReceiptToCache := func(address common.Address, txHash common.Hash, receipt *types.Receipt) {
if receiptsByAddressAndTxHash[address] == nil {
receiptsByAddressAndTxHash[address] = make(map[common.Hash]*types.Receipt)
}
receiptsByAddressAndTxHash[address][txHash] = receipt
}
addTxToCache := func(address common.Address, txHash common.Hash, tx *types.Transaction) {
if txsByAddressAndTxHash[address] == nil {
txsByAddressAndTxHash[address] = make(map[common.Hash]*types.Transaction)
}
txsByAddressAndTxHash[address][txHash] = tx
}
getReceiptFromCache := func(address common.Address, txHash common.Hash) *types.Receipt {
if receiptsByAddressAndTxHash[address] == nil {
return nil
}
return receiptsByAddressAndTxHash[address][txHash]
}
getTxFromCache := func(address common.Address, txHash common.Hash) *types.Transaction {
if txsByAddressAndTxHash[address] == nil {
return nil
}
return txsByAddressAndTxHash[address][txHash]
}
getReceipt := func(address common.Address, txHash common.Hash) (receipt *types.Receipt, err error) {
receipt = getReceiptFromCache(address, txHash)
if receipt == nil {
receipt, err = d.fetchTransactionReceipt(ctx, txHash)
if err != nil {
return nil, err
}
addReceiptToCache(address, txHash, receipt)
}
return receipt, nil
}
getTx := func(address common.Address, txHash common.Hash) (tx *types.Transaction, err error) {
tx = getTxFromCache(address, txHash)
if tx == nil {
tx, err = d.fetchTransaction(ctx, txHash)
if err != nil {
return nil, err
}
addTxToCache(address, txHash, tx)
}
return tx, nil
}
for _, address := range accounts {
// During block discovery, we should have populated the DB with 1 item per Transaction containing
// erc20/erc721 transfers
// During block discovery, we should have populated the DB with 1 item per transfer log containing
// erc20/erc721/erc1155 transfers.
// ID is a hash of the tx hash and the log index. log_index is unique per ERC20/721 tx, but not per ERC1155 tx.
transactionsToLoad, err := d.db.GetTransactionsToLoad(d.chainClient.NetworkID(), address, blk.Number())
if err != nil {
return nil, err
@ -150,10 +200,22 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
areSubTxsCheckedForTxHash := make(map[common.Hash]bool)
log.Debug("getTransfersInBlock", "block", blk.Number(), "transactionsToLoad", len(transactionsToLoad))
for _, t := range transactionsToLoad {
subtransactions, err := d.subTransactionsFromTransactionHash(ctx, t.Log.TxHash, address)
receipt, err := getReceipt(address, t.Log.TxHash)
if err != nil {
log.Error("can't fetch subTxs for erc20/erc721 transfer", "error", err)
return nil, err
}
tx, err := getTx(address, t.Log.TxHash)
if err != nil {
return nil, err
}
subtransactions, err := d.subTransactionsFromPreloaded(t, tx, receipt, blk)
if err != nil {
log.Error("can't fetch subTxs for erc20/erc721/erc1155 transfer", "error", err)
return nil, err
}
rst = append(rst, subtransactions...)
@ -188,12 +250,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
}
if isPlainTransfer || mustCheckSubTxs {
receipt, err := d.chainClient.TransactionReceipt(ctx, tx.Hash())
if err != nil {
return nil, err
}
baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(blk.Number())
receipt, err := getReceipt(address, tx.Hash())
if err != nil {
return nil, err
}
@ -201,7 +258,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
// Since we've already got the receipt, check for subTxs of
// interest in case we haven't already.
if !areSubTxsCheckedForTxHash[tx.Hash()] {
subtransactions, err := d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address)
subtransactions, err := d.subTransactionsFromTransactionData(address, from, tx, receipt, blk)
if err != nil {
log.Error("can't fetch subTxs for eth transfer", "error", err)
return nil, err
@ -224,7 +281,7 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
From: from,
Receipt: receipt,
Log: nil,
BaseGasFees: baseGasFee,
BaseGasFees: blk.BaseFee().String(),
MultiTransactionID: NoMultiTransactionID})
}
}
@ -284,44 +341,40 @@ func (d *ERC20TransfersDownloader) outboundTopics(address common.Address) [][]co
}
func (d *ERC20TransfersDownloader) inboundERC20OutboundERC1155Topics(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signature, d.signatureErc1155Single}, {}, {d.paddedAddress(address)}}
return [][]common.Hash{{d.signature, d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {d.paddedAddress(address)}}
}
func (d *ERC20TransfersDownloader) inboundTopicsERC1155Single(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signatureErc1155Single}, {}, {}, {d.paddedAddress(address)}}
func (d *ERC20TransfersDownloader) inboundTopicsERC1155(address common.Address) [][]common.Hash {
return [][]common.Hash{{d.signatureErc1155Single, d.signatureErc1155Batch}, {}, {}, {d.paddedAddress(address)}}
}
func (d *ETHDownloader) subTransactionsFromTransactionHash(parent context.Context, txHash common.Hash, address common.Address) ([]Transfer, error) {
func (d *ETHDownloader) fetchTransactionReceipt(parent context.Context, txHash common.Hash) (*types.Receipt, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.chainClient.TransactionByHash(ctx, txHash)
cancel()
if err != nil {
return nil, err
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
receipt, err := d.chainClient.TransactionReceipt(ctx, txHash)
cancel()
if err != nil {
return nil, err
}
return receipt, nil
}
ctx, cancel = context.WithTimeout(parent, 3*time.Second)
blk, err := d.chainClient.BlockByHash(ctx, receipt.BlockHash)
func (d *ETHDownloader) fetchTransaction(parent context.Context, txHash common.Hash) (*types.Transaction, error) {
ctx, cancel := context.WithTimeout(parent, 3*time.Second)
tx, _, err := d.chainClient.TransactionByHash(ctx, txHash) // TODO Save on requests by checking in the DB first
cancel()
if err != nil {
return nil, err
}
baseGasFee, err := d.chainClient.GetBaseFeeFromBlock(receipt.BlockNumber)
if err != nil {
return nil, err
}
return d.subTransactionsFromTransactionData(tx, receipt, blk, baseGasFee, address)
return tx, nil
}
func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction, receipt *types.Receipt, blk *types.Block, baseGasFee string, address common.Address) ([]Transfer, error) {
func (d *ETHDownloader) subTransactionsFromPreloaded(preloadedTx *PreloadedTransaction, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) {
log.Debug("subTransactionsFromPreloaded start", "txHash", tx.Hash().Hex(), "address", preloadedTx.Address, "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value)
address := preloadedTx.Address
txLog := preloadedTx.Log
rst := make([]Transfer, 0, 1)
from, err := types.Sender(d.signer, tx)
if err != nil {
if err == core.ErrTxTypeNotSupported {
@ -330,47 +383,63 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction
return nil, err
}
rst := make([]Transfer, 0, len(receipt.Logs))
for _, log := range receipt.Logs {
eventType := w_common.GetEventType(log)
eventType := w_common.GetEventType(preloadedTx.Log)
// Only add ERC20/ERC721/ERC1155 transfers from/to the given account
// Other types of events get always added
mustAppend := false
// from/to matching is already handled by getLogs filter
switch eventType {
case w_common.Erc20TransferEventType:
trFrom, trTo, _ := w_common.ParseErc20TransferLog(log)
if trFrom == address || trTo == address {
mustAppend = true
}
case w_common.Erc721TransferEventType:
trFrom, trTo, _ := w_common.ParseErc721TransferLog(log)
if trFrom == address || trTo == address {
mustAppend = true
}
case w_common.Erc1155TransferSingleEventType:
_, trFrom, trTo, _, _ := w_common.ParseErc1155TransferSingleLog(log)
if trFrom == address || trTo == address {
mustAppend = true
}
case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType:
mustAppend = true
case w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType:
mustAppend = true
case w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType:
mustAppend = true
}
if mustAppend {
case w_common.Erc20TransferEventType,
w_common.Erc721TransferEventType,
w_common.Erc1155TransferSingleEventType, w_common.Erc1155TransferBatchEventType:
log.Debug("subTransactionsFromPreloaded transfer", "eventType", eventType, "logIdx", txLog.Index, "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value, "baseFee", blk.BaseFee().String())
transfer := Transfer{
Type: w_common.EventTypeToSubtransactionType(eventType),
ID: getLogSubTxID(*log),
ID: preloadedTx.ID,
Address: address,
BlockNumber: new(big.Int).SetUint64(log.BlockNumber),
BlockHash: log.BlockHash,
BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber),
BlockHash: txLog.BlockHash,
Loaded: true,
NetworkID: d.signer.ChainID().Uint64(),
From: from,
Log: log,
BaseGasFees: baseGasFee,
Log: txLog,
TokenID: preloadedTx.TokenID,
TokenValue: preloadedTx.Value,
BaseGasFees: blk.BaseFee().String(),
Transaction: tx,
Receipt: receipt,
Timestamp: blk.Time(),
MultiTransactionID: NoMultiTransactionID,
}
rst = append(rst, transfer)
}
log.Debug("subTransactionsFromPreloaded end", "txHash", tx.Hash().Hex(), "address", address.Hex(), "tokenID", preloadedTx.TokenID, "value", preloadedTx.Value)
return rst, nil
}
func (d *ETHDownloader) subTransactionsFromTransactionData(address, from common.Address, tx *types.Transaction, receipt *types.Receipt, blk *types.Block) ([]Transfer, error) {
log.Debug("subTransactionsFromTransactionData start", "txHash", tx.Hash().Hex(), "address", address)
rst := make([]Transfer, 0, 1)
for _, txLog := range receipt.Logs {
eventType := w_common.GetEventType(txLog)
switch eventType {
case w_common.UniswapV2SwapEventType, w_common.UniswapV3SwapEventType,
w_common.HopBridgeTransferSentToL2EventType, w_common.HopBridgeTransferFromL1CompletedEventType,
w_common.HopBridgeWithdrawalBondedEventType, w_common.HopBridgeTransferSentEventType:
transfer := Transfer{
Type: w_common.EventTypeToSubtransactionType(eventType),
ID: w_common.GetLogSubTxID(*txLog),
Address: address,
BlockNumber: new(big.Int).SetUint64(txLog.BlockNumber),
BlockHash: txLog.BlockHash,
Loaded: true,
NetworkID: d.signer.ChainID().Uint64(),
From: from,
Log: txLog,
BaseGasFees: blk.BaseFee().String(),
Transaction: tx,
Receipt: receipt,
Timestamp: blk.Time(),
@ -381,11 +450,13 @@ func (d *ETHDownloader) subTransactionsFromTransactionData(tx *types.Transaction
}
}
log.Debug("subTransactionsFromTransactionData end", "txHash", tx.Hash().Hex(), "address", address.Hex())
return rst, nil
}
func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs []types.Log, address common.Address) ([]*DBHeader, error) {
concurrent := NewConcurrentDownloader(parent, NoThreadLimit)
for i := range logs {
l := logs[i]
@ -393,32 +464,39 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
continue
}
id := getLogSubTxID(l)
baseGasFee, err := d.client.GetBaseFeeFromBlock(new(big.Int).SetUint64(l.BlockNumber))
from, to, txIDs, tokenIDs, values, err := w_common.ParseTransferLog(l)
if err != nil {
return nil, err
log.Error("failed to parse transfer log", "log", l, "address", address, "error", err)
continue
}
logType := w_common.EventTypeToSubtransactionType(w_common.GetEventType(&l))
log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "id", id)
// Double check provider returned the correct log
if from != address && to != address {
log.Error("from/to address mismatch", "log", l, "address", address)
continue
}
if logType != w_common.Erc1155SingleTransfer && logType != w_common.Erc1155BatchTransfer {
logType = w_common.Erc20Transfer
eventType := w_common.GetEventType(&l)
logType := w_common.EventTypeToSubtransactionType(eventType)
for i, txID := range txIDs {
log.Debug("block from logs", "block", l.BlockNumber, "log", l, "logType", logType, "address", address, "txID", txID)
// For ERC20 there is no tokenID, so we use nil
var tokenID *big.Int
if len(tokenIDs) > i {
tokenID = tokenIDs[i]
}
header := &DBHeader{
Number: big.NewInt(int64(l.BlockNumber)),
Hash: l.BlockHash,
PreloadedTransactions: []*PreloadedTransaction{{
Address: address,
BlockNumber: big.NewInt(int64(l.BlockNumber)),
BlockHash: l.BlockHash,
ID: id,
From: address,
Loaded: false,
ID: txID,
Type: logType,
Log: &l,
BaseGasFees: baseGasFee,
TokenID: tokenID,
Value: values[i],
}},
Loaded: false,
}
@ -428,6 +506,7 @@ func (d *ERC20TransfersDownloader) blocksFromLogs(parent context.Context, logs [
return nil
})
}
}
select {
case <-concurrent.WaitAsync():
case <-parent.Done():
@ -478,7 +557,7 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
inbound1155, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.inboundTopicsERC1155Single(address),
Topics: d.inboundTopicsERC1155(address),
})
if err != nil {
return nil, err

View File

@ -3,6 +3,7 @@ package transfer
import (
"bytes"
"database/sql"
"fmt"
"math/big"
"github.com/ethereum/go-ethereum/common"
@ -10,7 +11,8 @@ import (
"github.com/status-im/status-go/services/wallet/bigint"
)
const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) FROM transfers"
const baseTransfersQuery = "SELECT hash, type, blk_hash, blk_number, timestamp, address, tx, sender, receipt, log, network_id, base_gas_fee, COALESCE(multi_transaction_id, 0) %s FROM transfers"
const preloadedTransfersQuery = "SELECT hash, type, address, log, token_id, amount_padded128hex FROM transfers"
type transfersQuery struct {
buf *bytes.Buffer
@ -21,7 +23,14 @@ type transfersQuery struct {
func newTransfersQuery() *transfersQuery {
newQuery := newEmptyQuery()
newQuery.buf.WriteString(baseTransfersQuery)
transfersQueryString := fmt.Sprintf(baseTransfersQuery, "")
newQuery.buf.WriteString(transfersQueryString)
return newQuery
}
func newTransfersQueryForPreloadedTransactions() *transfersQuery {
newQuery := newEmptyQuery()
newQuery.buf.WriteString(preloadedTransfersQuery)
return newQuery
}
@ -110,13 +119,20 @@ func (q *transfersQuery) FilterAddress(address common.Address) *transfersQuery {
return q
}
func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery {
func (q *transfersQuery) FilterTransactionID(hash common.Hash) *transfersQuery {
q.addWhereSeparator(AndSeparator)
q.buf.WriteString(" hash = ?")
q.args = append(q.args, hash)
return q
}
func (q *transfersQuery) FilterTransactionHash(hash common.Hash) *transfersQuery {
q.addWhereSeparator(AndSeparator)
q.buf.WriteString(" tx_hash = ?")
q.args = append(q.args, hash)
return q
}
func (q *transfersQuery) FilterBlockHash(blockHash common.Hash) *transfersQuery {
q.addWhereSeparator(AndSeparator)
q.buf.WriteString(" blk_hash = ?")
@ -167,25 +183,48 @@ func (q *transfersQuery) TransferScan(rows *sql.Rows) (rst []Transfer, err error
return rst, nil
}
func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []PreloadedTransaction, err error) {
transfers, err := q.TransferScan(rows)
if err != nil {
return
func (q *transfersQuery) PreloadedTransactionScan(rows *sql.Rows) (rst []*PreloadedTransaction, err error) {
transfers := make([]Transfer, 0)
for rows.Next() {
transfer := Transfer{
Log: &types.Log{},
}
tokenValue := sql.NullString{}
tokenID := sql.RawBytes{}
err = rows.Scan(
&transfer.ID, &transfer.Type,
&transfer.Address,
&JSONBlob{transfer.Log},
&tokenID, &tokenValue)
if len(tokenID) > 0 {
transfer.TokenID = new(big.Int).SetBytes(tokenID)
}
rst = make([]PreloadedTransaction, 0, len(transfers))
if tokenValue.Valid {
var ok bool
transfer.TokenValue, ok = new(big.Int).SetString(tokenValue.String, 16)
if !ok {
panic("failed to parse token value")
}
}
if err != nil {
return nil, err
}
transfers = append(transfers, transfer)
}
rst = make([]*PreloadedTransaction, 0, len(transfers))
for _, transfer := range transfers {
preloadedTransaction := PreloadedTransaction{
preloadedTransaction := &PreloadedTransaction{
ID: transfer.ID,
Type: transfer.Type,
BlockHash: transfer.BlockHash,
BlockNumber: transfer.BlockNumber,
Address: transfer.Address,
From: transfer.From,
Log: transfer.Log,
NetworkID: transfer.NetworkID,
BaseGasFees: transfer.BaseGasFees,
TokenID: transfer.TokenID,
Value: transfer.TokenValue,
}
rst = append(rst, preloadedTransaction)