From f6c219c83932052211ce67c8cae0c5b08f70e55a Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Fri, 19 Jan 2024 13:43:25 +0100 Subject: [PATCH] fix(wallet): Made loadBlocksAndTransfers command FiniteCommand to keep restarting on error non-stop, removed some tests accordingly. Fixed a flaky test for loadBlocksAndTransfers command. Added tests for async.AtomicGroup. Made transfersCommand FiniteCommandWithErrorCounter to prevent infinite restart. --- services/wallet/async/async.go | 12 +- services/wallet/async/async_test.go | 52 +++++++ services/wallet/transfer/commands.go | 29 +++- .../wallet/transfer/commands_sequential.go | 46 +++--- .../transfer/commands_sequential_test.go | 138 ++++++------------ 5 files changed, 141 insertions(+), 136 deletions(-) create mode 100644 services/wallet/async/async_test.go diff --git a/services/wallet/async/async.go b/services/wallet/async/async.go index c4b90a919..cdfb3a8d2 100644 --- a/services/wallet/async/async.go +++ b/services/wallet/async/async.go @@ -47,7 +47,6 @@ func (c SingleShotCommand) Run(ctx context.Context) error { type FiniteCommand struct { Interval time.Duration Runable func(context.Context) error - OnExit *func(context.Context, error) } func (c FiniteCommand) Run(ctx context.Context) error { @@ -134,7 +133,7 @@ func NewAtomicGroup(parent context.Context) *AtomicGroup { return ag } -// AtomicGroup terminates as soon as first goroutine terminates.. +// AtomicGroup terminates as soon as first goroutine terminates with error. 
type AtomicGroup struct { ctx context.Context cancel func() @@ -319,18 +318,11 @@ func (c FiniteCommandWithErrorCounter) Run(ctx context.Context) error { if c.ErrorCounter.SetError(err) { return false, err - } else { - return true, err } + return true, err } quit, err := f(ctx) - defer func() { - if c.OnExit != nil { - (*c.OnExit)(ctx, err) - } - }() - if quit { return err } diff --git a/services/wallet/async/async_test.go b/services/wallet/async/async_test.go new file mode 100644 index 000000000..573a01a26 --- /dev/null +++ b/services/wallet/async/async_test.go @@ -0,0 +1,52 @@ +package async + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestAtomicGroupTerminatesOnOneCommandFailed(t *testing.T) { + ctx := context.Background() + group := NewAtomicGroup(ctx) + + err := errors.New("error") + group.Add(func(ctx context.Context) error { + return err // failure + }) + group.Add(func(ctx context.Context) error { + <-ctx.Done() + return nil + }) + + group.Wait() + require.Equal(t, err, group.Error()) +} + +func TestAtomicGroupWaitsAllToFinish(t *testing.T) { + ctx := context.Background() + group := NewAtomicGroup(ctx) + + finished := false + group.Add(func(ctx context.Context) error { + time.Sleep(1 * time.Millisecond) + return nil // success + }) + group.Add(func(ctx context.Context) error { + for { + select { + case <-ctx.Done(): + return nil + case <-time.After(3 * time.Millisecond): + finished = true + return nil + } + } + }) + + group.Wait() + require.True(t, finished) +} diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index 3e6ea8aaf..652175924 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -57,6 +57,8 @@ var ( goerliArbitrumChainID = uint64(421613) goerliOptimismChainID = uint64(420) binanceTestChainID = uint64(97) + + transfersRetryInterval = 5 * time.Second ) type ethHistoricalCommand struct { @@ -195,11 +197,22 
@@ type transfersCommand struct { fetchedTransfers []Transfer } -func (c *transfersCommand) Command() async.Command { - return async.FiniteCommand{ - Interval: 5 * time.Second, - Runable: c.Run, - }.Run +func (c *transfersCommand) Runner(interval ...time.Duration) async.Runner { + intvl := transfersRetryInterval + if len(interval) > 0 { + intvl = interval[0] + } + return async.FiniteCommandWithErrorCounter{ + FiniteCommand: async.FiniteCommand{ + Interval: intvl, + Runable: c.Run, + }, + ErrorCounter: async.NewErrorCounter(5, "transfersCommand"), + } +} + +func (c *transfersCommand) Command(interval ...time.Duration) async.Command { + return c.Runner(interval...).Run } func (c *transfersCommand) Run(ctx context.Context) (err error) { @@ -529,6 +542,11 @@ func (c *loadTransfersCommand) Command() async.Command { }.Run } +// This command always returns nil, even if there is an error in one of the commands. +// `transfersCommand`s retry until maxError, but this command doesn't retry. +// In case some transfer is not loaded after max retries, it will be retried only after restart of the app. +// Currently there is no implementation to keep retrying until success. I think this should be implemented +// in `transfersCommand` with exponential backoff instead of `loadTransfersCommand` (issue #4608). 
func (c *loadTransfersCommand) Run(parent context.Context) (err error) { return loadTransfers(parent, c.blockDAO, c.db, c.chainClient, c.blocksLimit, c.blocksByAddress, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed) @@ -566,7 +584,6 @@ func loadTransfers(ctx context.Context, blockDAO *BlockDAO, db *Database, group.Add(transfers.Command()) } - // loadTransfers command will be restarted in case of error, but if context is cancelled, we should stop select { case <-ctx.Done(): log.Debug("loadTransfers cancelled", "chain", chainClient.NetworkID(), "error", ctx.Err()) diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 0d98ff4c7..643ac7325 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -399,14 +399,22 @@ type findBlocksCommand struct { reachedETHHistoryStart bool } -func (c *findBlocksCommand) Command() async.Command { +func (c *findBlocksCommand) Runner(interval ...time.Duration) async.Runner { + intvl := findBlocksRetryInterval + if len(interval) > 0 { + intvl = interval[0] + } return async.FiniteCommandWithErrorCounter{ FiniteCommand: async.FiniteCommand{ - Interval: findBlocksRetryInterval, + Interval: intvl, Runable: c.Run, }, - ErrorCounter: async.NewErrorCounter(3, "findBlocksCommand"), // totally 9 retries because the caller command retries 3 times - }.Run + ErrorCounter: async.NewErrorCounter(3, "findBlocksCommand"), + } +} + +func (c *findBlocksCommand) Command(interval ...time.Duration) async.Command { + return c.Runner(interval...).Run } type ERC20BlockRange struct { @@ -799,11 +807,11 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber // Start transfers loop to load transfers for new blocks func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) { + c.incLoops() go func() { defer func() { c.decLoops() }() - c.incLoops() log.Debug("loadTransfersLoop 
start", "chain", c.chainClient.NetworkID()) @@ -876,7 +884,7 @@ type loadBlocksAndTransfersCommand struct { // Not to be set by the caller transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime loops atomic.Int32 - onExit func(ctx context.Context, err error) + // onExit func(ctx context.Context, err error) } func (c *loadBlocksAndTransfersCommand) incLoops() { @@ -894,21 +902,15 @@ func (c *loadBlocksAndTransfersCommand) isStarted() bool { func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) { log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts) - // Finite processes (to be restarted on error, but stopped on success): + // Finite processes (to be restarted on error, but stopped on success or context cancel): // fetching transfers for loaded blocks // fetching history blocks - // Infinite processes (to be restarted on error): + // Infinite processes (to be restarted on error), but stopped on context cancel: // fetching new blocks // fetching transfers for new blocks - ctx, cancel := context.WithCancel(parent) - if c.onExit == nil { - c.onExit = func(ctx context.Context, err error) { // is called on final exit - log.Debug("loadBlocksAndTransfersCommand onExit", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err) - cancel() - } - } + ctx := parent finiteGroup := async.NewAtomicGroup(ctx) finiteGroup.SetName("finiteGroup") defer func() { @@ -922,7 +924,7 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) return err } - // It will start loadTransfersCommand which will run until success when all transfers from DB are loaded + // It will start loadTransfersCommand which will run until all transfers from DB are loaded or any one failed to load err = c.startFetchingTransfersForLoadedBlocks(finiteGroup) if err != nil { log.Error("loadBlocksAndTransfersCommand 
fetchTransfersForLoadedBlocks", "error", err) @@ -960,13 +962,9 @@ func (c *loadBlocksAndTransfersCommand) Runner(interval ...time.Duration) async. intvl = interval[0] } - return async.FiniteCommandWithErrorCounter{ - FiniteCommand: async.FiniteCommand{ - Interval: intvl, - Runable: c.Run, - OnExit: &c.onExit, - }, - ErrorCounter: async.NewErrorCounter(3, "loadBlocksAndTransfersCommand"), + return async.FiniteCommand{ + Interval: intvl, + Runable: c.Run, } } @@ -1064,11 +1062,11 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Context, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) { log.Debug("startFetchingNewBlocks start", "chainID", c.chainClient.NetworkID(), "accounts", addresses) + c.incLoops() go func() { defer func() { c.decLoops() }() - c.incLoops() newBlocksCmd := &findNewBlocksCommand{ findBlocksCommand: &findBlocksCommand{ diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 3f5fe923a..36fbb9dd4 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -1498,51 +1498,15 @@ type TestClientWithError struct { *TestClient } -func (tc *TestClientWithError) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { - tc.incCounter("HeaderByNumber") +func (tc *TestClientWithError) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + tc.incCounter("BlockByNumber") if tc.traceAPICalls { - tc.t.Log("HeaderByNumber", number) + tc.t.Log("BlockByNumber", number) } return nil, errors.New("Network error") } -func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflow(t *testing.T) { - tc := &TestClientWithError{ - &TestClient{ - t: t, - callsCounter: map[string]int{}, - }, - } - - db, err := 
helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) - require.NoError(t, err) - client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) - maker, _ := contracts.NewContractMaker(client) - - cmd := &loadBlocksAndTransfersCommand{ - chainClient: tc, - contractMaker: maker, - } - - ctx := context.Background() - group := async.NewGroup(ctx) - - runner := cmd.Runner(1 * time.Millisecond) - group.Add(runner.Run) - - select { - case <-ctx.Done(): - t.Log("Done") - case <-group.WaitAsync(): - errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter - require.Equal(t, errorCounter.MaxErrors(), tc.callsCounter["HeaderByNumber"]) - - _, expectedErr := tc.HeaderByNumber(ctx, nil) - require.Error(t, expectedErr, errorCounter.Error()) - } -} - type BlockRangeSequentialDAOMockError struct { *BlockRangeSequentialDAO } @@ -1551,59 +1515,6 @@ func (b *BlockRangeSequentialDAOMockError) getBlockRange(chainID uint64, address return nil, errors.New("DB error") } -func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflowWhenStarted(t *testing.T) { - appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{}) - require.NoError(t, err) - - db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{}) - require.NoError(t, err) - - client, _ := statusRpc.NewClient(nil, 1, params.UpstreamRPCConfig{Enabled: false, URL: ""}, []params.Network{}, db) - maker, _ := contracts.NewContractMaker(client) - - wdb := NewDB(db) - tc := &TestClient{ - t: t, - callsCounter: map[string]int{}, - } - accDB, err := accounts.NewDB(appdb) - require.NoError(t, err) - - cmd := &loadBlocksAndTransfersCommand{ - accounts: []common.Address{common.HexToAddress("0x1234")}, - chainClient: tc, - blockDAO: &BlockDAO{db}, - blockRangeDAO: &BlockRangeSequentialDAOMockError{ - &BlockRangeSequentialDAO{ - wdb.client, - }, - }, - accountsDB: accDB, - contractMaker: maker, - } - - ctx := context.Background() - group := 
async.NewGroup(ctx) - - runner := cmd.Runner(1 * time.Millisecond) - group.Add(runner.Run) - - select { - case <-ctx.Done(): - t.Log("Done") - case <-group.WaitAsync(): - errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter - _, expectedErr := cmd.blockRangeDAO.getBlockRange(0, common.Address{}) - require.Error(t, expectedErr, errorCounter.Error()) - require.NoError(t, utils.Eventually(func() error { - if !cmd.isStarted() { - return nil - } - return errors.New("command is still running") - }, 100*time.Millisecond, 10*time.Millisecond)) - } -} - type BlockRangeSequentialDAOMockSuccess struct { *BlockRangeSequentialDAO } @@ -1646,18 +1557,16 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing. ctx, cancel := context.WithCancel(context.Background()) group := async.NewGroup(ctx) - runner := cmd.Runner(1 * time.Millisecond) - group.Add(runner.Run) + group.Add(cmd.Command(1 * time.Millisecond)) select { case <-ctx.Done(): cancel() // linter is not happy if cancel is not called on all code paths t.Log("Done") case <-group.WaitAsync(): - errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter - require.NoError(t, errorCounter.Error()) require.True(t, cmd.isStarted()) + // Test that it stops if canceled cancel() require.NoError(t, utils.Eventually(func() error { if !cmd.isStarted() { @@ -1667,3 +1576,40 @@ func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing. 
}, 100*time.Millisecond, 10*time.Millisecond)) } } + +func TestTransfersCommand_RetryAndQuitOnMaxError(t *testing.T) { + tc := &TestClientWithError{ + &TestClient{ + t: t, + callsCounter: map[string]int{}, + }, + } + + address := common.HexToAddress("0x1234") + cmd := &transfersCommand{ + chainClient: tc, + address: address, + eth: &ETHDownloader{ + chainClient: tc, + accounts: []common.Address{address}, + }, + blockNums: []*big.Int{big.NewInt(1)}, + } + + ctx := context.Background() + group := async.NewGroup(ctx) + + runner := cmd.Runner(1 * time.Millisecond) + group.Add(runner.Run) + + select { + case <-ctx.Done(): + t.Log("Done") + case <-group.WaitAsync(): + errorCounter := runner.(async.FiniteCommandWithErrorCounter).ErrorCounter + require.Equal(t, errorCounter.MaxErrors(), tc.callsCounter["BlockByNumber"]) + + _, expectedErr := tc.BlockByNumber(context.TODO(), nil) + require.Error(t, expectedErr, errorCounter.Error()) + } +}