Fix nonce change detection (#4679)

This commit is contained in:
Roman Volosovskyi 2024-02-08 12:54:04 +01:00 committed by GitHub
parent c49a0fc314
commit 005e7e6ee4
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 167 additions and 21 deletions

View File

@ -6,8 +6,6 @@ import (
"sync/atomic"
"time"
"golang.org/x/exp/slices"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
@ -27,11 +25,19 @@ import (
var findBlocksRetryInterval = 5 * time.Second
type nonceInfo struct {
nonce *int64
blockNumber *big.Int
}
type findNewBlocksCommand struct {
*findBlocksCommand
contractMaker *contracts.ContractMaker
iteration int
blockChainState *blockchainstate.BlockChainState
contractMaker *contracts.ContractMaker
iteration int
blockChainState *blockchainstate.BlockChainState
lastNonces map[common.Address]nonceInfo
nonceCheckIntervalIterations int
logsCheckIntervalIterations int
}
func (c *findNewBlocksCommand) Command() async.Command {
@ -107,23 +113,49 @@ func (c *findNewBlocksCommand) detectTransfers(parent context.Context, accounts
return blockNum, addressesToCheck, nil
}
func (c *findNewBlocksCommand) detectNonceChange(parent context.Context, from, to *big.Int, accounts []common.Address) ([]common.Address, error) {
addressesWithChange := []common.Address{}
func (c *findNewBlocksCommand) detectNonceChange(parent context.Context, to *big.Int, accounts []common.Address) (map[common.Address]*big.Int, error) {
addressesWithChange := map[common.Address]*big.Int{}
for _, account := range accounts {
oldNonce, err := c.balanceCacher.NonceAt(parent, c.chainClient, account, from)
var oldNonce *int64
blockRange, err := c.blockRangeDAO.getBlockRange(c.chainClient.NetworkID(), account)
if err != nil {
log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID())
log.Error("findNewBlocksCommand can't get block range", "error", err, "account", account, "chain", c.chainClient.NetworkID())
return nil, err
}
lastNonceInfo, ok := c.lastNonces[account]
if !ok || lastNonceInfo.blockNumber.Cmp(blockRange.eth.LastKnown) != 0 {
log.Info("Fetching old nonce", "at", blockRange.eth.LastKnown, "acc", account)
oldNonce, err = c.balanceCacher.NonceAt(parent, c.chainClient, account, blockRange.eth.LastKnown)
if err != nil {
log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID())
return nil, err
}
} else {
oldNonce = lastNonceInfo.nonce
}
newNonce, err := c.balanceCacher.NonceAt(parent, c.chainClient, account, to)
if err != nil {
log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID())
return nil, err
}
log.Info("Comparing nonces", "oldNonce", *oldNonce, "newNonce", *newNonce, "to", to, "acc", account)
if *newNonce != *oldNonce {
addressesWithChange = append(addressesWithChange, account)
addressesWithChange[account] = blockRange.eth.LastKnown
}
if c.lastNonces == nil {
c.lastNonces = map[common.Address]nonceInfo{}
}
c.lastNonces[account] = nonceInfo{
nonce: newNonce,
blockNumber: to,
}
}
@ -180,23 +212,25 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
if err != nil {
return err
}
} else if c.iteration%nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 {
} else if c.iteration%c.nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 {
log.Debug("findNewBlocksCommand nonce check", "accounts", accountsWithOutsideTransfers)
accountsWithNonceChanges, err := c.detectNonceChange(parent, c.fromBlockNumber, headNum, accountsWithOutsideTransfers)
accountsWithNonceChanges, err := c.detectNonceChange(parent, headNum, accountsWithOutsideTransfers)
if err != nil {
return err
}
if len(accountsWithNonceChanges) > 0 {
log.Debug("findNewBlocksCommand detected nonce diff", "accounts", accountsWithNonceChanges)
err = c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsWithNonceChanges)
if err != nil {
return err
for account, from := range accountsWithNonceChanges {
err = c.findAndSaveEthBlocks(parent, from, headNum, []common.Address{account})
if err != nil {
return err
}
}
}
for _, account := range accountsToCheck {
if slices.Contains(accountsWithNonceChanges, account) {
if _, ok := accountsWithNonceChanges[account]; ok {
continue
}
err := c.markEthBlockRangeChecked(account, &BlockRange{nil, c.fromBlockNumber, headNum})
@ -206,7 +240,7 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
}
}
if len(accountsWithDetectedChanges) != 0 || c.iteration%logsCheckIntervalIterations == 0 {
if len(accountsWithDetectedChanges) != 0 || c.iteration%c.logsCheckIntervalIterations == 0 {
err = c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum)
if err != nil {
return err
@ -1096,8 +1130,10 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
blocksLoadedCh: blocksLoadedCh,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
contractMaker: c.contractMaker,
blockChainState: c.blockChainState,
contractMaker: c.contractMaker,
blockChainState: c.blockChainState,
nonceCheckIntervalIterations: nonceCheckIntervalIterations,
logsCheckIntervalIterations: logsCheckIntervalIterations,
}
group := async.NewGroup(ctx)
group.Add(newBlocksCmd.Command())

View File

@ -1340,6 +1340,8 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) {
blocksLoadedCh: blockChannel,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
nonceCheckIntervalIterations: nonceCheckIntervalIterations,
logsCheckIntervalIterations: logsCheckIntervalIterations,
}
tc.prepareBalanceHistory(int(tc.currentBlock))
tc.prepareTokenBalanceHistory(int(tc.currentBlock))
@ -1351,6 +1353,112 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) {
}
}
func TestFetchNewBlocksCommand_nonceDetection(t *testing.T) {
balanceChanges := [][]int{
{5, 1, 0},
{6, 0, 1},
}
scanRange := 5
address := common.HexToAddress("0x1234")
tc := &TestClient{
t: t,
balances: map[common.Address][][]int{address: balanceChanges},
outgoingERC20Transfers: map[common.Address][]testERC20Transfer{},
incomingERC20Transfers: map[common.Address][]testERC20Transfer{},
callsCounter: map[string]int{},
currentBlock: 0,
}
//tc.printPreparedData = true
tc.prepareBalanceHistory(20)
appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(t, err)
tm := &TransactionManager{db, nil, nil, nil, nil, nil, nil, nil, nil, nil}
mediaServer, err := server.NewMediaServer(appdb, nil, nil, db)
require.NoError(t, err)
client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db)
client.SetClient(tc.NetworkID(), tc)
tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil)
wdb := NewDB(db)
blockChannel := make(chan []*DBHeader, 10)
accDB, err := accounts.NewDB(appdb)
require.NoError(t, err)
maker, _ := contracts.NewContractMaker(client)
cmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
accounts: []common.Address{address},
db: wdb,
accountsDB: accDB,
blockRangeDAO: &BlockRangeSequentialDAO{wdb.client},
chainClient: tc,
balanceCacher: balance.NewCacherWithTTL(5 * time.Minute),
feed: &event.Feed{},
noLimit: false,
transactionManager: tm,
tokenManager: tokenManager,
blocksLoadedCh: blockChannel,
defaultNodeBlockChunkSize: scanRange,
fromBlockNumber: big.NewInt(0),
},
blockChainState: blockchainstate.NewBlockChainState(),
contractMaker: maker,
nonceCheckIntervalIterations: 2,
logsCheckIntervalIterations: 2,
}
acc := &accounts.Account{
Address: ethtypes.BytesToAddress(address.Bytes()),
Type: accounts.AccountTypeWatch,
Name: address.String(),
ColorID: multicommon.CustomizationColorPrimary,
Emoji: "emoji",
}
err = accDB.SaveOrUpdateAccounts([]*accounts.Account{acc}, false)
require.NoError(t, err)
ctx := context.Background()
tc.currentBlock = 3
for i := 0; i < 3; i++ {
err := cmd.Run(ctx)
require.NoError(t, err)
close(blockChannel)
foundBlocks := []*DBHeader{}
for {
bloks, ok := <-blockChannel
if !ok {
break
}
foundBlocks = append(foundBlocks, bloks...)
}
numbers := []int64{}
for _, block := range foundBlocks {
numbers = append(numbers, block.Number.Int64())
}
if i == 2 {
require.Equal(t, 2, len(foundBlocks), "blocks", numbers)
} else {
require.Equal(t, 0, len(foundBlocks), "no blocks expected to be found")
}
blockChannel = make(chan []*DBHeader, 10)
cmd.blocksLoadedCh = blockChannel
tc.currentBlock += uint64(scanRange)
}
}
func TestFetchNewBlocksCommand(t *testing.T) {
appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
@ -1432,8 +1540,10 @@ func TestFetchNewBlocksCommand(t *testing.T) {
blocksLoadedCh: blockChannel,
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
contractMaker: tokenManager.ContractMaker,
blockChainState: blockchainstate.NewBlockChainState(),
contractMaker: tokenManager.ContractMaker,
blockChainState: blockchainstate.NewBlockChainState(),
nonceCheckIntervalIterations: nonceCheckIntervalIterations,
logsCheckIntervalIterations: logsCheckIntervalIterations,
}
ctx := context.Background()