[#3930] Make sure eth_getLogs for outgoing transfers are not executed for erc20 tail

This commit is contained in:
Roman Volosovskyi 2023-10-05 11:11:47 +02:00
parent bb7273cf6f
commit 6a110ca3df
No known key found for this signature in database
GPG Key ID: 0238A4B5ECEE70DE
5 changed files with 151 additions and 58 deletions

View File

@ -793,7 +793,7 @@ func (c *findAndCheckBlockRangeCommand) fastIndexErc20(ctx context.Context, from
commands := make([]*erc20HistoricalCommand, len(c.accounts))
for i, address := range c.accounts {
erc20 := &erc20HistoricalCommand{
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt())),
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{address}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), false),
chainClient: c.chainClient,
feed: c.feed,
address: address,

View File

@ -132,7 +132,7 @@ func (c *findBlocksCommand) ERC20ScanByBalance(parent context.Context, fromBlock
if fromBalance.Cmp(toBalance) != 0 {
diff := new(big.Int).Sub(to, from)
if diff.Cmp(batchSize) <= 0 {
headers, err := c.fastIndexErc20(parent, from, to)
headers, err := c.fastIndexErc20(parent, from, to, true)
if err != nil {
return nil, err
}
@ -314,7 +314,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
// There could be incoming ERC20 transfers which don't change the balance
// and nonce of ETH account, so we keep looking for them
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to)
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to, false)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", c.account, "chain", c.chainClient.NetworkID())
c.error = err
@ -424,13 +424,13 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cache
// run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header.
func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber *big.Int,
toBlockNumber *big.Int) ([]*DBHeader, error) {
toBlockNumber *big.Int, incomingOnly bool) ([]*DBHeader, error) {
start := time.Now()
group := async.NewGroup(ctx)
erc20 := &erc20HistoricalCommand{
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt())),
erc20: NewERC20TransfersDownloader(c.chainClient, []common.Address{c.account}, types.LatestSignerForChainID(c.chainClient.ToBigInt()), incomingOnly),
chainClient: c.chainClient,
feed: c.feed,
address: c.account,

View File

@ -5,6 +5,7 @@ import (
"math/big"
"sort"
"strings"
"sync"
"testing"
"time"
@ -42,37 +43,72 @@ type TestClient struct {
nonceHistory map[uint64]uint64
traceAPICalls bool
printPreparedData bool
rw sync.RWMutex
callsCounter map[string]int
}
func (tc TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
func (tc *TestClient) incCounter(method string) {
tc.rw.Lock()
defer tc.rw.Unlock()
tc.callsCounter[method] = tc.callsCounter[method] + 1
}
func (tc *TestClient) getCounter() int {
tc.rw.RLock()
defer tc.rw.RUnlock()
cnt := 0
for _, v := range tc.callsCounter {
cnt += v
}
return cnt
}
func (tc *TestClient) printCounter() {
total := tc.getCounter()
tc.rw.RLock()
defer tc.rw.RUnlock()
tc.t.Log("========================================= Total calls", total)
for k, v := range tc.callsCounter {
tc.t.Log(k, v)
}
tc.t.Log("=========================================")
}
func (tc *TestClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
if tc.traceAPICalls {
tc.t.Log("BatchCallContext")
}
return nil
}
func (tc TestClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
func (tc *TestClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) {
tc.incCounter("HeaderByHash")
if tc.traceAPICalls {
tc.t.Log("HeaderByHash")
}
return nil, nil
}
func (tc TestClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
func (tc *TestClient) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) {
tc.incCounter("BlockByHash")
if tc.traceAPICalls {
tc.t.Log("BlockByHash")
}
return nil, nil
}
func (tc TestClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
func (tc *TestClient) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
tc.incCounter("BlockByNumber")
if tc.traceAPICalls {
tc.t.Log("BlockByNumber")
}
return nil, nil
}
func (tc TestClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
func (tc *TestClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) {
tc.incCounter("NonceAt")
nonce := tc.nonceHistory[blockNumber.Uint64()]
if tc.traceAPICalls {
tc.t.Log("NonceAt", blockNumber, "result:", nonce)
@ -80,7 +116,8 @@ func (tc TestClient) NonceAt(ctx context.Context, account common.Address, blockN
return nonce, nil
}
func (tc TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
func (tc *TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) {
tc.incCounter("FilterLogs")
if tc.traceAPICalls {
tc.t.Log("FilterLogs")
}
@ -104,7 +141,8 @@ func (tc TestClient) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]
return logs, nil
}
func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
func (tc *TestClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) {
tc.incCounter("BalanceAt")
balance := tc.balanceHistory[blockNumber.Uint64()]
if tc.traceAPICalls {
@ -113,7 +151,7 @@ func (tc TestClient) BalanceAt(ctx context.Context, account common.Address, bloc
return balance, nil
}
func (tc TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) *big.Int {
func (tc *TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int) *big.Int {
balance := tc.tokenBalanceHistory[token][blockNumber.Uint64()]
if tc.traceAPICalls {
@ -123,6 +161,7 @@ func (tc TestClient) tokenBalanceAt(token common.Address, blockNumber *big.Int)
}
func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
tc.incCounter("HeaderByNumber")
if tc.traceAPICalls {
tc.t.Log("HeaderByNumber", number)
}
@ -134,7 +173,8 @@ func (tc *TestClient) HeaderByNumber(ctx context.Context, number *big.Int) (*typ
return header, nil
}
func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) {
func (tc *TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error) {
tc.incCounter("FullTransactionByBlockNumberAndIndex")
if tc.traceAPICalls {
tc.t.Log("FullTransactionByBlockNumberAndIndex")
}
@ -150,18 +190,19 @@ func (tc TestClient) FullTransactionByBlockNumberAndIndex(ctx context.Context, b
return tx, nil
}
func (tc TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) {
func (tc *TestClient) GetBaseFeeFromBlock(blockNumber *big.Int) (string, error) {
tc.incCounter("GetBaseFeeFromBlock")
if tc.traceAPICalls {
tc.t.Log("GetBaseFeeFromBloc")
tc.t.Log("GetBaseFeeFromBlock")
}
return "", nil
}
func (tc TestClient) NetworkID() uint64 {
func (tc *TestClient) NetworkID() uint64 {
return 777333
}
func (tc TestClient) ToBigInt() *big.Int {
func (tc *TestClient) ToBigInt() *big.Int {
if tc.traceAPICalls {
tc.t.Log("ToBigInt")
}
@ -170,7 +211,8 @@ func (tc TestClient) ToBigInt() *big.Int {
var ethscanAddress = common.HexToAddress("0x0000000000000000000000000000000000777333")
func (tc TestClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
func (tc *TestClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) {
tc.incCounter("CodeAt")
if tc.traceAPICalls {
tc.t.Log("CodeAt", contract, blockNumber)
}
@ -182,7 +224,8 @@ func (tc TestClient) CodeAt(ctx context.Context, contract common.Address, blockN
return nil, nil
}
func (tc TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
func (tc *TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) {
tc.incCounter("CallContract")
if tc.traceAPICalls {
tc.t.Log("CallContract", call, blockNumber, call.To)
}
@ -223,8 +266,8 @@ func (tc TestClient) CallContract(ctx context.Context, call ethereum.CallMsg, bl
return output, nil
}
if *call.To == tokenTXXAddress {
balance := tc.tokenBalanceAt(tokenTXXAddress, blockNumber)
if *call.To == tokenTXXAddress || *call.To == tokenTXZAddress {
balance := tc.tokenBalanceAt(*call.To, blockNumber)
parsed, err := abi.JSON(strings.NewReader(ierc20.IERC20ABI))
if err != nil {
@ -314,34 +357,37 @@ func (tc *TestClient) prepareTokenBalanceHistory(toBlock int) {
}
}
func (tc TestClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
func (tc *TestClient) CallContext(ctx context.Context, result interface{}, method string, args ...interface{}) error {
tc.incCounter("CallContext")
if tc.traceAPICalls {
tc.t.Log("CallContext")
}
return nil
}
func (tc TestClient) GetWalletNotifier() func(chainId uint64, message string) {
func (tc *TestClient) GetWalletNotifier() func(chainId uint64, message string) {
if tc.traceAPICalls {
tc.t.Log("GetWalletNotifier")
}
return nil
}
func (tc TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) {
func (tc *TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) {
if tc.traceAPICalls {
tc.t.Log("SetWalletNotifier")
}
}
func (tc TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
func (tc *TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
tc.incCounter("EstimateGas")
if tc.traceAPICalls {
tc.t.Log("EstimateGas")
}
return 0, nil
}
func (tc TestClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) {
func (tc *TestClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) {
tc.incCounter("PendingCodeAt")
if tc.traceAPICalls {
tc.t.Log("PendingCodeAt")
}
@ -349,7 +395,8 @@ func (tc TestClient) PendingCodeAt(ctx context.Context, account common.Address)
return nil, nil
}
func (tc TestClient) PendingCallContract(ctx context.Context, call ethereum.CallMsg) ([]byte, error) {
func (tc *TestClient) PendingCallContract(ctx context.Context, call ethereum.CallMsg) ([]byte, error) {
tc.incCounter("PendingCallContract")
if tc.traceAPICalls {
tc.t.Log("PendingCallContract")
}
@ -357,7 +404,8 @@ func (tc TestClient) PendingCallContract(ctx context.Context, call ethereum.Call
return nil, nil
}
func (tc TestClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
func (tc *TestClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) {
tc.incCounter("PendingNonceAt")
if tc.traceAPICalls {
tc.t.Log("PendingNonceAt")
}
@ -365,7 +413,8 @@ func (tc TestClient) PendingNonceAt(ctx context.Context, account common.Address)
return 0, nil
}
func (tc TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
func (tc *TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
tc.incCounter("SuggestGasPrice")
if tc.traceAPICalls {
tc.t.Log("SuggestGasPrice")
}
@ -373,7 +422,8 @@ func (tc TestClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) {
return nil, nil
}
func (tc TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
func (tc *TestClient) SendTransaction(ctx context.Context, tx *types.Transaction) error {
tc.incCounter("SendTransaction")
if tc.traceAPICalls {
tc.t.Log("SendTransaction")
}
@ -381,7 +431,8 @@ func (tc TestClient) SendTransaction(ctx context.Context, tx *types.Transaction)
return nil
}
func (tc TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
func (tc *TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
tc.incCounter("SuggestGasTipCap")
if tc.traceAPICalls {
tc.t.Log("SuggestGasTipCap")
}
@ -389,7 +440,8 @@ func (tc TestClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) {
return nil, nil
}
func (tc TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, b []rpc.BatchElem) error {
func (tc *TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context, b []rpc.BatchElem) error {
tc.incCounter("BatchCallContextIgnoringLocalHandlers")
if tc.traceAPICalls {
tc.t.Log("BatchCallContextIgnoringLocalHandlers")
}
@ -397,7 +449,8 @@ func (tc TestClient) BatchCallContextIgnoringLocalHandlers(ctx context.Context,
return nil
}
func (tc TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error {
func (tc *TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, result interface{}, method string, args ...interface{}) error {
tc.incCounter("CallContextIgnoringLocalHandlers")
if tc.traceAPICalls {
tc.t.Log("CallContextIgnoringLocalHandlers")
}
@ -405,7 +458,8 @@ func (tc TestClient) CallContextIgnoringLocalHandlers(ctx context.Context, resul
return nil
}
func (tc TestClient) CallRaw(data string) string {
func (tc *TestClient) CallRaw(data string) string {
tc.incCounter("CallRaw")
if tc.traceAPICalls {
tc.t.Log("CallRaw")
}
@ -413,11 +467,12 @@ func (tc TestClient) CallRaw(data string) string {
return ""
}
func (tc TestClient) GetChainID() *big.Int {
func (tc *TestClient) GetChainID() *big.Int {
return big.NewInt(1)
}
func (tc TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
func (tc *TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) {
tc.incCounter("SubscribeFilterLogs")
if tc.traceAPICalls {
tc.t.Log("SubscribeFilterLogs")
}
@ -425,7 +480,8 @@ func (tc TestClient) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQ
return nil, nil
}
func (tc TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
func (tc *TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) {
tc.incCounter("TransactionReceipt")
if tc.traceAPICalls {
tc.t.Log("TransactionReceipt")
}
@ -433,7 +489,8 @@ func (tc TestClient) TransactionReceipt(ctx context.Context, txHash common.Hash)
return nil, nil
}
func (tc TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
func (tc *TestClient) TransactionByHash(ctx context.Context, txHash common.Hash) (*types.Transaction, bool, error) {
tc.incCounter("TransactionByHash")
if tc.traceAPICalls {
tc.t.Log("TransactionByHash")
}
@ -441,20 +498,21 @@ func (tc TestClient) TransactionByHash(ctx context.Context, txHash common.Hash)
return nil, false, nil
}
func (tc TestClient) BlockNumber(ctx context.Context) (uint64, error) {
func (tc *TestClient) BlockNumber(ctx context.Context) (uint64, error) {
tc.incCounter("BlockNumber")
if tc.traceAPICalls {
tc.t.Log("BlockNumber")
}
return 0, nil
}
func (tc TestClient) SetIsConnected(value bool) {
func (tc *TestClient) SetIsConnected(value bool) {
if tc.traceAPICalls {
tc.t.Log("SetIsConnected")
}
}
func (tc TestClient) GetIsConnected() bool {
func (tc *TestClient) GetIsConnected() bool {
if tc.traceAPICalls {
tc.t.Log("GetIsConnected")
}
@ -478,6 +536,7 @@ type findBlockCase struct {
outgoingERC20Transfers []testERC20Transfer
incomingERC20Transfers []testERC20Transfer
label string
expectedCalls map[string]int
}
func transferInEachBlock() [][]int {
@ -505,12 +564,26 @@ func getCases() []findBlockCase {
},
toBlock: 100,
expectedBlocksFound: 6,
expectedCalls: map[string]int{
"BalanceAt": 27,
//TODO(rasom) NonceAt is flaky, sometimes it's called 18 times, sometimes 17
//to be investigated
//"NonceAt": 18,
"FilterLogs": 10,
"HeaderByNumber": 5,
},
}
case100transfers := findBlockCase{
balanceChanges: transferInEachBlock(),
toBlock: 100,
expectedBlocksFound: 100,
expectedCalls: map[string]int{
"BalanceAt": 101,
"NonceAt": 0,
"FilterLogs": 10,
"HeaderByNumber": 100,
},
}
case3 := findBlockCase{
@ -574,6 +647,10 @@ func getCases() []findBlockCase {
{big.NewInt(80), tokenTXXAddress, big.NewInt(1)},
{big.NewInt(6), tokenTXXAddress, big.NewInt(1)},
},
expectedCalls: map[string]int{
"FilterLogs": 3,
"CallContract": 3,
},
}
cases = append(cases, case1)
@ -586,12 +663,13 @@ func getCases() []findBlockCase {
cases = append(cases, case7emptyHistoryWithOneERC20Transfer)
cases = append(cases, case8emptyHistoryWithERC20Transfers)
//cases = append([]findBlockCase{}, case8emptyHistoryWithERC20Transfers)
//cases = append([]findBlockCase{}, case1)
return cases
}
var tokenTXXAddress = common.HexToAddress("0x53211")
var tokenTXZAddress = common.HexToAddress("0x73211")
func TestFindBlocksCommand(t *testing.T) {
for idx, testCase := range getCases() {
@ -609,6 +687,7 @@ func TestFindBlocksCommand(t *testing.T) {
balances: testCase.balanceChanges,
outgoingERC20Transfers: testCase.outgoingERC20Transfers,
incomingERC20Transfers: testCase.incomingERC20Transfers,
callsCounter: map[string]int{},
}
//tc.traceAPICalls = true
//tc.printPreparedData = true
@ -668,6 +747,14 @@ func TestFindBlocksCommand(t *testing.T) {
numbers = append(numbers, block.Number.Int64())
}
if tc.traceAPICalls {
tc.printCounter()
}
for name, cnt := range testCase.expectedCalls {
require.Equal(t, cnt, tc.callsCounter[name], "calls to "+name)
}
sort.Slice(numbers, func(i, j int) bool { return numbers[i] < numbers[j] })
require.Equal(t, testCase.expectedBlocksFound, len(foundBlocks), testCase.label, "found blocks", numbers)
}

View File

@ -236,14 +236,15 @@ func (d *ETHDownloader) getTransfersInBlock(ctx context.Context, blk *types.Bloc
}
// NewERC20TransfersDownloader returns new instance.
func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common.Address, signer types.Signer) *ERC20TransfersDownloader {
func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common.Address, signer types.Signer, incomingOnly bool) *ERC20TransfersDownloader {
signature := w_common.GetEventSignatureHash(w_common.Erc20_721TransferEventSignature)
return &ERC20TransfersDownloader{
client: client,
accounts: accounts,
signature: signature,
signer: signer,
client: client,
accounts: accounts,
incomingOnly: incomingOnly,
signature: signature,
signer: signer,
}
}
@ -253,8 +254,9 @@ func NewERC20TransfersDownloader(client chain.ClientInterface, accounts []common
// database gets implemented, differentiation between erc20 and erc721 will handled
// in the controller.
type ERC20TransfersDownloader struct {
client chain.ClientInterface
accounts []common.Address
client chain.ClientInterface
accounts []common.Address
incomingOnly bool
// hash of the Transfer event signature
signature common.Hash
@ -417,14 +419,18 @@ func (d *ERC20TransfersDownloader) GetHeadersInRange(parent context.Context, fro
log.Debug("get erc20 transfers in range start", "chainID", d.client.NetworkID(), "from", from, "to", to)
headers := []*DBHeader{}
ctx := context.Background()
var err error
for _, address := range d.accounts {
outbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.outboundTopics(address),
})
if err != nil {
return nil, err
outbound := []types.Log{}
if !d.incomingOnly {
outbound, err = d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,
ToBlock: to,
Topics: d.outboundTopics(address),
})
if err != nil {
return nil, err
}
}
inbound, err := d.client.FilterLogs(ctx, ethereum.FilterQuery{
FromBlock: from,

View File

@ -103,7 +103,7 @@ func (s *OnDemandFetchStrategy) newControlCommand(chainClient chain.ClientInterf
signer: signer,
db: s.db,
},
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer),
erc20: NewERC20TransfersDownloader(chainClient, accounts, signer, false),
feed: s.feed,
errorsCount: 0,
transactionManager: s.transactionManager,