fix(wallet): Made an interface for BlockRangesSequentialDAO to

mock it in tests.
Made a configurable timeout interval for Commander interface.
Added tests to verify loadBlocksAndTransfers command is stopped
correctly on max errors limit reached
This commit is contained in:
Ivan Belyakov 2023-12-11 14:29:10 +01:00 committed by IvanBelyakoff
parent 670954b71b
commit 81073b208e
4 changed files with 301 additions and 72 deletions

View File

@ -9,7 +9,7 @@ import (
type Command func(context.Context) error
type Commander interface {
Command() Command
Command(inteval ...time.Duration) Command
}
// SingleShotCommand runs once.

View File

@ -9,6 +9,13 @@ import (
"github.com/status-im/status-go/services/wallet/bigint"
)
type BlockRangeDAOer interface {
getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error)
upsertRange(chainID uint64, account common.Address, newBlockRange *ethTokensBlockRanges) (err error)
updateTokenRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
upsertEthRange(chainID uint64, account common.Address, newBlockRange *BlockRange) (err error)
}
type BlockRangeSequentialDAO struct {
db *sql.DB
}

View File

@ -3,6 +3,7 @@ package transfer
import (
"context"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
@ -19,14 +20,38 @@ import (
"github.com/status-im/status-go/transactions"
)
var trLoopCnt int = 0
var fetchNewBlocksCnt int = 0
func newErrorCounter(msg string) *errorCounter {
return &errorCounter{maxErrors: 3, err: nil, cnt: 0, msg: msg}
}
func verifyOnce(cnt *int, msg string) {
if *cnt > 0 {
panic("verifyOnce, function: " + msg)
type errorCounter struct {
cnt int
maxErrors int
err error
msg string
}
// Returns false in case of counter overflow
func (ec *errorCounter) setError(err error) bool {
log.Debug("errorCounter setError", "msg", ec.msg, "err", err, "cnt", ec.cnt)
ec.cnt++
// do not overwrite the first error
if ec.err == nil {
ec.err = err
}
*cnt++
if ec.cnt >= ec.maxErrors {
log.Error("errorCounter overflow", "msg", ec.msg)
return false
}
return true
}
func (ec *errorCounter) Error() error {
return ec.err
}
type findNewBlocksCommand struct {
@ -86,7 +111,12 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
log.Debug("start findNewBlocksCommand", "account", account, "chain", c.chainClient.NetworkID(), "noLimit", c.noLimit, "from", fromNum, "to", headNum)
headers, startBlockNum, _ := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
headers, startBlockNum, err := c.findBlocksWithEthTransfers(parent, account, fromNum, headNum)
if err != nil {
c.error = err
break
}
if len(headers) > 0 {
log.Debug("findNewBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", headNum,
"balance", c.balanceCacher.Cache().GetBalance(account, c.chainClient.NetworkID(), headNum),
@ -101,7 +131,7 @@ func (c *findNewBlocksCommand) findAndSaveEthBlocks(parent context.Context, from
c.blocksFound(headers)
}
err := c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
err = c.markEthBlockRangeChecked(account, &BlockRange{startBlockNum, fromNum, headNum})
if err != nil {
c.error = err
break
@ -211,9 +241,7 @@ func (c *findNewBlocksCommand) findBlocksWithEthTransfers(parent context.Context
if err != nil {
log.Error("findNewBlocksCommand checkRange fastIndex", "err", err, "account", account,
"chain", c.chainClient.NetworkID())
c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers
return nil, nil, nil
return nil, nil, err
}
log.Debug("findNewBlocksCommand checkRange", "chainID", c.chainClient.NetworkID(), "account", account,
"startBlock", startBlockNum, "newFromBlock", newFromBlock.Number, "toBlockNumber", to, "noLimit", c.noLimit)
@ -246,7 +274,7 @@ type findBlocksCommand struct {
accounts []common.Address
db *Database
accountsDB *accounts.Database
blockRangeDAO *BlockRangeSequentialDAO
blockRangeDAO BlockRangeDAOer
chainClient chain.ClientInterface
balanceCacher balance.Cacher
feed *event.Feed
@ -565,7 +593,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
return
}
func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO *BlockRangeSequentialDAO) (
func loadBlockRangeInfo(chainID uint64, account common.Address, blockDAO BlockRangeDAOer) (
*ethTokensBlockRanges, error) {
blockRange, err := blockDAO.getBlockRange(chainID, account)
@ -589,7 +617,7 @@ func areAllHistoryBlocksLoaded(blockInfo *BlockRange) bool {
return false
}
func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO, chainID uint64,
func areAllHistoryBlocksLoadedForAddress(blockRangeDAO BlockRangeDAOer, chainID uint64,
address common.Address) (bool, error) {
blockRange, err := blockRangeDAO.getBlockRange(chainID, address)
@ -673,38 +701,41 @@ func (c *findBlocksCommand) fastIndexErc20(ctx context.Context, fromBlockNumber
}
}
func loadTransfersLoop(ctx context.Context, blockDAO *BlockDAO, db *Database,
chainClient chain.ClientInterface, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, feed *event.Feed, blocksLoadedCh <-chan []*DBHeader) {
// Start transfers loop to load transfers for new blocks
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go func() {
defer func() {
c.decStarted()
}()
c.incStarted()
log.Debug("loadTransfersLoop start", "chain", chainClient.NetworkID())
log.Debug("loadTransfersLoop start", "chain", c.chainClient.NetworkID())
verifyOnce(&trLoopCnt, "loadTransfersLoop")
for {
select {
case <-ctx.Done():
log.Debug("startTransfersLoop done", "chain", c.chainClient.NetworkID(), "error", ctx.Err())
return
case dbHeaders := <-c.blocksLoadedCh:
log.Debug("loadTransfersOnDemand transfers received", "chain", c.chainClient.NetworkID(), "headers", len(dbHeaders))
for {
select {
case <-ctx.Done():
log.Info("loadTransfersLoop done", "chain", chainClient.NetworkID(), "error", ctx.Err())
return
case dbHeaders := <-blocksLoadedCh:
log.Debug("loadTransfersOnDemand transfers received", "chain", chainClient.NetworkID(), "headers", len(dbHeaders))
blocksByAddress := map[common.Address][]*big.Int{}
// iterate over headers and group them by address
for _, dbHeader := range dbHeaders {
blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number)
}
blocksByAddress := map[common.Address][]*big.Int{}
// iterate over headers and group them by address
for _, dbHeader := range dbHeaders {
blocksByAddress[dbHeader.Address] = append(blocksByAddress[dbHeader.Address], dbHeader.Number)
go func() {
_ = loadTransfers(ctx, c.blockDAO, c.db, c.chainClient, noBlockLimit,
blocksByAddress, c.transactionManager, c.pendingTxManager, c.tokenManager, c.feed)
}()
}
go func() {
_ = loadTransfers(ctx, blockDAO, db, chainClient, noBlockLimit,
blocksByAddress, transactionManager, pendingTxManager, tokenManager, feed)
}()
}
}
}()
}
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database,
blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, chainClient chain.ClientInterface, feed *event.Feed,
blockDAO *BlockDAO, blockRangesSeqDAO BlockRangeDAOer, chainClient chain.ClientInterface, feed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand {
@ -722,6 +753,7 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, a
tokenManager: tokenManager,
blocksLoadedCh: make(chan []*DBHeader, 100),
omitHistory: omitHistory,
errorCounter: *newErrorCounter("loadBlocksAndTransfersCommand"),
}
}
@ -729,7 +761,7 @@ type loadBlocksAndTransfersCommand struct {
accounts []common.Address
db *Database
accountsDB *accounts.Database
blockRangeDAO *BlockRangeSequentialDAO
blockRangeDAO BlockRangeDAOer
blockDAO *BlockDAO
chainClient chain.ClientInterface
feed *event.Feed
@ -743,11 +775,30 @@ type loadBlocksAndTransfersCommand struct {
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
started bool
loops int
errorCounter
mu sync.Mutex
}
// func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
func (c *loadBlocksAndTransfersCommand) incStarted() {
c.mu.Lock()
defer c.mu.Unlock()
c.loops++
}
func (c *loadBlocksAndTransfersCommand) decStarted() {
c.mu.Lock()
defer c.mu.Unlock()
c.loops--
}
func (c *loadBlocksAndTransfersCommand) isStarted() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.loops > 0
}
func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) (err error) {
log.Debug("start load all transfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
// Finite processes (to be restarted on error, but stopped on success):
@ -758,33 +809,40 @@ func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
// fetching new blocks
// fetching transfers for new blocks
ctx, cancel := context.WithCancel(parent)
finiteGroup := async.NewAtomicGroup(ctx)
defer func() {
finiteGroup.Stop()
finiteGroup.Wait()
// if there was an error, and errors overflowed, stop the command
if err != nil && !c.setError(err) {
log.Error("loadBlocksAndTransfersCommand", "error", c.Error(), "err", err)
err = nil // stop the commands
cancel() // stop inner loops
}
}()
fromNum := big.NewInt(0)
headNum, err := getHeadBlockNumber(ctx, c.chainClient)
if err != nil {
return err
}
group := async.NewAtomicGroup(ctx)
defer func() {
group.Stop()
group.Wait()
}()
// It will start loadTransfersCommand which will run until success when all transfers from DB are loaded
err = c.startFetchingTransfersForLoadedBlocks(group)
err = c.startFetchingTransfersForLoadedBlocks(finiteGroup)
if err != nil {
log.Error("loadBlocksAndTransfersCommand fetchTransfersForLoadedBlocks", "error", err)
return err
}
if !c.started {
c.started = true
if !c.isStarted() {
c.startTransfersLoop(ctx)
c.startFetchingNewBlocks(ctx, c.accounts, headNum, c.blocksLoadedCh)
}
// It will start findBlocksCommands which will run until success when all blocks are loaded
err = c.fetchHistoryBlocks(group, c.accounts, fromNum, headNum, c.blocksLoadedCh)
err = c.fetchHistoryBlocks(finiteGroup, c.accounts, fromNum, headNum, c.blocksLoadedCh)
if err != nil {
log.Error("loadBlocksAndTransfersCommand fetchHistoryBlocks", "error", err)
return err
@ -792,27 +850,29 @@ func (c *loadBlocksAndTransfersCommand) Run(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-group.WaitAsync():
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts)
return nil
log.Debug("loadBlocksAndTransfers command cancelled", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", ctx.Err())
case <-finiteGroup.WaitAsync():
err = finiteGroup.Error() // if there was an error, rerun the command
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.NetworkID(), "accounts", c.accounts, "error", err)
}
return nil
return err
}
func (c *loadBlocksAndTransfersCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: 5 * time.Second,
func (c *loadBlocksAndTransfersCommand) Command(interval ...time.Duration) async.Command {
// 30s - default interval for Infura's delay returned in error. That should increase chances
// for request to succeed with the next attempt for now until we have a proper retry mechanism
intvl := 30 * time.Second
if len(interval) > 0 {
intvl = interval[0]
}
return async.FiniteCommand{
Interval: intvl,
Runable: c.Run,
}.Run
}
// Start transfers loop to load transfers for new blocks
func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) {
go loadTransfersLoop(ctx, c.blockDAO, c.db, c.chainClient, c.transactionManager,
c.pendingTxManager, c.tokenManager, c.feed, c.blocksLoadedCh)
}
func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(group *async.AtomicGroup, accounts []common.Address, fromNum, toNum *big.Int, blocksLoadedCh chan []*DBHeader) (err error) {
for _, account := range accounts {
err = c.fetchHistoryBlocksForAccount(group, account, fromNum, toNum, c.blocksLoadedCh)
@ -837,8 +897,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
blockRange, err := loadBlockRangeInfo(c.chainClient.NetworkID(), account, c.blockRangeDAO)
if err != nil {
log.Error("fetchHistoryBlocks loadBlockRangeInfo", "error", err)
// c.error = err
return err // Will keep spinning forever nomatter what
return err
}
ranges := [][]*big.Int{}
@ -906,9 +965,12 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Context, addresses []common.Address, fromNum *big.Int, blocksLoadedCh chan<- []*DBHeader) {
log.Debug("startFetchingNewBlocks start", "chainID", c.chainClient.NetworkID(), "accounts", addresses)
verifyOnce(&fetchNewBlocksCnt, "startFetchingNewBlocks")
go func() {
defer func() {
c.decStarted()
}()
c.incStarted()
newBlocksCmd := &findNewBlocksCommand{
findBlocksCommand: &findBlocksCommand{
accounts: addresses,
@ -931,9 +993,9 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
// No need to wait for the group since it is infinite
<-ctx.Done()
}()
log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err())
log.Debug("startFetchingNewBlocks end", "chainID", c.chainClient.NetworkID(), "accounts", addresses, "error", ctx.Err())
}()
}
func (c *loadBlocksAndTransfersCommand) getBlocksToLoad() (map[common.Address][]*big.Int, error) {

View File

@ -10,6 +10,7 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/stretchr/testify/mock"
"golang.org/x/exp/slices" // since 1.21, this is in the standard library
@ -29,6 +30,7 @@ import (
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/params"
@ -1337,3 +1339,161 @@ func TestFetchNewBlocksCommand(t *testing.T) {
// We must check all the logs for all accounts with a single iteration of eth_getLogs call
require.Equal(t, 3, tc.callsCounter["FilterLogs"], "calls to FilterLogs")
}
type TestClientWithError struct {
*TestClient
}
func (tc *TestClientWithError) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) {
tc.incCounter("HeaderByNumber")
if tc.traceAPICalls {
tc.t.Log("HeaderByNumber", number)
}
return nil, errors.New("Network error")
}
func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflow(t *testing.T) {
tc := &TestClientWithError{
&TestClient{
t: t,
callsCounter: map[string]int{},
},
}
cmd := &loadBlocksAndTransfersCommand{
chainClient: tc,
errorCounter: *newErrorCounter("testLoadBlocksAndTransfersCommand"),
}
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
require.Equal(t, cmd.maxErrors, tc.callsCounter["HeaderByNumber"])
_, expectedErr := tc.HeaderByNumber(ctx, nil)
require.Error(t, expectedErr, cmd.Error())
}
}
type BlockRangeSequentialDAOMockError struct {
*BlockRangeSequentialDAO
}
func (b *BlockRangeSequentialDAOMockError) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) {
return nil, errors.New("DB error")
}
func TestLoadBlocksAndTransfersCommand_StopOnErrorsOverflowWhenStarted(t *testing.T) {
appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(t, err)
wdb := NewDB(db)
tc := &TestClient{
t: t,
callsCounter: map[string]int{},
}
accDB, err := accounts.NewDB(appdb)
require.NoError(t, err)
cmd := &loadBlocksAndTransfersCommand{
accounts: []common.Address{common.HexToAddress("0x1234")},
chainClient: tc,
blockDAO: &BlockDAO{db},
blockRangeDAO: &BlockRangeSequentialDAOMockError{
&BlockRangeSequentialDAO{
wdb.client,
},
},
accountsDB: accDB,
}
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
_, expectedErr := cmd.blockRangeDAO.getBlockRange(0, common.Address{})
require.Error(t, expectedErr, cmd.Error())
require.NoError(t, utils.Eventually(func() error {
if !cmd.isStarted() {
return nil
}
return errors.New("command is still running")
}, 100*time.Millisecond, 10*time.Millisecond))
}
}
type BlockRangeSequentialDAOMockSuccess struct {
*BlockRangeSequentialDAO
}
func (b *BlockRangeSequentialDAOMockSuccess) getBlockRange(chainID uint64, address common.Address) (blockRange *ethTokensBlockRanges, err error) {
return newEthTokensBlockRanges(), nil
}
func TestLoadBlocksAndTransfersCommand_FiniteFinishedInfiniteRunning(t *testing.T) {
appdb, err := helpers.SetupTestMemorySQLDB(appdatabase.DbInitializer{})
require.NoError(t, err)
db, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
require.NoError(t, err)
wdb := NewDB(db)
tc := &TestClient{
t: t,
callsCounter: map[string]int{},
}
accDB, err := accounts.NewDB(appdb)
require.NoError(t, err)
cmd := &loadBlocksAndTransfersCommand{
accounts: []common.Address{common.HexToAddress("0x1234")},
chainClient: tc,
blockDAO: &BlockDAO{db},
blockRangeDAO: &BlockRangeSequentialDAOMockSuccess{
&BlockRangeSequentialDAO{
wdb.client,
},
},
accountsDB: accDB,
}
ctx, cancel := context.WithCancel(context.Background())
group := async.NewGroup(ctx)
group.Add(cmd.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
cancel() // linter is not happy if cancel is not called on all code paths
t.Log("Done")
case <-group.WaitAsync():
t.Log("Command finished", "error", cmd.Error())
require.NoError(t, cmd.Error())
require.True(t, cmd.isStarted())
cancel()
require.NoError(t, utils.Eventually(func() error {
if !cmd.isStarted() {
return nil
}
return errors.New("command is still running")
}, 100*time.Millisecond, 10*time.Millisecond))
}
}