diff --git a/services/wallet/async/async.go b/services/wallet/async/async.go
index 8aaf5de5d..718b96b2e 100644
--- a/services/wallet/async/async.go
+++ b/services/wallet/async/async.go
@@ -9,7 +9,7 @@ import (
 type Command func(context.Context) error
 
 type Commander interface {
-    Command() Command
+    Command(inteval ...time.Duration) Command
 }
 
 // SingleShotCommand runs once.
diff --git a/services/wallet/transfer/block_ranges_sequential_dao.go b/services/wallet/transfer/block_ranges_sequential_dao.go
index 4be143b0d..e3dcf77ab 100644
--- a/services/wallet/transfer/block_ranges_sequential_dao.go
+++ b/services/wallet/transfer/block_ranges_sequential_dao.go
@@ -9,6 +9,13 @@ import (
     "github.com/status-im/status-go/services/wallet/bigint"
 )
 
+type BlockRangeDAOer interface {
+    getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error)
+    upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error)
+    updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
+    upsertEthRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
+}
+
 type BlockRangeSequentialDAO struct {
     db *sql.DB
 }
diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go
index 040d4fdd3..5bdb2660c 100644
--- a/services/wallet/transfer/commands_sequential.go
+++ b/services/wallet/transfer/commands_sequential.go
@@ -3,6 +3,7 @@ package transfer
 import (
     "context"
     "math/big"
+    "sync"
     "time"
 
     "github.com/ethereum/go-ethereum/common"
@@ -19,14 +20,38 @@ import (
     "github.com/status-im/status-go/transactions"
 )
 
-var trLoopCnt int = 0
-var fetchNewBlocksCnt int = 0
+func newErrorCounter(msg string) *errorCounter {
+    return &errorCounter{maxErrors: 3, err: nil, cnt: 0, msg: msg}
+}
 
-func verifyOnce(cnt *int, msg string) {
-    if *cnt > 0 {
-        panic("verifyOnce, function: " + msg)
+type errorCounter struct {
+    cnt       int
+    maxErrors int
+    err       error
+    msg       string
+}
+
+// Returns false in case of counter overflow
+func (ec *errorCounter) setError(err error) bool {
+    log.Debug("errorCounter setError", "msg", ec.msg, "err", err, "cnt", ec.cnt)
+
+    ec.cnt++
+
+    // do not overwrite the first error
+    if ec.err == nil {
+        ec.err = err
     }
-    *cnt++
+
+    if ec.cnt >= ec.maxErrors {
+        log.Error("errorCounter overflow", "msg", ec.msg)
+        return false
+    }
+
+    return true
+}
+
+func (ec *errorCounter) Error() error {
+    return ec.err
 }
 
 type findNewBlocksCommand struct {
@@ -86,7 +111,12 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
         log.Debug("start findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum)
 
-        headers, startBlockNum, _ := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
+        headers, startBlockNum, err := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
+        if err != nil {
+            c.error = err
+            break
+        }
+
         if len(headers) > 0 {
             log.Debug("findNewBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", headNum,
                 "balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), headNum),
@@ -101,7 +131,7 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
             c.blocksFound(headers)
         }
 
-        err := c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
+        err = c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
         if err != nil {
             c.error = err
             break
@@ -211,9 +241,7 @@ func (c *findNewBlocksCommand) findBlocksWithEthTransfers(parent context.Context
     if err != nil {
         log.Error("findNewBlocksCommand checkRange fastIndex", "err", err, "account", account, "chain", c.chainClient.NetworkID())
-        c.error = err
-        // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
-        return nil, nil, nil
+        return nil, nil, err
     }
     log.Debug("findNewBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account, "startBlock", startBlockNum, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
@@ -246,7 +274,7 @@ type findBlocksCommand struct {
     accounts      []common.Address
     db            *Database
     accountsDB    *accounts.Database
-    blockRangeDAO *BlockRangeSequentialDAO
+    blockRangeDAO BlockRangeDAOer
     chainClient   chain.ClientInterface
     balanceCacher balance.Cacher
     feed          *event.Feed
@@ -565,7 +593,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
     return
 }
 
-func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockRangeSequentialDAO) (
+func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO BlockRangeDAOer) (
     *ethTokensBlockRanges, error) {
 
     blockRange, err := blockDAO.getBlockRange(chainID, account)
@@ -589,7 +617,7 @@ func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
     return false
 }
 
-func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO, chainID uint64,
+func areAllHistoryBlocksLoadedForAddress(blockRangeDAO BlockRangeDAOer, chainID uint64,
     address common.Address) (bool, error) {
 
     blockRange, err := blockRangeDAO.getBlockRange(chainID, address)
@@ -673,38 +701,41 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
     }
 }
 
-func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database,
-    chainClient chain.ClientInterface, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
-    tokenManager *token.Manager, feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) {
+// Start transfers loop to load transfers for new blocks
+func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
+    go func() {
+        defer func() {
+            c.decStarted()
+        }()
+        c.incStarted()
 
-    log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID())
+        log.Debug("loadTransfersLoop start", "chain", c.chainClient.NetworkID())
 
-    verifyOnce(&trLoopCnt, "loadTransfersLoop")
+        for {
+            select {
+            case <-ctx.Done():
+                log.Debug("startTransfersLoop done", "chain", c.chainClient.NetworkID(), "error", ctx.Err())
+                return
+            case dbHeaders := <-c.blocksLoadedCh:
+                log.Debug("loadTransfersOnDemand transfers received", "chain", c.chainClient.NetworkID(), "headers", len(dbHeaders))
 
-    for {
-        select {
-        case <-ctx.Done():
-            log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "error", ctx.Err())
-            return
-        case dbHeaders := <-blocksLoadedCh:
-            log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "headers", len(dbHeaders))
+                blocksByAddress := map[common.Address][]*big.Int{}
+                // iterate over headers and group them by address
+                for _, dbHeader := range dbHeaders {
+                    blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number)
+                }
 
-            blocksByAddress := map[common.Address][]*big.Int{}
-            // iterate over headers and group them by address
-            for _, dbHeader := range dbHeaders {
-                blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number)
+                go func() {
+                    _ = loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, noBlockLimit,
+                        blocksByAddress, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
+                }()
             }
-
-            go func() {
-                _ = loadTransfers(ctx, blockDAO, db, chainClient, noBlockLimit,
-                    blocksByAddress, transactionManager, pendingTxManager, tokenManager, feed)
-            }()
         }
-    }
+    }()
 }
 
 func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database,
-    blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, chainClient chain.ClientInterface, feed *event.Feed,
+    blockDAO *BlockDAO, blockRangesSeqDAO BlockRangeDAOer, chainClient chain.ClientInterface, feed *event.Feed,
     transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
     tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand {
@@ -722,6 +753,7 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, a
         tokenManager:   tokenManager,
         blocksLoadedCh: make(chan []*DBHeader, 100),
         omitHistory:    omitHistory,
+        errorCounter:   *newErrorCounter("loadBlocksAndTransfersCommand"),
     }
 }
@@ -729,7 +761,7 @@ type loadBlocksAndTransfersCommand struct {
     accounts      []common.Address
     db            *Database
     accountsDB    *accounts.Database
-    blockRangeDAO *BlockRangeSequentialDAO
+    blockRangeDAO BlockRangeDAOer
     blockDAO      *BlockDAO
     chainClient   chain.ClientInterface
     feed          *event.Feed
@@ -743,11 +775,30 @@ type loadBlocksAndTransfersCommand struct {
     // Not to be set by the caller
     transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
-    started         bool
+    loops           int
+    errorCounter
+    mu sync.Mutex
 }
 
-// func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
-func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
+func (c *loadBlocksAndTransfersCommand) incStarted() {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    c.loops++
+}
+
+func (c *loadBlocksAndTransfersCommand) decStarted() {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    c.loops--
+}
+
+func (c *loadBlocksAndTransfersCommand) isStarted() bool {
+    c.mu.Lock()
+    defer c.mu.Unlock()
+    return c.loops > 0
+}
+
+func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) {
     log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
 
     // Finite processes (to be restarted on error, but stopped on success):
@@ -758,33 +809,40 @@ func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
     // fetching new blocks
     // fetching transfers for new blocks
 
+    ctx, cancel := context.WithCancel(parent)
+    finiteGroup := async.NewAtomicGroup(ctx)
+    defer func() {
+        finiteGroup.Stop()
+        finiteGroup.Wait()
+
+        // if there was an error, and errors overflowed, stop the command
+        if err != nil && !c.setError(err) {
+            log.Error("loadBlocksAndTransfersCommand", "error", c.Error(), "err", err)
+            err = nil // stop the commands
+            cancel()  // stop inner loops
+        }
+    }()
+
     fromNum := big.NewInt(0)
     headNum, err := getHeadBlockNumber(ctx, c.chainClient)
     if err != nil {
         return err
     }
 
-    group := async.NewAtomicGroup(ctx)
-    defer func() {
-        group.Stop()
-        group.Wait()
-    }()
-
     // It will start loadTransfersCommand which will run until success when all transfers from DB are loaded
-    err = c.startFetchingTransfersForLoadedBlocks(group)
+    err = c.startFetchingTransfersForLoadedBlocks(finiteGroup)
     if err != nil {
         log.Error("loadBlocksAndTransfersCommand fetchTransfersForLoadedBlocks", "error", err)
         return err
     }
 
-    if !c.started {
-        c.started = true
+    if !c.isStarted() {
         c.startTransfersLoop(ctx)
         c.startFetchingNewBlocks(ctx, c.accounts, headNum, c.blocksLoadedCh)
     }
 
     // It will start findBlocksCommands which will run until success when all blocks are loaded
-    err = c.fetchHistoryBlocks(group, c.accounts, fromNum, headNum, c.blocksLoadedCh)
+    err = c.fetchHistoryBlocks(finiteGroup, c.accounts, fromNum, headNum, c.blocksLoadedCh)
     if err != nil {
         log.Error("loadBlocksAndTransfersCommand fetchHistoryBlocks", "error", err)
         return err
@@ -792,27 +850,29 @@ func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
     select {
     case <-ctx.Done():
-        return ctx.Err()
-    case <-group.WaitAsync():
-        log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
-        return nil
+        log.Debug("loadBlocksAndTransfers command cancelled", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", ctx.Err())
+    case <-finiteGroup.WaitAsync():
+        err = finiteGroup.Error() // if there was an error, rerun the command
+        log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err)
     }
-    return nil
+
+    return err
 }
 
-func (c *loadBlocksAndTransfersCommand) Command() async.Command {
-    return async.InfiniteCommand{
-        Interval: 5 * time.Second,
+func (c *loadBlocksAndTransfersCommand) Command(interval ...time.Duration) async.Command {
+    // 30s - default interval for Infura's delay returned in error. That should increase chances
+    // for request to succeed with the next attempt for now until we have a proper retry mechanism
+    intvl := 30 * time.Second
+    if len(interval) > 0 {
+        intvl = interval[0]
+    }
+
+    return async.FiniteCommand{
+        Interval: intvl,
         Runable:  c.Run,
     }.Run
 }
 
-// Start transfers loop to load transfers for new blocks
-func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
-    go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager,
-        c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh)
-}
-
 func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(group *async.AtomicGroup, accounts []common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) (err error) {
     for _, account := range accounts {
         err = c.fetchHistoryBlocksForAccount(group, account, fromNum, toNum, c.blocksLoadedCh)
@@ -837,8 +897,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
     blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO)
     if err != nil {
         log.Error("fetchHistoryBlocks loadBlockRangeInfo", "error", err)
-        // c.error = err
-        return err // Will keep spinning forever nomatter what
+        return err
     }
 
     ranges := [][]*big.Int{}
@@ -906,9 +965,12 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
 func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Context, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) {
     log.Debug("startFetchingNewBlocks start", "chainID", c.chainClient.NetworkID(), "accounts", addresses)
 
-    verifyOnce(&fetchNewBlocksCnt, "startFetchingNewBlocks")
-
     go func() {
+        defer func() {
+            c.decStarted()
+        }()
+        c.incStarted()
+
         newBlocksCmd := &findNewBlocksCommand{
             findBlocksCommand: &findBlocksCommand{
                 accounts: addresses,
@@ -931,9 +993,9 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
         // No need to wait for the group since it is infinite
         <-ctx.Done()
-    }()
-    log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err())
+        log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err())
+    }()
 }
 
 func (c *loadBlocksAndTransfersCommand) getBlocksToLoad() (map[common.Address][]*big.Int, error) {
diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go
index 98fabb0e1..46e5ef63c 100644
--- a/services/wallet/transfer/commands_sequential_test.go
+++ b/services/wallet/transfer/commands_sequential_test.go
@@ -10,6 +10,7 @@ import (
     "testing"
     "time"
 
+    "github.com/pkg/errors"
     "github.com/stretchr/testify/mock"
 
     "golang.org/x/exp/slices" // since 1.21, this is in the standard library
@@ -29,6 +30,7 @@ import (
     "github.com/status-im/status-go/services/wallet/async"
     "github.com/status-im/status-go/services/wallet/balance"
     "github.com/status-im/status-go/t/helpers"
+    "github.com/status-im/status-go/t/utils"
 
     "github.com/status-im/status-go/multiaccounts/accounts"
     "github.com/status-im/status-go/params"
@@ -1337,3 +1339,161 @@ func TestFetchNewBlocksCommand(t *testing.T) {
     // We must check all the logs for all accounts with a single iteration of eth_getLogs call
     require.Equal(t, 3, tc.callsCounter["FilterLogs"], "calls to FilterLogs")
 }
+
+type TestClientWithError struct {
+    *TestClient
+}
+
+func (tc *TestClientWithError) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
+    tc.incCounter("HeaderByNumber")
+    if tc.traceAPICalls {
+        tc.t.Log("HeaderByNumber", number)
+    }
+
+    return nil, errors.New("Network error")
+}
+
+func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflow(t *testing.T) {
+    tc := &TestClientWithError{
+        &TestClient{
+            t:            t,
+            callsCounter: map[string]int{},
+        },
+    }
+
+    cmd := &loadBlocksAndTransfersCommand{
+        chainClient:  tc,
+        errorCounter: *newErrorCounter("testLoadBlocksAndTransfersCommand"),
+    }
+
+    ctx := context.Background()
+    group := async.NewGroup(ctx)
+
+    group.Add(cmd.Command(1 * time.Millisecond))
+
+    select {
+    case <-ctx.Done():
+        t.Log("Done")
+    case <-group.WaitAsync():
+        t.Log("Command finished", "error", cmd.Error())
+        require.Equal(t, cmd.maxErrors, tc.callsCounter["HeaderByNumber"])
+
+        _, expectedErr := tc.HeaderByNumber(ctx, nil)
+        require.Error(t, expectedErr, cmd.Error())
+    }
+}
+
+type BlockRangeSequentialDAOMockError struct {
+    *BlockRangeSequentialDAO
+}
+
+func (b *BlockRangeSequentialDAOMockError) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) {
+    return nil, errors.New("DB error")
+}
+
+func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflowWhenStarted(t *testing.T) {
+    appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
+    require.NoError(t, err)
+
+    db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
+    require.NoError(t, err)
+
+    wdb := NewDB(db)
+    tc := &TestClient{
+        t:            t,
+        callsCounter: map[string]int{},
+    }
+    accDB, err := accounts.NewDB(appdb)
+    require.NoError(t, err)
+
+    cmd := &loadBlocksAndTransfersCommand{
+        accounts:    []common.Address{common.HexToAddress("0x1234")},
+        chainClient: tc,
+        blockDAO:    &BlockDAO{db},
+        blockRangeDAO: &BlockRangeSequentialDAOMockError{
+            &BlockRangeSequentialDAO{
+                wdb.client,
+            },
+        },
+        accountsDB: accDB,
+    }
+
+    ctx := context.Background()
+    group := async.NewGroup(ctx)
+
+    group.Add(cmd.Command(1 * time.Millisecond))
+
+    select {
+    case <-ctx.Done():
+        t.Log("Done")
+    case <-group.WaitAsync():
+        t.Log("Command finished", "error", cmd.Error())
+        _, expectedErr := cmd.blockRangeDAO.getBlockRange(0, common.Address{})
+        require.Error(t, expectedErr, cmd.Error())
+        require.NoError(t, utils.Eventually(func() error {
+            if !cmd.isStarted() {
+                return nil
+            }
+            return errors.New("command is still running")
+        }, 100*time.Millisecond, 10*time.Millisecond))
+    }
+}
+
+type BlockRangeSequentialDAOMockSuccess struct {
+    *BlockRangeSequentialDAO
+}
+
+func (b *BlockRangeSequentialDAOMockSuccess) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) {
+    return newEthTokensBlockRanges(), nil
+}
+
+func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.T) {
+    appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
+    require.NoError(t, err)
+
+    db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
+    require.NoError(t, err)
+
+    wdb := NewDB(db)
+    tc := &TestClient{
+        t:            t,
+        callsCounter: map[string]int{},
+    }
+    accDB, err := accounts.NewDB(appdb)
+    require.NoError(t, err)
+
+    cmd := &loadBlocksAndTransfersCommand{
+        accounts:    []common.Address{common.HexToAddress("0x1234")},
+        chainClient: tc,
+        blockDAO:    &BlockDAO{db},
+        blockRangeDAO: &BlockRangeSequentialDAOMockSuccess{
+            &BlockRangeSequentialDAO{
+                wdb.client,
+            },
+        },
+        accountsDB: accDB,
+    }
+
+    ctx, cancel := context.WithCancel(context.Background())
+    group := async.NewGroup(ctx)
+
+    group.Add(cmd.Command(1 * time.Millisecond))
+
+    select {
+    case <-ctx.Done():
+        cancel() // linter is not happy if cancel is not called on all code paths
+        t.Log("Done")
+    case <-group.WaitAsync():
+        t.Log("Command finished", "error", cmd.Error())
+        require.NoError(t, cmd.Error())
+        require.True(t, cmd.isStarted())
+
+        cancel()
+        require.NoError(t, utils.Eventually(func() error {
+            if !cmd.isStarted() {
+                return nil
+            }
+            return errors.New("command is still running")
+        }, 100*time.Millisecond, 10*time.Millisecond))
+    }
+}
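
For readers outside the status-go tree, the following is a minimal, self-contained sketch of the retry mechanism this patch introduces: a command is re-run on a fixed interval until it succeeds or a bounded error counter overflows, at which point the first recorded error is surfaced and the command stops. The errorCounter below mirrors the type added in commands_sequential.go; runWithRetry and failingJob are hypothetical stand-ins for async.FiniteCommand and a real chain call, not part of the patch.

package main

import (
    "context"
    "errors"
    "fmt"
    "time"
)

// errorCounter keeps the first error seen and reports overflow after maxErrors attempts.
type errorCounter struct {
    cnt       int
    maxErrors int
    err       error
}

// setError records err and returns false once the counter overflows.
func (ec *errorCounter) setError(err error) bool {
    ec.cnt++
    if ec.err == nil {
        ec.err = err // do not overwrite the first error
    }
    return ec.cnt < ec.maxErrors
}

func (ec *errorCounter) Error() error { return ec.err }

// runWithRetry plays the role of a finite, interval-based command runner: it re-runs
// cmd until it succeeds, the context is cancelled, or the error counter overflows.
func runWithRetry(ctx context.Context, interval time.Duration, cmd func(context.Context) error) error {
    ec := &errorCounter{maxErrors: 3}
    for {
        if err := cmd(ctx); err == nil {
            return nil
        } else if !ec.setError(err) {
            return ec.Error() // give up after maxErrors consecutive failures
        }
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(interval):
        }
    }
}

func main() {
    // failingJob stands in for a command whose RPC provider keeps returning errors.
    failingJob := func(ctx context.Context) error { return errors.New("network error") }
    err := runWithRetry(context.Background(), 10*time.Millisecond, failingJob)
    fmt.Println("stopped with:", err) // prints the first recorded error after 3 attempts
}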
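A second small sketch, under the same caveat, of the goroutine-lifecycle counter the patch adds (the loops field guarded by a sync.Mutex): each long-running loop increments the counter when it starts and decrements it on exit, which is what lets the new tests wait for isStarted() to become false after cancellation. loopTracker is a hypothetical name used only for illustration.

package main

import (
    "context"
    "fmt"
    "sync"
    "time"
)

// loopTracker counts how many background loops are currently running.
type loopTracker struct {
    mu    sync.Mutex
    loops int
}

func (t *loopTracker) inc() { t.mu.Lock(); defer t.mu.Unlock(); t.loops++ }
func (t *loopTracker) dec() { t.mu.Lock(); defer t.mu.Unlock(); t.loops-- }

func (t *loopTracker) isStarted() bool {
    t.mu.Lock()
    defer t.mu.Unlock()
    return t.loops > 0
}

func main() {
    tracker := &loopTracker{}
    ctx, cancel := context.WithCancel(context.Background())

    // Start an "infinite" loop guarded by the tracker, as startTransfersLoop does.
    go func() {
        tracker.inc()
        defer tracker.dec()
        <-ctx.Done() // block until the parent command cancels the context
    }()

    time.Sleep(10 * time.Millisecond)
    fmt.Println("running:", tracker.isStarted()) // true while the loop is alive

    cancel()
    time.Sleep(10 * time.Millisecond)
    fmt.Println("running:", tracker.isStarted()) // false once the goroutine returns
}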