fix(wallet): handle errors in findBlocksCommand and

findNewBlocksCommand gracefully.
Add ErrorCounter type for async package.
Add tests

Closes https://github.com/status-im/status-desktop/issues/11074
This commit is contained in:
Ivan Belyakov 2024-01-17 12:46:59 +01:00 committed by IvanBelyakoff
parent 71ae7ca1a0
commit 95b148a247
4 changed files with 189 additions and 141 deletions

View File

@ -4,6 +4,8 @@ import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
)
type Command func(context.Context) error
@ -12,6 +14,10 @@ type Commander interface {
Command(inteval ...time.Duration) Command
}
type Runner interface {
Run(context.Context) error
}
// SingleShotCommand runs once.
type SingleShotCommand struct {
Interval time.Duration
@ -41,6 +47,7 @@ func (c SingleShotCommand) Run(ctx context.Context) error {
type FiniteCommand struct {
Interval time.Duration
Runable func(context.Context) error
OnExit *func(context.Context, error)
}
func (c FiniteCommand) Run(ctx context.Context) error {
@ -138,6 +145,20 @@ type AtomicGroup struct {
error error
}
type AtomicGroupKey string
func (d *AtomicGroup) SetName(name string) {
d.ctx = context.WithValue(d.ctx, AtomicGroupKey("name"), name)
}
func (d *AtomicGroup) Name() string {
val := d.ctx.Value(AtomicGroupKey("name"))
if val != nil {
return val.(string)
}
return ""
}
// Go spawns function in a goroutine and stores results or errors.
func (d *AtomicGroup) Add(cmd Command) {
d.wg.Add(1)
@ -149,6 +170,7 @@ func (d *AtomicGroup) Add(cmd Command) {
if err != nil {
// do not overwrite original error by context errors
if d.error != nil {
log.Info("async.Command failed", "error", err, "d.error", d.error, "group", d.Name())
return
}
d.error = err
@ -244,3 +266,85 @@ func (d *QueuedAtomicGroup) onFinish() {
d.mu.Unlock()
}
func NewErrorCounter(maxErrors int, msg string) *ErrorCounter {
return &ErrorCounter{maxErrors: maxErrors, msg: msg}
}
type ErrorCounter struct {
cnt int
maxErrors int
err error
msg string
}
// Returns false in case of counter overflow
func (ec *ErrorCounter) SetError(err error) bool {
log.Debug("ErrorCounter setError", "msg", ec.msg, "err", err, "cnt", ec.cnt)
ec.cnt++
// do not overwrite the first error
if ec.err == nil {
ec.err = err
}
if ec.cnt >= ec.maxErrors {
log.Error("ErrorCounter overflow", "msg", ec.msg)
return false
}
return true
}
func (ec *ErrorCounter) Error() error {
return ec.err
}
func (ec *ErrorCounter) MaxErrors() int {
return ec.maxErrors
}
type FiniteCommandWithErrorCounter struct {
FiniteCommand
*ErrorCounter
}
func (c FiniteCommandWithErrorCounter) Run(ctx context.Context) error {
f := func(ctx context.Context) (quit bool, err error) {
err = c.Runable(ctx)
if err == nil {
return true, err
}
if c.ErrorCounter.SetError(err) {
return false, err
} else {
return true, err
}
}
quit, err := f(ctx)
defer func() {
if c.OnExit != nil {
(*c.OnExit)(ctx, err)
}
}()
if quit {
return err
}
ticker := time.NewTicker(c.Interval)
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
quit, err := f(ctx)
if quit {
return err
}
}
}
}

View File

@ -0,0 +1,5 @@
package balance
type TestCacher struct {
cacherImpl
}

View File

@ -3,7 +3,7 @@ package transfer
import (
"context"
"math/big"
"sync"
"sync/atomic"
"time"
"golang.org/x/exp/slices"
@ -25,39 +25,7 @@ import (
"github.com/status-im/status-go/transactions"
)
func newErrorCounter(msg string) *errorCounter {
return &errorCounter{maxErrors: 3, err: nil, cnt: 0, msg: msg}
}
type errorCounter struct {
cnt int
maxErrors int
err error
msg string
}
// Returns false in case of counter overflow
func (ec *errorCounter) setError(err error) bool {
log.Debug("errorCounter setError", "msg", ec.msg, "err", err, "cnt", ec.cnt)
ec.cnt++
// do not overwrite the first error
if ec.err == nil {
ec.err = err
}
if ec.cnt >= ec.maxErrors {
log.Error("errorCounter overflow", "msg", ec.msg)
return false
}
return true
}
func (ec *errorCounter) Error() error {
return ec.err
}
var findBlocksRetryInterval = 5 * time.Second
type findNewBlocksCommand struct {
*findBlocksCommand
@ -68,9 +36,6 @@ type findNewBlocksCommand struct {
func (c *findNewBlocksCommand) Command() async.Command {
return async.InfiniteCommand{
// TODO - make it configurable based on chain block mining time
// NOTE(rasom): ^ it is unclear why each block has to be checked,
// that is rather undesirable, as it causes a lot of RPC requests
Interval: 2 * time.Minute,
Runable: c.Run,
}.Run
@ -171,7 +136,6 @@ var logsCheckIntervalIterations = 5
func (c *findNewBlocksCommand) Run(parent context.Context) error {
mnemonicWasNotShown, err := c.accountsDB.GetMnemonicWasNotShown()
if err != nil {
c.error = err
return err
}
@ -183,7 +147,6 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
for _, account := range c.accounts {
acc, err := c.accountsDB.GetAccountByAddress(nodetypes.Address(account))
if err != nil {
c.error = err
return err
}
if mnemonicWasNotShown {
@ -212,16 +175,15 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
c.blockChainState.SetLastBlockNumber(c.chainClient.NetworkID(), headNum.Uint64())
if len(accountsWithDetectedChanges) != 0 {
c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsToCheck)
_ = c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsToCheck)
} else if c.iteration%nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 {
accountsWithNonceChanges, err := c.detectNonceChange(parent, c.fromBlockNumber, headNum, accountsWithOutsideTransfers)
if err != nil {
c.error = err
return err
}
if len(accountsWithNonceChanges) > 0 {
c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsWithNonceChanges)
_ = c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsWithNonceChanges)
}
for _, account := range accountsToCheck {
@ -230,14 +192,13 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
}
err := c.markEthBlockRangeChecked(account, &BlockRange{nil, c.fromBlockNumber, headNum})
if err != nil {
c.error = err
return err
}
}
}
if len(accountsWithDetectedChanges) != 0 || c.iteration%logsCheckIntervalIterations == 0 {
c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum)
_ = c.findAndSaveTokenBlocks(parent, c.fromBlockNumber, headNum)
}
c.fromBlockNumber = headNum
c.iteration++
@ -245,20 +206,18 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
return nil
}
func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, fromNum, headNum *big.Int, accounts []common.Address) {
func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, fromNum, headNum *big.Int, accounts []common.Address) error {
// Check ETH transfers for each account independently
mnemonicWasNotShown, err := c.accountsDB.GetMnemonicWasNotShown()
if err != nil {
c.error = err
return
return err
}
for _, account := range accounts {
if mnemonicWasNotShown {
acc, err := c.accountsDB.GetAccountByAddress(nodetypes.Address(account))
if err != nil {
c.error = err
return
return err
}
if acc.AddressWasNotShown {
log.Info("skip findNewBlocksCommand, mnemonic has not been shown and the address has not been shared yet", "address", account)
@ -270,8 +229,7 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
headers, startBlockNum, err := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
if err != nil {
c.error = err
break
return err
}
if len(headers) > 0 {
@ -281,8 +239,7 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
err := c.db.SaveBlocks(c.chainClient.NetworkID(), headers)
if err != nil {
c.error = err
break
return err
}
c.blocksFound(headers)
@ -290,15 +247,16 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
err = c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
if err != nil {
c.error = err
break
return err
}
log.Debug("end findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum)
}
return nil
}
func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fromNum, headNum *big.Int) {
func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fromNum, headNum *big.Int) error {
// Check token transfers for all accounts.
// Each account's last checked block can be different, so we can get duplicated headers,
// so we need to deduplicate them
@ -306,8 +264,7 @@ func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fr
erc20Headers, err := c.fastIndexErc20(parent, fromNum, headNum, incomingOnly)
if err != nil {
log.Error("findNewBlocksCommand fastIndexErc20", "err", err, "account", c.accounts, "chain", c.chainClient.NetworkID())
c.error = err
return
return err
}
if len(erc20Headers) > 0 {
@ -316,26 +273,20 @@ func (c *findNewBlocksCommand) findAndSaveTokenBlocks(parent context.Context, fr
// get not loaded headers from DB for all accs and blocks
preLoadedTransactions, err := c.db.GetTransactionsToLoad(c.chainClient.NetworkID(), common.Address{}, nil)
if err != nil {
c.error = err
return
return err
}
tokenBlocksFiltered := filterNewPreloadedTransactions(erc20Headers, preLoadedTransactions)
err = c.db.SaveBlocks(c.chainClient.NetworkID(), tokenBlocksFiltered)
if err != nil {
c.error = err
return
return err
}
c.blocksFound(tokenBlocksFiltered)
}
err = c.markTokenBlockRangeChecked(c.accounts, fromNum, headNum)
if err != nil {
c.error = err
return
}
return c.markTokenBlockRangeChecked(c.accounts, fromNum, headNum)
}
func (c *findNewBlocksCommand) markTokenBlockRangeChecked(accounts []common.Address, from, to *big.Int) error {
@ -344,7 +295,6 @@ func (c *findNewBlocksCommand) markTokenBlockRangeChecked(accounts []common.Addr
for _, account := range accounts {
err := c.blockRangeDAO.updateTokenRange(c.chainClient.NetworkID(), account, &BlockRange{LastKnown: to})
if err != nil {
c.error = err
log.Error("findNewBlocksCommand upsertTokenRange", "error", err)
return err
}
@ -447,13 +397,15 @@ type findBlocksCommand struct {
resFromBlock *Block
startBlockNumber *big.Int
reachedETHHistoryStart bool
error error
}
func (c *findBlocksCommand) Command() async.Command {
return async.FiniteCommand{
Interval: 5 * time.Second,
return async.FiniteCommandWithErrorCounter{
FiniteCommand: async.FiniteCommand{
Interval: findBlocksRetryInterval,
Runable: c.Run,
},
ErrorCounter: async.NewErrorCounter(3, "findBlocksCommand"), // totally 9 retries because the caller command retries 3 times
}.Run
}
@ -586,14 +538,12 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
account := c.accounts[0] // For now this command supports only 1 account
mnemonicWasNotShown, err := c.accountsDB.GetMnemonicWasNotShown()
if err != nil {
c.error = err
return err
}
if mnemonicWasNotShown {
account, err := c.accountsDB.GetAccountByAddress(nodetypes.BytesToAddress(account.Bytes()))
if err != nil {
c.error = err
return err
}
if account.AddressWasNotShown {
@ -621,18 +571,16 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
if c.fromBlockNumber.Cmp(zero) == 0 && c.startBlockNumber != nil && c.startBlockNumber.Cmp(zero) == 1 {
headers, err = c.checkERC20Tail(parent, account)
if err != nil {
c.error = err
log.Error("findBlocksCommand checkERC20Tail", "err", err, "account", account, "chain", c.chainClient.NetworkID())
break
}
}
} else {
headers, _ = c.checkRange(parent, from, to)
}
if c.error != nil {
log.Error("findBlocksCommand checkRange", "error", c.error, "account", account,
"chain", c.chainClient.NetworkID(), "from", from, "to", to)
headers, err = c.checkRange(parent, from, to)
if err != nil {
break
}
}
if len(headers) > 0 {
log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
@ -641,8 +589,6 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
err = c.db.SaveBlocks(c.chainClient.NetworkID(), headers)
if err != nil {
c.error = err
// return err
break
}
@ -682,9 +628,9 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
to = nextTo
}
log.Debug("end findBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit)
log.Debug("end findBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "err", err)
return nil
return err
}
func (c *findBlocksCommand) blocksFound(headers []*DBHeader) {
@ -697,7 +643,6 @@ func (c *findBlocksCommand) markEthBlockRangeChecked(account common.Address, blo
err := c.blockRangeDAO.upsertEthRange(c.chainClient.NetworkID(), account, blockRange)
if err != nil {
c.error = err
log.Error("findBlocksCommand upsertRange", "error", err)
return err
}
@ -715,9 +660,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
if err != nil {
log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", account,
"chain", c.chainClient.NetworkID())
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil
return nil, err
}
log.Debug("findBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account,
"startBlock", startBlock, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
@ -727,9 +670,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
erc20Headers, err := c.fastIndexErc20(parent, newFromBlock.Number, to, false)
if err != nil {
log.Error("findBlocksCommand checkRange fastIndexErc20", "err", err, "account", account, "chain", c.chainClient.NetworkID())
c.error = err
// return err
return nil, nil
return nil, err
}
allHeaders := append(ethHeaders, erc20Headers...)
@ -860,9 +801,9 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go func() {
defer func() {
c.decStarted()
c.decLoops()
}()
c.incStarted()
c.incLoops()
log.Debug("loadTransfersLoop start", "chain", c.chainClient.NetworkID())
@ -909,7 +850,6 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, a
tokenManager: tokenManager,
blocksLoadedCh: make(chan []*DBHeader, 100),
omitHistory: omitHistory,
errorCounter: *newErrorCounter("loadBlocksAndTransfersCommand"),
contractMaker: tokenManager.ContractMaker,
blockChainState: blockChainState,
}
@ -935,27 +875,20 @@ type loadBlocksAndTransfersCommand struct {
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
loops int
errorCounter
mu sync.Mutex
loops atomic.Int32
onExit func(ctx context.Context, err error)
}
func (c *loadBlocksAndTransfersCommand) incStarted() {
c.mu.Lock()
defer c.mu.Unlock()
c.loops++
func (c *loadBlocksAndTransfersCommand) incLoops() {
c.loops.Add(1)
}
func (c *loadBlocksAndTransfersCommand) decStarted() {
c.mu.Lock()
defer c.mu.Unlock()
c.loops--
func (c *loadBlocksAndTransfersCommand) decLoops() {
c.loops.Add(-1)
}
func (c *loadBlocksAndTransfersCommand) isStarted() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.loops > 0
return c.loops.Load() > 0
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) {
@ -968,19 +901,19 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error)
// Infinite processes (to be restarted on error):
// fetching new blocks
// fetching transfers for new blocks
ctx, cancel := context.WithCancel(parent)
if c.onExit == nil {
c.onExit = func(ctx context.Context, err error) { // is called on final exit
log.Debug("loadBlocksAndTransfersCommand onExit", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err)
cancel()
}
}
finiteGroup := async.NewAtomicGroup(ctx)
finiteGroup.SetName("finiteGroup")
defer func() {
finiteGroup.Stop()
finiteGroup.Wait()
// if there was an error, and errors overflowed, stop the command
if err != nil && !c.setError(err) {
log.Error("loadBlocksAndTransfersCommand", "error", c.Error(), "err", err)
err = nil // stop the commands
cancel() // stop inner loops
}
}()
fromNum := big.NewInt(0)
@ -1013,13 +946,13 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error)
log.Debug("loadBlocksAndTransfers command cancelled", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", ctx.Err())
case <-finiteGroup.WaitAsync():
err = finiteGroup.Error() // if there was an error, rerun the command
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err)
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err, "group", finiteGroup.Name())
}
return err
}
func (c *loadBlocksAndTransfersCommand) Command(interval ...time.Duration) async.Command {
func (c *loadBlocksAndTransfersCommand) Runner(interval ...time.Duration) async.Runner {
// 30s - default interval for Infura's delay returned in error. That should increase chances
// for request to succeed with the next attempt for now until we have a proper retry mechanism
intvl := 30 * time.Second
@ -1027,10 +960,18 @@ func (c *loadBlocksAndTransfersCommand) Command(interval ...time.Duration) async
intvl = interval[0]
}
return async.FiniteCommand{
return async.FiniteCommandWithErrorCounter{
FiniteCommand: async.FiniteCommand{
Interval: intvl,
Runable: c.Run,
}.Run
OnExit: &c.onExit,
},
ErrorCounter: async.NewErrorCounter(3, "loadBlocksAndTransfersCommand"),
}
}
func (c *loadBlocksAndTransfersCommand) Command(interval ...time.Duration) async.Command {
return c.Runner(interval...).Run
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(group *async.AtomicGroup, accounts []common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) (err error) {
@ -1117,8 +1058,6 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
group.Add(fbc.Command())
}
log.Debug("fetchHistoryBlocks end", "chainID", c.chainClient.NetworkID(), "account", account)
return nil
}
@ -1127,9 +1066,9 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
go func() {
defer func() {
c.decStarted()
c.decLoops()
}()
c.incStarted()
c.incLoops()
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{

View File

@ -1522,24 +1522,24 @@ func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflow(t *testing.T) {
cmd := &loadBlocksAndTransfersCommand{
chainClient: tc,
errorCounter: *newErrorCounter("testLoadBlocksAndTransfersCommand"),
contractMaker: maker,
}
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
runner := cmd.Runner(1 * time.Millisecond)
group.Add(runner.Run)
select {
case <-ctx.Done():
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
require.Equal(t, cmd.maxErrors, tc.callsCounter["HeaderByNumber"])
errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter
require.Equal(t, errorCounter.MaxErrors(), tc.callsCounter["HeaderByNumber"])
_, expectedErr := tc.HeaderByNumber(ctx, nil)
require.Error(t, expectedErr, cmd.Error())
require.Error(t, expectedErr, errorCounter.Error())
}
}
@ -1585,15 +1585,16 @@ func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflowWhenStarted(t *testin
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
runner := cmd.Runner(1 * time.Millisecond)
group.Add(runner.Run)
select {
case <-ctx.Done():
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter
_, expectedErr := cmd.blockRangeDAO.getBlockRange(0, common.Address{})
require.Error(t, expectedErr, cmd.Error())
require.Error(t, expectedErr, errorCounter.Error())
require.NoError(t, utils.Eventually(func() error {
if !cmd.isStarted() {
return nil
@ -1611,7 +1612,6 @@ func (b *BlockRangeSequentialDAOMockSuccess) getBlockRange(chainID uint64, addre
return newEthTokensBlockRanges(), nil
}
/*
func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.T) {
appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
@ -1646,15 +1646,16 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.
ctx, cancel := context.WithCancel(context.Background())
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
runner := cmd.Runner(1 * time.Millisecond)
group.Add(runner.Run)
select {
case <-ctx.Done():
cancel() // linter is not happy if cancel is not called on all code paths
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
require.NoError(t, cmd.Error())
errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter
require.NoError(t, errorCounter.Error())
require.True(t, cmd.isStarted())
cancel()
@ -1666,4 +1667,3 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.
}, 100*time.Millisecond, 10*time.Millisecond))
}
}
*/