diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index ee9de5b8f..ba9bbad47 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -6,8 +6,6 @@ import ( "sync/atomic" "time" - "golang.org/x/exp/slices" - "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" @@ -27,11 +25,19 @@ import ( var findBlocksRetryInterval = 5 * time.Second +type nonceInfo struct { + nonce *int64 + blockNumber *big.Int +} + type findNewBlocksCommand struct { *findBlocksCommand - contractMaker *contracts.ContractMaker - iteration int - blockChainState *blockchainstate.BlockChainState + contractMaker *contracts.ContractMaker + iteration int + blockChainState *blockchainstate.BlockChainState + lastNonces map[common.Address]nonceInfo + nonceCheckIntervalIterations int + logsCheckIntervalIterations int } func (c *findNewBlocksCommand) Command() async.Command { @@ -107,23 +113,49 @@ func (c *findNewBlocksCommand) detectTransfers(parent context.Context, accounts return blockNum, addressesToCheck, nil } -func (c *findNewBlocksCommand) detectNonceChange(parent context.Context, from, to *big.Int, accounts []common.Address) ([]common.Address, error) { - addressesWithChange := []common.Address{} +func (c *findNewBlocksCommand) detectNonceChange(parent context.Context, to *big.Int, accounts []common.Address) (map[common.Address]*big.Int, error) { + addressesWithChange := map[common.Address]*big.Int{} for _, account := range accounts { - oldNonce, err := c.balanceCacher.NonceAt(parent, c.chainClient, account, from) + var oldNonce *int64 + + blockRange, err := c.blockRangeDAO.getBlockRange(c.chainClient.NetworkID(), account) if err != nil { - log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID()) + log.Error("findNewBlocksCommand can't get block range", "error", err, "account", account, "chain", c.chainClient.NetworkID()) return nil, err } + lastNonceInfo, ok := c.lastNonces[account] + if !ok || lastNonceInfo.blockNumber.Cmp(blockRange.eth.LastKnown) != 0 { + log.Info("Fetching old nonce", "at", blockRange.eth.LastKnown, "acc", account) + + oldNonce, err = c.balanceCacher.NonceAt(parent, c.chainClient, account, blockRange.eth.LastKnown) + if err != nil { + log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID()) + return nil, err + } + } else { + oldNonce = lastNonceInfo.nonce + } + newNonce, err := c.balanceCacher.NonceAt(parent, c.chainClient, account, to) if err != nil { log.Error("findNewBlocksCommand can't get nonce", "error", err, "account", account, "chain", c.chainClient.NetworkID()) return nil, err } + log.Info("Comparing nonces", "oldNonce", *oldNonce, "newNonce", *newNonce, "to", to, "acc", account) + if *newNonce != *oldNonce { - addressesWithChange = append(addressesWithChange, account) + addressesWithChange[account] = blockRange.eth.LastKnown + } + + if c.lastNonces == nil { + c.lastNonces = map[common.Address]nonceInfo{} + } + + c.lastNonces[account] = nonceInfo{ + nonce: newNonce, + blockNumber: to, } } @@ -180,23 +212,25 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error { if err != nil { return err } - } else if c.iteration%nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 { + } else if c.iteration%c.nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 { log.Debug("findNewBlocksCommand nonce check", "accounts", accountsWithOutsideTransfers) - accountsWithNonceChanges, err := c.detectNonceChange(parent, c.fromBlockNumber, headNum, accountsWithOutsideTransfers) + accountsWithNonceChanges, err := c.detectNonceChange(parent, headNum, accountsWithOutsideTransfers) if err != nil { return err } if len(accountsWithNonceChanges) > 0 { log.Debug("findNewBlocksCommand detected nonce diff", "accounts", accountsWithNonceChanges) - err = c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsWithNonceChanges) - if err != nil { - return err + for account, from := range accountsWithNonceChanges { + err = c.findAndSaveEthBlocks(parent, from, headNum, []common.Address{account}) + if err != nil { + return err + } } } for _, account := range accountsToCheck { - if slices.Contains(accountsWithNonceChanges, account) { + if _, ok := accountsWithNonceChanges[account]; ok { continue } err := c.markEthBlockRangeChecked(account, &BlockRange{nil, c.fromBlockNumber, headNum}) @@ -206,7 +240,7 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error { } } - if len(accountsWithDetectedChanges) != 0 || c.iteration%logsCheckIntervalIterations == 0 { + if len(accountsWithDetectedChanges) != 0 || c.iteration%c.logsCheckIntervalIterations == 0 { err = c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum) if err != nil { return err @@ -1096,8 +1130,10 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte blocksLoadedCh: blocksLoadedCh, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, }, - contractMaker: c.contractMaker, - blockChainState: c.blockChainState, + contractMaker: c.contractMaker, + blockChainState: c.blockChainState, + nonceCheckIntervalIterations: nonceCheckIntervalIterations, + logsCheckIntervalIterations: logsCheckIntervalIterations, } group := async.NewGroup(ctx) group.Add(newBlocksCmd.Command()) diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 36fbb9dd4..88e5e8332 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -1340,6 +1340,8 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) { blocksLoadedCh: blockChannel, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, }, + nonceCheckIntervalIterations: nonceCheckIntervalIterations, + logsCheckIntervalIterations: logsCheckIntervalIterations, } tc.prepareBalanceHistory(int(tc.currentBlock)) tc.prepareTokenBalanceHistory(int(tc.currentBlock)) @@ -1351,6 +1353,112 @@ func TestFetchNewBlocksCommand_findBlocksWithEthTransfers(t *testing.T) { } } +func TestFetchNewBlocksCommand_nonceDetection(t *testing.T) { + balanceChanges := [][]int{ + {5, 1, 0}, + {6, 0, 1}, + } + + scanRange := 5 + address := common.HexToAddress("0x1234") + + tc := &TestClient{ + t: t, + balances: map[common.Address][][]int{address: balanceChanges}, + outgoingERC20Transfers: map[common.Address][]testERC20Transfer{}, + incomingERC20Transfers: map[common.Address][]testERC20Transfer{}, + callsCounter: map[string]int{}, + currentBlock: 0, + } + + //tc.printPreparedData = true + tc.prepareBalanceHistory(20) + + appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) + require.NoError(t, err) + + db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) + require.NoError(t, err) + tm := &TransactionManager{db, nil, nil, nil, nil, nil, nil, nil, nil, nil} + + mediaServer, err := server.NewMediaServer(appdb, nil, nil, db) + require.NoError(t, err) + + client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) + client.SetClient(tc.NetworkID(), tc) + tokenManager := token.NewTokenManager(db, client, community.NewManager(appdb, nil, nil), network.NewManager(appdb), appdb, mediaServer, nil) + + wdb := NewDB(db) + blockChannel := make(chan []*DBHeader, 10) + + accDB, err := accounts.NewDB(appdb) + require.NoError(t, err) + + maker, _ := contracts.NewContractMaker(client) + + cmd := &findNewBlocksCommand{ + findBlocksCommand: &findBlocksCommand{ + accounts: []common.Address{address}, + db: wdb, + accountsDB: accDB, + blockRangeDAO: &BlockRangeSequentialDAO{wdb.client}, + chainClient: tc, + balanceCacher: balance.NewCacherWithTTL(5 * time.Minute), + feed: &event.Feed{}, + noLimit: false, + transactionManager: tm, + tokenManager: tokenManager, + blocksLoadedCh: blockChannel, + defaultNodeBlockChunkSize: scanRange, + fromBlockNumber: big.NewInt(0), + }, + blockChainState: blockchainstate.NewBlockChainState(), + contractMaker: maker, + nonceCheckIntervalIterations: 2, + logsCheckIntervalIterations: 2, + } + + acc := &accounts.Account{ + Address: ethtypes.BytesToAddress(address.Bytes()), + Type: accounts.AccountTypeWatch, + Name: address.String(), + ColorID: multicommon.CustomizationColorPrimary, + Emoji: "emoji", + } + err = accDB.SaveOrUpdateAccounts([]*accounts.Account{acc}, false) + require.NoError(t, err) + + ctx := context.Background() + tc.currentBlock = 3 + for i := 0; i < 3; i++ { + err := cmd.Run(ctx) + require.NoError(t, err) + close(blockChannel) + + foundBlocks := []*DBHeader{} + for { + bloks, ok := <-blockChannel + if !ok { + break + } + foundBlocks = append(foundBlocks, bloks...) + } + + numbers := []int64{} + for _, block := range foundBlocks { + numbers = append(numbers, block.Number.Int64()) + } + if i == 2 { + require.Equal(t, 2, len(foundBlocks), "blocks", numbers) + } else { + require.Equal(t, 0, len(foundBlocks), "no blocks expected to be found") + } + blockChannel = make(chan []*DBHeader, 10) + cmd.blocksLoadedCh = blockChannel + tc.currentBlock += uint64(scanRange) + } +} + func TestFetchNewBlocksCommand(t *testing.T) { appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) require.NoError(t, err) @@ -1432,8 +1540,10 @@ func TestFetchNewBlocksCommand(t *testing.T) { blocksLoadedCh: blockChannel, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, }, - contractMaker: tokenManager.ContractMaker, - blockChainState: blockchainstate.NewBlockChainState(), + contractMaker: tokenManager.ContractMaker, + blockChainState: blockchainstate.NewBlockChainState(), + nonceCheckIntervalIterations: nonceCheckIntervalIterations, + logsCheckIntervalIterations: logsCheckIntervalIterations, } ctx := context.Background()