[#4603] Get rid of fetchLatestBlockNumberCommand (#4614)

This commit is contained in:
Roman Volosovskyi 2024-01-25 13:05:59 +01:00 committed by GitHub
parent 73afe62782
commit 9b7eec0edb
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 104 additions and 165 deletions

View File

@ -1,149 +0,0 @@
package wallet
import (
"context"
"sync"
"time"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/common"
)
const (
fetchLatestBlockNumbersInterval = 10 * time.Minute
)
type fetchLatestBlockNumberCommand struct {
state *BlockChainState
rpcClient *rpc.Client
accountsDB *accounts.Database
}
func (c *fetchLatestBlockNumberCommand) Command() async.Command {
return async.InfiniteCommand{
Interval: fetchLatestBlockNumbersInterval,
Runable: c.Run,
}.Run
}
func (c *fetchLatestBlockNumberCommand) Run(parent context.Context) (err error) {
log.Debug("start fetchLatestBlockNumberCommand")
networks, err := c.rpcClient.NetworkManager.Get(false)
if err != nil {
return nil
}
areTestNetworksEnabled, err := c.accountsDB.GetTestNetworksEnabled()
if err != nil {
return
}
ctx := context.Background()
for _, network := range networks {
if network.IsTest != areTestNetworksEnabled {
continue
}
_, _ = c.state.fetchLatestBlockNumber(ctx, network.ChainID)
}
return nil
}
type LatestBlockData struct {
blockNumber uint64
timestamp time.Time
blockDuration time.Duration
}
type BlockChainState struct {
rpcClient *rpc.Client
accountsDB *accounts.Database
blkMu sync.RWMutex
latestBlockNumbers map[uint64]LatestBlockData
group *async.Group
cancelFn context.CancelFunc
sinceFn func(time.Time) time.Duration
}
func NewBlockChainState(rpcClient *rpc.Client, accountsDb *accounts.Database) *BlockChainState {
return &BlockChainState{
rpcClient: rpcClient,
accountsDB: accountsDb,
blkMu: sync.RWMutex{},
latestBlockNumbers: make(map[uint64]LatestBlockData),
sinceFn: time.Since,
}
}
func (s *BlockChainState) Start() {
ctx, cancel := context.WithCancel(context.Background())
s.cancelFn = cancel
s.group = async.NewGroup(ctx)
command := &fetchLatestBlockNumberCommand{
state: s,
accountsDB: s.accountsDB,
rpcClient: s.rpcClient,
}
s.group.Add(command.Command())
}
func (s *BlockChainState) Stop() {
if s.cancelFn != nil {
s.cancelFn()
s.cancelFn = nil
}
if s.group != nil {
s.group.Stop()
s.group.Wait()
s.group = nil
}
}
func (s *BlockChainState) GetEstimatedLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) {
blockNumber, ok := s.estimateLatestBlockNumber(chainID)
if ok {
return blockNumber, nil
}
return s.fetchLatestBlockNumber(ctx, chainID)
}
func (s *BlockChainState) fetchLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) {
client, err := s.rpcClient.EthClient(chainID)
if err != nil {
return 0, err
}
blockNumber, err := client.BlockNumber(ctx)
if err != nil {
return 0, err
}
blockDuration, found := common.AverageBlockDurationForChain[common.ChainID(chainID)]
if !found {
blockDuration = common.AverageBlockDurationForChain[common.ChainID(common.UnknownChainID)]
}
s.setLatestBlockDataForChain(chainID, LatestBlockData{
blockNumber: blockNumber,
timestamp: time.Now(),
blockDuration: blockDuration,
})
return blockNumber, nil
}
func (s *BlockChainState) setLatestBlockDataForChain(chainID uint64, latestBlockData LatestBlockData) {
s.blkMu.Lock()
defer s.blkMu.Unlock()
s.latestBlockNumbers[chainID] = latestBlockData
}
func (s *BlockChainState) estimateLatestBlockNumber(chainID uint64) (uint64, bool) {
s.blkMu.RLock()
defer s.blkMu.RUnlock()
blockData, ok := s.latestBlockNumbers[chainID]
if !ok {
return 0, false
}
timeDiff := s.sinceFn(blockData.timestamp)
return blockData.blockNumber + uint64((timeDiff / blockData.blockDuration)), true
}

View File

@ -0,0 +1,63 @@
package blockchainstate
import (
"context"
"sync"
"time"
"github.com/status-im/status-go/services/wallet/common"
)
type LatestBlockData struct {
blockNumber uint64
timestamp time.Time
blockDuration time.Duration
}
type BlockChainState struct {
blkMu sync.RWMutex
latestBlockNumbers map[uint64]LatestBlockData
sinceFn func(time.Time) time.Duration
}
func NewBlockChainState() *BlockChainState {
return &BlockChainState{
blkMu: sync.RWMutex{},
latestBlockNumbers: make(map[uint64]LatestBlockData),
sinceFn: time.Since,
}
}
func (s *BlockChainState) GetEstimatedLatestBlockNumber(ctx context.Context, chainID uint64) (uint64, error) {
blockNumber, _ := s.estimateLatestBlockNumber(chainID)
return blockNumber, nil
}
func (s *BlockChainState) SetLastBlockNumber(chainID uint64, blockNumber uint64) {
blockDuration, found := common.AverageBlockDurationForChain[common.ChainID(chainID)]
if !found {
blockDuration = common.AverageBlockDurationForChain[common.ChainID(common.UnknownChainID)]
}
s.setLatestBlockDataForChain(chainID, LatestBlockData{
blockNumber: blockNumber,
timestamp: time.Now(),
blockDuration: blockDuration,
})
}
func (s *BlockChainState) setLatestBlockDataForChain(chainID uint64, latestBlockData LatestBlockData) {
s.blkMu.Lock()
defer s.blkMu.Unlock()
s.latestBlockNumbers[chainID] = latestBlockData
}
func (s *BlockChainState) estimateLatestBlockNumber(chainID uint64) (uint64, bool) {
s.blkMu.RLock()
defer s.blkMu.RUnlock()
blockData, ok := s.latestBlockNumbers[chainID]
if !ok {
return 0, false
}
timeDiff := s.sinceFn(blockData.timestamp)
return blockData.blockNumber + uint64((timeDiff / blockData.blockDuration)), true
}

View File

@ -1,4 +1,4 @@
package wallet
package blockchainstate
import (
"testing"
@ -14,7 +14,7 @@ func mockupSince(t time.Time) time.Duration {
}
func setupTestState(t *testing.T) (s *BlockChainState) {
state := NewBlockChainState(nil, nil)
state := NewBlockChainState()
state.sinceFn = mockupSince
return state
}

View File

@ -21,6 +21,7 @@ import (
"github.com/status-im/status-go/services/stickers"
"github.com/status-im/status-go/services/wallet/activity"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/collectibles"
"github.com/status-im/status-go/services/wallet/community"
"github.com/status-im/status-go/services/wallet/currency"
@ -104,8 +105,9 @@ func NewService(
tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed)
savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed)
blockChainState := blockchainstate.NewBlockChainState()
transferController := transfer.NewTransferController(db, accountsDB, rpcClient, accountFeed, feed, transactionManager, pendingTxManager,
tokenManager, balanceCacher)
tokenManager, balanceCacher, blockChainState)
transferController.Start()
cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient()
@ -113,7 +115,6 @@ func NewService(
reader := NewReader(rpcClient, tokenManager, marketManager, communityManager, accountsDB, NewPersistence(db), feed)
history := history.NewService(db, accountsDB, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache())
currency := currency.NewService(db, feed, tokenManager, marketManager)
blockChainState := NewBlockChainState(rpcClient, accountsDB)
openseaHTTPClient := opensea.NewHTTPClient()
openseaV2Client := opensea.NewClientV2(config.WalletConfig.OpenseaAPIKey, openseaHTTPClient)
@ -211,7 +212,7 @@ type Service struct {
currency *currency.Service
activity *activity.Service
decoder *Decoder
blockChainState *BlockChainState
blockChainState *blockchainstate.BlockChainState
keycardPairings *KeycardPairings
walletConnect *walletconnect.Service
}
@ -223,7 +224,6 @@ func (s *Service) Start() error {
err := s.signals.Start()
s.history.Start()
s.collectibles.Start()
s.blockChainState.Start()
s.started = true
return err
}
@ -243,7 +243,6 @@ func (s *Service) Stop() error {
s.history.Stop()
s.activity.Stop()
s.collectibles.Stop()
s.blockChainState.Stop()
s.started = false
log.Info("wallet stopped")
return nil

View File

@ -19,6 +19,7 @@ import (
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
@ -62,6 +63,7 @@ type findNewBlocksCommand struct {
*findBlocksCommand
contractMaker *contracts.ContractMaker
iteration int
blockChainState *blockchainstate.BlockChainState
}
func (c *findNewBlocksCommand) Command() async.Command {
@ -184,6 +186,8 @@ func (c *findNewBlocksCommand) Run(parent context.Context) error {
return err
}
c.blockChainState.SetLastBlockNumber(c.chainClient.NetworkID(), headNum.Uint64())
if len(accountsWithDetectedChanges) != 0 {
c.findAndSaveEthBlocks(parent, c.fromBlockNumber, headNum, accountsToCheck)
} else if c.iteration%nonceCheckIntervalIterations == 0 && len(accountsWithOutsideTransfers) > 0 {
@ -856,7 +860,8 @@ func (c *loadBlocksAndTransfersCommand) startTransfersLoop(ctx context.Context)
func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, accountsDB *accounts.Database,
blockDAO *BlockDAO, blockRangesSeqDAO BlockRangeDAOer, chainClient chain.ClientInterface, feed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool) *loadBlocksAndTransfersCommand {
tokenManager *token.Manager, balanceCacher balance.Cacher, omitHistory bool,
blockChainState *blockchainstate.BlockChainState) *loadBlocksAndTransfersCommand {
return &loadBlocksAndTransfersCommand{
accounts: accounts,
@ -874,6 +879,7 @@ func newLoadBlocksAndTransfersCommand(accounts []common.Address, db *Database, a
omitHistory: omitHistory,
errorCounter: *newErrorCounter("loadBlocksAndTransfersCommand"),
contractMaker: tokenManager.ContractMaker,
blockChainState: blockChainState,
}
}
@ -893,6 +899,7 @@ type loadBlocksAndTransfersCommand struct {
blocksLoadedCh chan []*DBHeader
omitHistory bool
contractMaker *contracts.ContractMaker
blockChainState *blockchainstate.BlockChainState
// Not to be set by the caller
transfersLoaded map[common.Address]bool // For event RecentHistoryReady to be sent only once per account during app lifetime
@ -1109,6 +1116,7 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(ctx context.Conte
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
contractMaker: c.contractMaker,
blockChainState: c.blockChainState,
}
group := async.NewGroup(ctx)
group.Add(newBlocksCmd.Command())

View File

@ -33,6 +33,7 @@ import (
"github.com/status-im/status-go/server"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/community"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/t/utils"
@ -1432,6 +1433,7 @@ func TestFetchNewBlocksCommand(t *testing.T) {
defaultNodeBlockChunkSize: DefaultNodeBlockChunkSize,
},
contractMaker: tokenManager.ContractMaker,
blockChainState: blockchainstate.NewBlockChainState(),
}
ctx := context.Background()

View File

@ -17,6 +17,7 @@ import (
"github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/transactions"
)
@ -35,11 +36,12 @@ type Controller struct {
pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager
balanceCacher balance.Cacher
blockChainState *blockchainstate.BlockChainState
}
func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher) *Controller {
balanceCacher balance.Cacher, blockChainState *blockchainstate.BlockChainState) *Controller {
blockDAO := &BlockDAO{db}
return &Controller{
@ -54,6 +56,7 @@ func NewTransferController(db *sql.DB, accountsDB *statusaccounts.Database, rpcC
pendingTxManager: pendingTxManager,
tokenManager: tokenManager,
balanceCacher: balanceCacher,
blockChainState: blockChainState,
}
}
@ -111,7 +114,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
}
c.reactor = NewReactor(c.db, c.blockDAO, c.blockRangesSeqDAO, c.accountsDB, c.TransferFeed, c.transactionManager,
c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory)
c.pendingTxManager, c.tokenManager, c.balanceCacher, omitHistory, c.blockChainState)
err = c.reactor.start(chainClients, accounts)
if err != nil {

View File

@ -13,6 +13,7 @@ import (
"github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/t/helpers"
"github.com/status-im/status-go/walletdatabase"
)
@ -29,6 +30,7 @@ func TestController_watchAccountsChanges(t *testing.T) {
accountFeed := &event.Feed{}
bcstate := blockchainstate.NewBlockChainState()
c := NewTransferController(
walletDB,
accountsDB,
@ -39,6 +41,7 @@ func TestController_watchAccountsChanges(t *testing.T) {
nil, // pendingTxManager
nil, // tokenManager
nil, // balanceCacher
bcstate,
)
address := common.HexToAddress("0x1234")
@ -139,6 +142,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) {
require.NoError(t, err)
require.Len(t, storedAccs, 1)
bcstate := blockchainstate.NewBlockChainState()
c := NewTransferController(
walletDB,
accountsDB,
@ -149,6 +153,7 @@ func TestController_cleanupAccountLeftovers(t *testing.T) {
nil, // pendingTxManager
nil, // tokenManager
nil, // balanceCacher
bcstate,
)
chainID := uint64(777)
// Insert blocks

View File

@ -11,6 +11,7 @@ import (
"github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/transactions"
)
@ -58,11 +59,12 @@ type Reactor struct {
strategy HistoryFetcher
balanceCacher balance.Cacher
omitHistory bool
blockChainState *blockchainstate.BlockChainState
}
func NewReactor(db *Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeSequentialDAO, accountsDB *accounts.Database, feed *event.Feed, tm *TransactionManager,
pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher, omitHistory bool) *Reactor {
balanceCacher balance.Cacher, omitHistory bool, blockChainState *blockchainstate.BlockChainState) *Reactor {
return &Reactor{
db: db,
accountsDB: accountsDB,
@ -74,6 +76,7 @@ func NewReactor(db *Database, blockDAO *BlockDAO, blockRangesSeqDAO *BlockRangeS
tokenManager: tokenManager,
balanceCacher: balanceCacher,
omitHistory: omitHistory,
blockChainState: blockChainState,
}
}
@ -113,6 +116,7 @@ func (r *Reactor) createFetchStrategy(chainClients map[uint64]chain.ClientInterf
accounts,
r.balanceCacher,
r.omitHistory,
r.blockChainState,
)
}

View File

@ -12,6 +12,7 @@ import (
"github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate"
"github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions"
@ -24,6 +25,7 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, blockRangesSeq
accounts []common.Address,
balanceCacher balance.Cacher,
omitHistory bool,
blockChainState *blockchainstate.BlockChainState,
) *SequentialFetchStrategy {
return &SequentialFetchStrategy{
@ -39,6 +41,7 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, blockRangesSeq
accounts: accounts,
balanceCacher: balanceCacher,
omitHistory: omitHistory,
blockChainState: blockChainState,
}
}
@ -57,13 +60,14 @@ type SequentialFetchStrategy struct {
accounts []common.Address
balanceCacher balance.Cacher
omitHistory bool
blockChainState *blockchainstate.BlockChainState
}
func (s *SequentialFetchStrategy) newCommand(chainClient chain.ClientInterface,
accounts []common.Address) async.Commander {
return newLoadBlocksAndTransfersCommand(accounts, s.db, s.accountsDB, s.blockDAO, s.blockRangesSeqDAO, chainClient, s.feed,
s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory)
s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher, s.omitHistory, s.blockChainState)
}
func (s *SequentialFetchStrategy) start() error {