From 9b7eec0edb650d4b1300c6edaaf0c82f1bf64233 Mon Sep 17 00:00:00 2001 From: Roman Volosovskyi Date: Thu, 25 Jan 2024 13:05:59 +0100 Subject: [PATCH] [#4603] Get rid of fetchLatestBlockNumberCommand (#4614) --- services/wallet/blockchainstate.go | 149 ------------------ .../wallet/blockchainstate/blockchainstate.go | 63 ++++++++ .../blockchainstate_test.go | 4 +- services/wallet/service.go | 9 +- .../wallet/transfer/commands_sequential.go | 16 +- .../transfer/commands_sequential_test.go | 4 +- services/wallet/transfer/controller.go | 7 +- services/wallet/transfer/controller_test.go | 5 + services/wallet/transfer/reactor.go | 6 +- .../transfer/sequential_fetch_strategy.go | 6 +- 10 files changed, 104 insertions(+), 165 deletions(-) delete mode 100644 services/wallet/blockchainstate.go create mode 100644 services/wallet/blockchainstate/blockchainstate.go rename services/wallet/{ => blockchainstate}/blockchainstate_test.go (95%) diff --git a/services/wallet/blockchainstate.go b/services/wallet/blockchainstate.go deleted file mode 100644 index fcb59b0f6..000000000 --- a/services/wallet/blockchainstate.go +++ /dev/null @@ -1,149 +0,0 @@ -package wallet - -import ( - "context" - "sync" - "time" - - "github.com/ethereum/go-ethereum/log" - - "github.com/status-im/status-go/multiaccounts/accounts" - "github.com/status-im/status-go/rpc" - "github.com/status-im/status-go/services/wallet/async" - "github.com/status-im/status-go/services/wallet/common" -) - -const ( - fetchLatestBlockNumbersInterval = 10 * time.Minute -) - -type fetchLatestBlockNumberCommand struct { - state *BlockChainState - rpcClient *rpc.Client - accountsDB *accounts.Database -} - -func (c *fetchLatestBlockNumberCommand) Command() async.Command { - return async.InfiniteCommand{ - Interval: fetchLatestBlockNumbersInterval, - Runable: c.Run, - }.Run -} - -func (c *fetchLatestBlockNumberCommand) Run(parent context.Context) (err error) { - log.Debug("start fetchLatestBlockNumberCommand") - - networks, err := c.rpcClient.NetworkManager.Get(false) - if err != nil { - return nil - } - areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled() - if err != nil { - return - } - ctx := context.Background() - for _, network := range networks { - if network.IsTest != areTestNetworksEnabled { - continue - } - _, _ = c.state.fetchLatestBlockNumber(ctx, network.ChainID) - } - return nil -} - -type LatestBlockData struct { - blockNumber uint64 - timestamp time.Time - blockDuration time.Duration -} - -type BlockChainState struct { - rpcClient *rpc.Client - accountsDB *accounts.Database - blkMu sync.RWMutex - latestBlockNumbers map[uint64]LatestBlockData - group *async.Group - cancelFn context.CancelFunc - sinceFn func(time.Time) time.Duration -} - -func NewBlockChainState(rpcClient *rpc.Client, accountsDb *accounts.Database) *BlockChainState { - return &BlockChainState{ - rpcClient: rpcClient, - accountsDB: accountsDb, - blkMu: sync.RWMutex{}, - latestBlockNumbers: make(map[uint64]LatestBlockData), - sinceFn: time.Since, - } -} - -func (s *BlockChainState) Start() { - ctx, cancel := context.WithCancel(context.Background()) - s.cancelFn = cancel - s.group = async.NewGroup(ctx) - - command := &fetchLatestBlockNumberCommand{ - state: s, - accountsDB: s.accountsDB, - rpcClient: s.rpcClient, - } - s.group.Add(command.Command()) -} - -func (s *BlockChainState) Stop() { - if s.cancelFn != nil { - s.cancelFn() - s.cancelFn = nil - } - if s.group != nil { - s.group.Stop() - s.group.Wait() - s.group = nil - } -} - -func (s *BlockChainState) GetEstimatedLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) { - blockNumber, ok := s.estimateLatestBlockNumber(chainID) - if ok { - return blockNumber, nil - } - return s.fetchLatestBlockNumber(ctx, chainID) -} - -func (s *BlockChainState) fetchLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) { - client, err := s.rpcClient.EthClient(chainID) - if err != nil { - return 0, err - } - blockNumber, err := client.BlockNumber(ctx) - if err != nil { - return 0, err - } - blockDuration, found := common.AverageBlockDurationForChain[common.ChainID(chainID)] - if !found { - blockDuration = common.AverageBlockDurationForChain[common.ChainID(common.UnknownChainID)] - } - s.setLatestBlockDataForChain(chainID, LatestBlockData{ - blockNumber: blockNumber, - timestamp: time.Now(), - blockDuration: blockDuration, - }) - return blockNumber, nil -} - -func (s *BlockChainState) setLatestBlockDataForChain(chainID uint64, latestBlockData LatestBlockData) { - s.blkMu.Lock() - defer s.blkMu.Unlock() - s.latestBlockNumbers[chainID] = latestBlockData -} - -func (s *BlockChainState) estimateLatestBlockNumber(chainID uint64) (uint64, bool) { - s.blkMu.RLock() - defer s.blkMu.RUnlock() - blockData, ok := s.latestBlockNumbers[chainID] - if !ok { - return 0, false - } - timeDiff := s.sinceFn(blockData.timestamp) - return blockData.blockNumber + uint64((timeDiff / blockData.blockDuration)), true -} diff --git a/services/wallet/blockchainstate/blockchainstate.go b/services/wallet/blockchainstate/blockchainstate.go new file mode 100644 index 000000000..d21f0b786 --- /dev/null +++ b/services/wallet/blockchainstate/blockchainstate.go @@ -0,0 +1,63 @@ +package blockchainstate + +import ( + "context" + "sync" + "time" + + "github.com/status-im/status-go/services/wallet/common" +) + +type LatestBlockData struct { + blockNumber uint64 + timestamp time.Time + blockDuration time.Duration +} + +type BlockChainState struct { + blkMu sync.RWMutex + latestBlockNumbers map[uint64]LatestBlockData + sinceFn func(time.Time) time.Duration +} + +func NewBlockChainState() *BlockChainState { + return &BlockChainState{ + blkMu: sync.RWMutex{}, + latestBlockNumbers: make(map[uint64]LatestBlockData), + sinceFn: time.Since, + } +} + +func (s *BlockChainState) GetEstimatedLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) { + blockNumber, _ := s.estimateLatestBlockNumber(chainID) + return blockNumber, nil +} + +func (s *BlockChainState) SetLastBlockNumber(chainID uint64, blockNumber uint64) { + blockDuration, found := common.AverageBlockDurationForChain[common.ChainID(chainID)] + if !found { + blockDuration = common.AverageBlockDurationForChain[common.ChainID(common.UnknownChainID)] + } + s.setLatestBlockDataForChain(chainID, LatestBlockData{ + blockNumber: blockNumber, + timestamp: time.Now(), + blockDuration: blockDuration, + }) +} + +func (s *BlockChainState) setLatestBlockDataForChain(chainID uint64, latestBlockData LatestBlockData) { + s.blkMu.Lock() + defer s.blkMu.Unlock() + s.latestBlockNumbers[chainID] = latestBlockData +} + +func (s *BlockChainState) estimateLatestBlockNumber(chainID uint64) (uint64, bool) { + s.blkMu.RLock() + defer s.blkMu.RUnlock() + blockData, ok := s.latestBlockNumbers[chainID] + if !ok { + return 0, false + } + timeDiff := s.sinceFn(blockData.timestamp) + return blockData.blockNumber + uint64((timeDiff / blockData.blockDuration)), true +} diff --git a/services/wallet/blockchainstate_test.go b/services/wallet/blockchainstate/blockchainstate_test.go similarity index 95% rename from services/wallet/blockchainstate_test.go rename to services/wallet/blockchainstate/blockchainstate_test.go index 30e8356a7..eebc5c8f4 100644 --- a/services/wallet/blockchainstate_test.go +++ b/services/wallet/blockchainstate/blockchainstate_test.go @@ -1,4 +1,4 @@ -package wallet +package blockchainstate import ( "testing" @@ -14,7 +14,7 @@ func mockupSince(t time.Time) time.Duration { } func setupTestState(t *testing.T) (s *BlockChainState) { - state := NewBlockChainState(nil, nil) + state := NewBlockChainState() state.sinceFn = mockupSince return state } diff --git a/services/wallet/service.go b/services/wallet/service.go index a94603129..996c3f67d 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -21,6 +21,7 @@ import ( "github.com/status-im/status-go/services/stickers" "github.com/status-im/status-go/services/wallet/activity" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/collectibles" "github.com/status-im/status-go/services/wallet/community" "github.com/status-im/status-go/services/wallet/currency" @@ -104,8 +105,9 @@ func NewService( tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed) savedAddressesManager := &SavedAddressesManager{db: db} transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed) + blockChainState := blockchainstate.NewBlockChainState() transferController := transfer.NewTransferController(db, accountsDB, rpcClient, accountFeed, feed, transactionManager, pendingTxManager, - tokenManager, balanceCacher) + tokenManager, balanceCacher, blockChainState) transferController.Start() cryptoCompare := cryptocompare.NewClient() coingecko := coingecko.NewClient() @@ -113,7 +115,6 @@ func NewService( reader := NewReader(rpcClient, tokenManager, marketManager, communityManager, accountsDB, NewPersistence(db), feed) history := history.NewService(db, accountsDB, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache()) currency := currency.NewService(db, feed, tokenManager, marketManager) - blockChainState := NewBlockChainState(rpcClient, accountsDB) openseaHTTPClient := opensea.NewHTTPClient() openseaV2Client := opensea.NewClientV2(config.WalletConfig.OpenseaAPIKey, openseaHTTPClient) @@ -211,7 +212,7 @@ type Service struct { currency *currency.Service activity *activity.Service decoder *Decoder - blockChainState *BlockChainState + blockChainState *blockchainstate.BlockChainState keycardPairings *KeycardPairings walletConnect *walletconnect.Service } @@ -223,7 +224,6 @@ func (s *Service) Start() error { err := s.signals.Start() s.history.Start() s.collectibles.Start() - s.blockChainState.Start() s.started = true return err } @@ -243,7 +243,6 @@ func (s *Service) Stop() error { s.history.Stop() s.activity.Stop() s.collectibles.Stop() - s.blockChainState.Stop() s.started = false log.Info("wallet stopped") return nil diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index fed81e588..ba8225846 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -19,6 +19,7 @@ import ( "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/transactions" @@ -60,8 +61,9 @@ func (ec *errorCounter) Error() error { type findNewBlocksCommand struct { *findBlocksCommand - contractMaker *contracts.ContractMaker - iteration int + contractMaker *contracts.ContractMaker + iteration int + blockChainState *blockchainstate.BlockChainState } func (c *findNewBlocksCommand) Command() async.Command { @@ -184,6 +186,8 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error { return err } + c.blockChainState.SetLastBlockNumber(c.chainClient.NetworkID(), headNum.Uint64()) + if len(accountsWithDetectedChanges) != 0 { c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsToCheck) } else if c.iteration%nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 { @@ -856,7 +860,8 @@ func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context) func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database, blockDAO *BlockDAO, blockRangesSeqDAO BlockRangeDAOer, chainClient chain.ClientInterface, feed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, - tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand { + tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool, + blockChainState *blockchainstate.BlockChainState) *loadBlocksAndTransfersCommand { return &loadBlocksAndTransfersCommand{ accounts: accounts, @@ -874,6 +879,7 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, a omitHistory: omitHistory, errorCounter: *newErrorCounter("loadBlocksAndTransfersCommand"), contractMaker: tokenManager.ContractMaker, + blockChainState: blockChainState, } } @@ -893,6 +899,7 @@ type loadBlocksAndTransfersCommand struct { blocksLoadedCh chan []*DBHeader omitHistory bool contractMaker *contracts.ContractMaker + blockChainState *blockchainstate.BlockChainState // Not to be set by the caller transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime @@ -1108,7 +1115,8 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte blocksLoadedCh: blocksLoadedCh, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, }, - contractMaker: c.contractMaker, + contractMaker: c.contractMaker, + blockChainState: c.blockChainState, } group := async.NewGroup(ctx) group.Add(newBlocksCmd.Command()) diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index c6010565f..0f2afc806 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -33,6 +33,7 @@ import ( "github.com/status-im/status-go/server" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/community" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/t/utils" @@ -1431,7 +1432,8 @@ func TestFetchNewBlocksCommand(t *testing.T) { blocksLoadedCh: blockChannel, defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize, }, - contractMaker: tokenManager.ContractMaker, + contractMaker: tokenManager.ContractMaker, + blockChainState: blockchainstate.NewBlockChainState(), } ctx := context.Background() diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 082e336e6..90d805664 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -17,6 +17,7 @@ import ( "github.com/status-im/status-go/rpc" "github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/transactions" ) @@ -35,11 +36,12 @@ type Controller struct { pendingTxManager *transactions.PendingTxTracker tokenManager *token.Manager balanceCacher balance.Cacher + blockChainState *blockchainstate.BlockChainState } func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, - balanceCacher balance.Cacher) *Controller { + balanceCacher balance.Cacher, blockChainState *blockchainstate.BlockChainState) *Controller { blockDAO := &BlockDAO{db} return &Controller{ @@ -54,6 +56,7 @@ func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcC pendingTxManager: pendingTxManager, tokenManager: tokenManager, balanceCacher: balanceCacher, + blockChainState: blockChainState, } } @@ -111,7 +114,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add } c.reactor = NewReactor(c.db, c.blockDAO, c.blockRangesSeqDAO, c.accountsDB, c.TransferFeed, c.transactionManager, - c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory) + c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory, c.blockChainState) err = c.reactor.start(chainClients, accounts) if err != nil { diff --git a/services/wallet/transfer/controller_test.go b/services/wallet/transfer/controller_test.go index 296e13505..b1ca7d167 100644 --- a/services/wallet/transfer/controller_test.go +++ b/services/wallet/transfer/controller_test.go @@ -13,6 +13,7 @@ import ( "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/services/accounts/accountsevent" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/t/helpers" "github.com/status-im/status-go/walletdatabase" ) @@ -29,6 +30,7 @@ func TestController_watchAccountsChanges(t *testing.T) { accountFeed := &event.Feed{} + bcstate := blockchainstate.NewBlockChainState() c := NewTransferController( walletDB, accountsDB, @@ -39,6 +41,7 @@ func TestController_watchAccountsChanges(t *testing.T) { nil, // pendingTxManager nil, // tokenManager nil, // balanceCacher + bcstate, ) address := common.HexToAddress("0x1234") @@ -139,6 +142,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) { require.NoError(t, err) require.Len(t, storedAccs, 1) + bcstate := blockchainstate.NewBlockChainState() c := NewTransferController( walletDB, accountsDB, @@ -149,6 +153,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) { nil, // pendingTxManager nil, // tokenManager nil, // balanceCacher + bcstate, ) chainID := uint64(777) // Insert blocks diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index f93e4d1e6..e9405dcf6 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -11,6 +11,7 @@ import ( "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/transactions" ) @@ -58,11 +59,12 @@ type Reactor struct { strategy HistoryFetcher balanceCacher balance.Cacher omitHistory bool + blockChainState *blockchainstate.BlockChainState } func NewReactor(db *Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, accountsDB *accounts.Database, feed *event.Feed, tm *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, - balanceCacher balance.Cacher, omitHistory bool) *Reactor { + balanceCacher balance.Cacher, omitHistory bool, blockChainState *blockchainstate.BlockChainState) *Reactor { return &Reactor{ db: db, accountsDB: accountsDB, @@ -74,6 +76,7 @@ func NewReactor(db *Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeS tokenManager: tokenManager, balanceCacher: balanceCacher, omitHistory: omitHistory, + blockChainState: blockChainState, } } @@ -113,6 +116,7 @@ func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterf accounts, r.balanceCacher, r.omitHistory, + r.blockChainState, ) } diff --git a/services/wallet/transfer/sequential_fetch_strategy.go b/services/wallet/transfer/sequential_fetch_strategy.go index 1d5d895d7..7e99eca51 100644 --- a/services/wallet/transfer/sequential_fetch_strategy.go +++ b/services/wallet/transfer/sequential_fetch_strategy.go @@ -12,6 +12,7 @@ import ( "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" + "github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/transactions" @@ -24,6 +25,7 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, blockRangesSeq accounts []common.Address, balanceCacher balance.Cacher, omitHistory bool, + blockChainState *blockchainstate.BlockChainState, ) *SequentialFetchStrategy { return &SequentialFetchStrategy{ @@ -39,6 +41,7 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, blockRangesSeq accounts: accounts, balanceCacher: balanceCacher, omitHistory: omitHistory, + blockChainState: blockChainState, } } @@ -57,13 +60,14 @@ type SequentialFetchStrategy struct { accounts []common.Address balanceCacher balance.Cacher omitHistory bool + blockChainState *blockchainstate.BlockChainState } func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface, accounts []common.Address) async.Commander { return newLoadBlocksAndTransfersCommand(accounts, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed, - s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory) + s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory, s.blockChainState) } func (s *SequentialFetchStrategy) start() error {