feat(wallet): move balance cache to a common place and make it a

parameter to share between transfers and balance history in an upcoming
commit.
Had to refactor its interface for that reason.
This commit is contained in:
Ivan Belyakov 2023-09-04 07:34:09 +02:00 committed by IvanBelyakoff
parent f73f3e9f82
commit 24bf9aada5
9 changed files with 134 additions and 96 deletions

View File

@ -1,4 +1,4 @@
package transfer package balance
import ( import (
"context" "context"
@ -7,6 +7,8 @@ import (
"sync" "sync"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/status-im/status-go/rpc/chain"
) )
type nonceRange struct { type nonceRange struct {
@ -15,13 +17,32 @@ type nonceRange struct {
min *big.Int min *big.Int
} }
type BalanceCache interface { // Reader interface for reading balance at a specified address.
BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) type Reader interface {
NonceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*int64, error) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error)
}
// Cacher interface for caching balance to BalanceCache. Requires BalanceReader to fetch balance.
type Cacher interface {
BalanceAt(ctx context.Context, client Reader, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, client Reader, account common.Address, blockNumber *big.Int) (*int64, error)
Clear()
Cache() CacheIface
}
// Interface for cache of balances.
type CacheIface interface {
GetBalance(account common.Address, blockNumber *big.Int) *big.Int
GetNonce(account common.Address, blockNumber *big.Int) *int64
AddBalance(account common.Address, blockNumber *big.Int, balance *big.Int)
AddNonce(account common.Address, blockNumber *big.Int, nonce *int64)
Clear() Clear()
} }
type balanceCache struct { type Cache struct {
// balances maps an address to a map of a block number and the balance of this particular address // balances maps an address to a map of a block number and the balance of this particular address
balances map[common.Address]map[uint64]*big.Int // we don't care about block number overflow as we use cache only for comparing balances when fetching, not for UI balances map[common.Address]map[uint64]*big.Int // we don't care about block number overflow as we use cache only for comparing balances when fetching, not for UI
nonces map[common.Address]map[uint64]*int64 // we don't care about block number overflow as we use cache only for comparing balances when fetching, not for UI nonces map[common.Address]map[uint64]*int64 // we don't care about block number overflow as we use cache only for comparing balances when fetching, not for UI
@ -30,8 +51,8 @@ type balanceCache struct {
rw sync.RWMutex rw sync.RWMutex
} }
func newBalanceCache() *balanceCache { func NewCache() *Cache {
return &balanceCache{ return &Cache{
balances: make(map[common.Address]map[uint64]*big.Int), balances: make(map[common.Address]map[uint64]*big.Int),
nonces: make(map[common.Address]map[uint64]*int64), nonces: make(map[common.Address]map[uint64]*int64),
nonceRanges: make(map[common.Address]map[int64]nonceRange), nonceRanges: make(map[common.Address]map[int64]nonceRange),
@ -39,7 +60,10 @@ func newBalanceCache() *balanceCache {
} }
} }
func (b *balanceCache) Clear() { func (b *Cache) Clear() {
b.rw.Lock()
defer b.rw.Unlock()
for address, cache := range b.balances { for address, cache := range b.balances {
if len(cache) == 0 { if len(cache) == 0 {
continue continue
@ -84,14 +108,14 @@ func (b *balanceCache) Clear() {
b.sortedRanges = make(map[common.Address][]nonceRange) b.sortedRanges = make(map[common.Address][]nonceRange)
} }
func (b *balanceCache) ReadCachedBalance(account common.Address, blockNumber *big.Int) *big.Int { func (b *Cache) GetBalance(account common.Address, blockNumber *big.Int) *big.Int {
b.rw.RLock() b.rw.RLock()
defer b.rw.RUnlock() defer b.rw.RUnlock()
return b.balances[account][blockNumber.Uint64()] return b.balances[account][blockNumber.Uint64()]
} }
func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *big.Int, balance *big.Int) { func (b *Cache) AddBalance(account common.Address, blockNumber *big.Int, balance *big.Int) {
b.rw.Lock() b.rw.Lock()
defer b.rw.Unlock() defer b.rw.Unlock()
@ -102,8 +126,8 @@ func (b *balanceCache) addBalanceToCache(account common.Address, blockNumber *bi
b.balances[account][blockNumber.Uint64()] = balance b.balances[account][blockNumber.Uint64()] = balance
} }
func (b *balanceCache) BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) { func (b *Cache) BalanceAt(ctx context.Context, client Reader, account common.Address, blockNumber *big.Int) (*big.Int, error) {
cachedBalance := b.ReadCachedBalance(account, blockNumber) cachedBalance := b.GetBalance(account, blockNumber)
if cachedBalance != nil { if cachedBalance != nil {
return cachedBalance, nil return cachedBalance, nil
} }
@ -111,19 +135,23 @@ func (b *balanceCache) BalanceAt(ctx context.Context, client BalanceReader, acco
if err != nil { if err != nil {
return nil, err return nil, err
} }
b.addBalanceToCache(account, blockNumber, balance) b.AddBalance(account, blockNumber, balance)
return balance, nil return balance, nil
} }
func (b *balanceCache) ReadCachedNonce(account common.Address, blockNumber *big.Int) *int64 { func (b *Cache) GetNonce(account common.Address, blockNumber *big.Int) *int64 {
b.rw.RLock() b.rw.RLock()
defer b.rw.RUnlock() defer b.rw.RUnlock()
return b.nonces[account][blockNumber.Uint64()] return b.nonces[account][blockNumber.Uint64()]
} }
func (b *balanceCache) sortRanges(account common.Address) { func (b *Cache) Cache() CacheIface {
return b
}
func (b *Cache) sortRanges(account common.Address) {
keys := make([]int, 0, len(b.nonceRanges[account])) keys := make([]int, 0, len(b.nonceRanges[account]))
for k := range b.nonceRanges[account] { for k := range b.nonceRanges[account] {
keys = append(keys, int(k)) keys = append(keys, int(k))
@ -140,7 +168,7 @@ func (b *balanceCache) sortRanges(account common.Address) {
b.sortedRanges[account] = ranges b.sortedRanges[account] = ranges
} }
func (b *balanceCache) findNonceInRange(account common.Address, block *big.Int) *int64 { func (b *Cache) findNonceInRange(account common.Address, block *big.Int) *int64 {
b.rw.RLock() b.rw.RLock()
defer b.rw.RUnlock() defer b.rw.RUnlock()
@ -162,7 +190,7 @@ func (b *balanceCache) findNonceInRange(account common.Address, block *big.Int)
return nil return nil
} }
func (b *balanceCache) updateNonceRange(account common.Address, blockNumber *big.Int, nonce *int64) { func (b *Cache) updateNonceRange(account common.Address, blockNumber *big.Int, nonce *int64) {
_, exists := b.nonceRanges[account] _, exists := b.nonceRanges[account]
if !exists { if !exists {
b.nonceRanges[account] = make(map[int64]nonceRange) b.nonceRanges[account] = make(map[int64]nonceRange)
@ -189,7 +217,7 @@ func (b *balanceCache) updateNonceRange(account common.Address, blockNumber *big
} }
} }
func (b *balanceCache) addNonceToCache(account common.Address, blockNumber *big.Int, nonce *int64) { func (b *Cache) AddNonce(account common.Address, blockNumber *big.Int, nonce *int64) {
b.rw.Lock() b.rw.Lock()
defer b.rw.Unlock() defer b.rw.Unlock()
@ -201,8 +229,8 @@ func (b *balanceCache) addNonceToCache(account common.Address, blockNumber *big.
b.updateNonceRange(account, blockNumber, nonce) b.updateNonceRange(account, blockNumber, nonce)
} }
func (b *balanceCache) NonceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*int64, error) { func (b *Cache) NonceAt(ctx context.Context, client Reader, account common.Address, blockNumber *big.Int) (*int64, error) {
cachedNonce := b.ReadCachedNonce(account, blockNumber) cachedNonce := b.GetNonce(account, blockNumber)
if cachedNonce != nil { if cachedNonce != nil {
return cachedNonce, nil return cachedNonce, nil
} }
@ -216,7 +244,7 @@ func (b *balanceCache) NonceAt(ctx context.Context, client BalanceReader, accoun
return nil, err return nil, err
} }
int64Nonce := int64(nonce) int64Nonce := int64(nonce)
b.addNonceToCache(account, blockNumber, &int64Nonce) b.AddNonce(account, blockNumber, &int64Nonce)
return &int64Nonce, nil return &int64Nonce, nil
} }

View File

@ -19,6 +19,7 @@ import (
"github.com/status-im/status-go/services/ens" "github.com/status-im/status-go/services/ens"
"github.com/status-im/status-go/services/stickers" "github.com/status-im/status-go/services/stickers"
"github.com/status-im/status-go/services/wallet/activity" "github.com/status-im/status-go/services/wallet/activity"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/collectibles" "github.com/status-im/status-go/services/wallet/collectibles"
"github.com/status-im/status-go/services/wallet/currency" "github.com/status-im/status-go/services/wallet/currency"
"github.com/status-im/status-go/services/wallet/history" "github.com/status-im/status-go/services/wallet/history"
@ -91,11 +92,14 @@ func NewService(
ChainID: chainID, ChainID: chainID,
}) })
}) })
balanceCache := balance.NewCache()
tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager) tokenManager := token.NewTokenManager(db, rpcClient, rpcClient.NetworkManager)
savedAddressesManager := &SavedAddressesManager{db: db} savedAddressesManager := &SavedAddressesManager{db: db}
transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed) transactionManager := transfer.NewTransactionManager(db, gethManager, transactor, config, accountsDB, pendingTxManager, feed)
transferController := transfer.NewTransferController(db, rpcClient, accountFeed, feed, transactionManager, pendingTxManager, transferController := transfer.NewTransferController(db, rpcClient, accountFeed, feed, transactionManager, pendingTxManager,
tokenManager, config.WalletConfig.LoadAllTransfers) tokenManager, balanceCache, config.WalletConfig.LoadAllTransfers)
cryptoCompare := cryptocompare.NewClient() cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient() coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko, feed) marketManager := market.NewManager(cryptoCompare, coingecko, feed)

View File

@ -14,6 +14,7 @@ import (
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
w_common "github.com/status-im/status-go/services/wallet/common" w_common "github.com/status-im/status-go/services/wallet/common"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
@ -53,13 +54,13 @@ var (
) )
type ethHistoricalCommand struct { type ethHistoricalCommand struct {
address common.Address address common.Address
chainClient *chain.ClientWithFallback chainClient *chain.ClientWithFallback
balanceCache *balanceCache balanceCacher balance.Cacher
feed *event.Feed feed *event.Feed
foundHeaders []*DBHeader foundHeaders []*DBHeader
error error error error
noLimit bool noLimit bool
from *Block from *Block
to, resultingFrom, startBlock *big.Int to, resultingFrom, startBlock *big.Int
@ -81,13 +82,13 @@ func (c *ethHistoricalCommand) Run(ctx context.Context) (err error) {
start := time.Now() start := time.Now()
if c.from.Number != nil && c.from.Balance != nil { if c.from.Number != nil && c.from.Balance != nil {
c.balanceCache.addBalanceToCache(c.address, c.from.Number, c.from.Balance) c.balanceCacher.Cache().AddBalance(c.address, c.from.Number, c.from.Balance)
} }
if c.from.Number != nil && c.from.Nonce != nil { if c.from.Number != nil && c.from.Nonce != nil {
c.balanceCache.addNonceToCache(c.address, c.from.Number, c.from.Nonce) c.balanceCacher.Cache().AddNonce(c.address, c.from.Number, c.from.Nonce)
} }
from, headers, startBlock, err := findBlocksWithEthTransfers(ctx, c.chainClient, from, headers, startBlock, err := findBlocksWithEthTransfers(ctx, c.chainClient,
c.balanceCache, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit) c.balanceCacher, c.address, c.from.Number, c.to, c.noLimit, c.threadLimit)
if err != nil { if err != nil {
c.error = err c.error = err
@ -189,6 +190,7 @@ type controlCommand struct {
transactionManager *TransactionManager transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager tokenManager *token.Manager
balanceCacher balance.Cacher
} }
func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error { func (c *controlCommand) LoadTransfers(ctx context.Context, limit int) error {
@ -258,13 +260,12 @@ func (c *controlCommand) Run(parent context.Context) error {
toByAddress[address] = target toByAddress[address] = target
} }
bCache := newBalanceCache()
cmnd := &findAndCheckBlockRangeCommand{ cmnd := &findAndCheckBlockRangeCommand{
accounts: c.accounts, accounts: c.accounts,
db: c.db, db: c.db,
blockDAO: c.blockDAO, blockDAO: c.blockDAO,
chainClient: c.chainClient, chainClient: c.chainClient,
balanceCache: bCache, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
fromByAddress: fromByAddress, fromByAddress: fromByAddress,
toByAddress: toByAddress, toByAddress: toByAddress,
@ -285,7 +286,7 @@ func (c *controlCommand) Run(parent context.Context) error {
return cmnd.error return cmnd.error
} }
bCache.Clear() c.balanceCacher.Clear()
err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration) err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration)
if err != nil { if err != nil {
if c.NewError(err) { if c.NewError(err) {
@ -618,7 +619,7 @@ type findAndCheckBlockRangeCommand struct {
db *Database db *Database
blockDAO *BlockDAO blockDAO *BlockDAO
chainClient *chain.ClientWithFallback chainClient *chain.ClientWithFallback
balanceCache *balanceCache balanceCacher balance.Cacher
feed *event.Feed feed *event.Feed
fromByAddress map[common.Address]*Block fromByAddress map[common.Address]*Block
toByAddress map[common.Address]*big.Int toByAddress map[common.Address]*big.Int
@ -637,7 +638,7 @@ func (c *findAndCheckBlockRangeCommand) Command() async.Command {
func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error { func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
log.Debug("start findAndCHeckBlockRangeCommand") log.Debug("start findAndCHeckBlockRangeCommand")
newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCache, c.fromByAddress, c.toByAddress) newFromByAddress, ethHeadersByAddress, err := c.fastIndex(parent, c.balanceCacher, c.fromByAddress, c.toByAddress)
if err != nil { if err != nil {
c.error = err c.error = err
// return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers. But if we return error, we will get stuck in inifinite loop. // return err // In case c.noLimit is true, hystrix "max concurrency" may be reached and we will not be able to index ETH transfers. But if we return error, we will get stuck in inifinite loop.
@ -679,12 +680,12 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
lastBlockNumber := c.toByAddress[address] lastBlockNumber := c.toByAddress[address]
log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber, log.Debug("saving headers", "len", len(uniqHeaders), "lastBlockNumber", lastBlockNumber,
"balance", c.balanceCache.ReadCachedBalance(address, lastBlockNumber), "nonce", c.balanceCache.ReadCachedNonce(address, lastBlockNumber)) "balance", c.balanceCacher.Cache().GetBalance(address, lastBlockNumber), "nonce", c.balanceCacher.Cache().GetNonce(address, lastBlockNumber))
to := &Block{ to := &Block{
Number: lastBlockNumber, Number: lastBlockNumber,
Balance: c.balanceCache.ReadCachedBalance(address, lastBlockNumber), Balance: c.balanceCacher.Cache().GetBalance(address, lastBlockNumber),
Nonce: c.balanceCache.ReadCachedNonce(address, lastBlockNumber), Nonce: c.balanceCacher.Cache().GetNonce(address, lastBlockNumber),
} }
log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders)) log.Debug("uniqHeaders found for account", "address", address, "uniqHeaders.len", len(uniqHeaders))
err = c.db.ProcessBlocks(c.chainClient.ChainID, address, newFromByAddress[address], to, uniqHeaders) err = c.db.ProcessBlocks(c.chainClient.ChainID, address, newFromByAddress[address], to, uniqHeaders)
@ -701,7 +702,7 @@ func (c *findAndCheckBlockRangeCommand) Run(parent context.Context) error {
// run fast indexing for every accont up to canonical chain head minus safety depth. // run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header. // every account will run it from last synced header.
func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *balanceCache, func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCacher balance.Cacher,
fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int, fromByAddress map[common.Address]*Block, toByAddress map[common.Address]*big.Int) (map[common.Address]*big.Int,
map[common.Address][]*DBHeader, error) { map[common.Address][]*DBHeader, error) {
@ -713,14 +714,14 @@ func (c *findAndCheckBlockRangeCommand) fastIndex(ctx context.Context, bCache *b
commands := make([]*ethHistoricalCommand, len(c.accounts)) commands := make([]*ethHistoricalCommand, len(c.accounts))
for i, address := range c.accounts { for i, address := range c.accounts {
eth := &ethHistoricalCommand{ eth := &ethHistoricalCommand{
chainClient: c.chainClient, chainClient: c.chainClient,
balanceCache: bCache, balanceCacher: bCacher,
address: address, address: address,
feed: c.feed, feed: c.feed,
from: fromByAddress[address], from: fromByAddress[address],
to: toByAddress[address], to: toByAddress[address],
noLimit: c.noLimit, noLimit: c.noLimit,
threadLimit: NoThreadLimit, threadLimit: NoThreadLimit,
} }
commands[i] = eth commands[i] = eth
group.Add(eth.Command()) group.Add(eth.Command())

View File

@ -11,6 +11,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
@ -69,7 +70,7 @@ type findBlocksCommand struct {
db *Database db *Database
blockRangeDAO *BlockRangeSequentialDAO blockRangeDAO *BlockRangeSequentialDAO
chainClient *chain.ClientWithFallback chainClient *chain.ClientWithFallback
balanceCache *balanceCache balanceCacher balance.Cacher
feed *event.Feed feed *event.Feed
noLimit bool noLimit bool
transactionManager *TransactionManager transactionManager *TransactionManager
@ -112,8 +113,8 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
if len(headers) > 0 { if len(headers) > 0 {
log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to, log.Debug("findBlocksCommand saving headers", "len", len(headers), "lastBlockNumber", to,
"balance", c.balanceCache.ReadCachedBalance(c.account, to), "balance", c.balanceCacher.Cache().GetBalance(c.account, to),
"nonce", c.balanceCache.ReadCachedNonce(c.account, to)) "nonce", c.balanceCacher.Cache().GetNonce(c.account, to))
err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers) err = c.db.SaveBlocks(c.chainClient.ChainID, c.account, headers)
if err != nil { if err != nil {
@ -145,7 +146,7 @@ func (c *findBlocksCommand) Run(parent context.Context) (err error) {
} }
func (c *findBlocksCommand) blocksFound(headers []*DBHeader) { func (c *findBlocksCommand) blocksFound(headers []*DBHeader) {
c.blocksLoadedCh <- headers c.blocksLoadedCh <- headers // TODO Use notifyOfNewBlocksLoaded instead ??
} }
func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error { func (c *findBlocksCommand) upsertBlockRange(blockRange *BlockRange) error {
@ -167,7 +168,7 @@ func (c *findBlocksCommand) checkRange(parent context.Context, from *big.Int, to
fromBlock := &Block{Number: from} fromBlock := &Block{Number: from}
newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCache, fromBlock, to) newFromBlock, ethHeaders, startBlock, err := c.fastIndex(parent, c.balanceCacher, fromBlock, to)
if err != nil { if err != nil {
log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account, log.Error("findBlocksCommand checkRange fastIndex", "err", err, "account", c.account,
"chain", c.chainClient.ChainID) "chain", c.chainClient.ChainID)
@ -247,7 +248,7 @@ func areAllHistoryBlocksLoadedForAddress(blockRangeDAO *BlockRangeSequentialDAO,
// run fast indexing for every accont up to canonical chain head minus safety depth. // run fast indexing for every accont up to canonical chain head minus safety depth.
// every account will run it from last synced header. // every account will run it from last synced header.
func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache, func (c *findBlocksCommand) fastIndex(ctx context.Context, bCacher balance.Cacher,
fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader, fromBlock *Block, toBlockNumber *big.Int) (resultingFrom *Block, headers []*DBHeader,
startBlock *big.Int, err error) { startBlock *big.Int, err error) {
@ -258,14 +259,14 @@ func (c *findBlocksCommand) fastIndex(ctx context.Context, bCache *balanceCache,
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
command := &ethHistoricalCommand{ command := &ethHistoricalCommand{
chainClient: c.chainClient, chainClient: c.chainClient,
balanceCache: bCache, balanceCacher: bCacher,
address: c.account, address: c.account,
feed: c.feed, feed: c.feed,
from: fromBlock, from: fromBlock,
to: toBlockNumber, to: toBlockNumber,
noLimit: c.noLimit, noLimit: c.noLimit,
threadLimit: SequentialThreadLimit, threadLimit: SequentialThreadLimit,
} }
group.Add(command.Command()) group.Add(command.Command())
@ -349,7 +350,7 @@ func loadTransfersLoop(ctx context.Context, account common.Address, blockDAO *Bl
func newLoadBlocksAndTransfersCommand(account common.Address, db *Database, func newLoadBlocksAndTransfersCommand(account common.Address, db *Database,
blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed, blockDAO *BlockDAO, chainClient *chain.ClientWithFallback, feed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager) *loadBlocksAndTransfersCommand { tokenManager *token.Manager, balanceCacher balance.Cacher) *loadBlocksAndTransfersCommand {
return &loadBlocksAndTransfersCommand{ return &loadBlocksAndTransfersCommand{
account: account, account: account,
@ -358,6 +359,7 @@ func newLoadBlocksAndTransfersCommand(account common.Address, db *Database,
blockDAO: blockDAO, blockDAO: blockDAO,
chainClient: chainClient, chainClient: chainClient,
feed: feed, feed: feed,
balanceCacher: balanceCacher,
errorsCount: 0, errorsCount: 0,
transactionManager: transactionManager, transactionManager: transactionManager,
pendingTxManager: pendingTxManager, pendingTxManager: pendingTxManager,
@ -373,7 +375,7 @@ type loadBlocksAndTransfersCommand struct {
blockDAO *BlockDAO blockDAO *BlockDAO
chainClient *chain.ClientWithFallback chainClient *chain.ClientWithFallback
feed *event.Feed feed *event.Feed
balanceCache *balanceCache balanceCacher balance.Cacher
errorsCount int errorsCount int
// nonArchivalRPCNode bool // TODO Make use of it // nonArchivalRPCNode bool // TODO Make use of it
transactionManager *TransactionManager transactionManager *TransactionManager
@ -389,11 +391,6 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
log.Debug("start load all transfers command", "chain", c.chainClient.ChainID, "account", c.account) log.Debug("start load all transfers command", "chain", c.chainClient.ChainID, "account", c.account)
ctx := parent ctx := parent
if c.balanceCache == nil {
c.balanceCache = newBalanceCache() // TODO - need to keep balanceCache in memory??? What about sharing it with other packages?
}
group := async.NewGroup(ctx) group := async.NewGroup(ctx)
err := c.fetchTransfersForLoadedBlocks(group) err := c.fetchTransfersForLoadedBlocks(group)
@ -414,7 +411,7 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error {
select { select {
case <-ctx.Done(): case <-ctx.Done():
c.balanceCache.Clear() c.balanceCacher.Clear()
return ctx.Err() return ctx.Err()
case <-group.WaitAsync(): case <-group.WaitAsync():
log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.ChainID, "account", c.account) log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.ChainID, "account", c.account)
@ -462,7 +459,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocks(ctx context.Context,
db: c.db, db: c.db,
blockRangeDAO: c.blockRangeDAO, blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient, chainClient: c.chainClient,
balanceCache: c.balanceCache, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
fromBlockNumber: big.NewInt(0), fromBlockNumber: big.NewInt(0),
@ -500,7 +497,7 @@ func (c *loadBlocksAndTransfersCommand) startFetchingNewBlocks(group *async.Grou
db: c.db, db: c.db,
blockRangeDAO: c.blockRangeDAO, blockRangeDAO: c.blockRangeDAO,
chainClient: c.chainClient, chainClient: c.chainClient,
balanceCache: c.balanceCache, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
transactionManager: c.transactionManager, transactionManager: c.transactionManager,

View File

@ -12,6 +12,7 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
) )
const ( const (
@ -93,7 +94,7 @@ type Downloader interface {
// Returns new block ranges that contain transfers and found block headers that contain transfers, and a block where // Returns new block ranges that contain transfers and found block headers that contain transfers, and a block where
// beginning of trasfers history detected // beginning of trasfers history detected
func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cache BalanceCache, func checkRangesWithStartBlock(parent context.Context, client balance.Reader, cache balance.Cacher,
account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) ( account common.Address, ranges [][]*big.Int, threadLimit uint32, startBlock *big.Int) (
resRanges [][]*big.Int, headers []*DBHeader, newStartBlock *big.Int, err error) { resRanges [][]*big.Int, headers []*DBHeader, newStartBlock *big.Int, err error) {
@ -214,7 +215,7 @@ func checkRangesWithStartBlock(parent context.Context, client BalanceReader, cac
return c.GetRanges(), c.GetHeaders(), newStartBlock, nil return c.GetRanges(), c.GetHeaders(), newStartBlock, nil
} }
func findBlocksWithEthTransfers(parent context.Context, client BalanceReader, cache BalanceCache, func findBlocksWithEthTransfers(parent context.Context, client balance.Reader, cache balance.Cacher,
account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) ( account common.Address, low, high *big.Int, noLimit bool, threadLimit uint32) (
from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) { from *big.Int, headers []*DBHeader, resStartBlock *big.Int, err error) {

View File

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
@ -142,7 +143,7 @@ func TestConcurrentEthDownloader(t *testing.T) {
defer cancel() defer cancel()
concurrent := NewConcurrentDownloader(ctx, 0) concurrent := NewConcurrentDownloader(ctx, 0)
_, headers, _, _ := findBlocksWithEthTransfers( _, headers, _, _ := findBlocksWithEthTransfers(
ctx, tc.options.balances, newBalanceCache(), ctx, tc.options.balances, balance.NewCache(),
common.Address{}, zero, tc.options.last, false, NoThreadLimit) common.Address{}, zero, tc.options.last, false, NoThreadLimit)
concurrent.Wait() concurrent.Wait()
require.NoError(t, concurrent.Error()) require.NoError(t, concurrent.Error())

View File

@ -14,6 +14,7 @@ import (
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
) )
@ -29,11 +30,13 @@ type Controller struct {
transactionManager *TransactionManager transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager tokenManager *token.Manager
balanceCacher balance.Cacher
loadAllTransfers bool loadAllTransfers bool
} }
func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed, func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event.Feed, transferFeed *event.Feed,
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager, loadAllTransfers bool) *Controller { transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher, loadAllTransfers bool) *Controller {
blockDAO := &BlockDAO{db} blockDAO := &BlockDAO{db}
return &Controller{ return &Controller{
@ -45,6 +48,7 @@ func NewTransferController(db *sql.DB, rpcClient *rpc.Client, accountFeed *event
transactionManager: transactionManager, transactionManager: transactionManager,
pendingTxManager: pendingTxManager, pendingTxManager: pendingTxManager,
tokenManager: tokenManager, tokenManager: tokenManager,
balanceCacher: balanceCacher,
loadAllTransfers: loadAllTransfers, loadAllTransfers: loadAllTransfers,
} }
} }
@ -93,7 +97,7 @@ func (c *Controller) CheckRecentHistory(chainIDs []uint64, accounts []common.Add
} }
} else { } else {
c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager, c.reactor = NewReactor(c.db, c.blockDAO, c.TransferFeed, c.transactionManager,
c.pendingTxManager, c.tokenManager) c.pendingTxManager, c.tokenManager, c.balanceCacher)
err = c.reactor.start(chainClients, accounts, c.loadAllTransfers) err = c.reactor.start(chainClients, accounts, c.loadAllTransfers)
if err != nil { if err != nil {

View File

@ -13,6 +13,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
@ -40,14 +41,6 @@ type HeaderReader interface {
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
} }
// BalanceReader interface for reading balance at a specifeid address.
type BalanceReader interface {
BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error)
NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error)
HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error)
FullTransactionByBlockNumberAndIndex(ctx context.Context, blockNumber *big.Int, index uint) (*chain.FullTransaction, error)
}
type HistoryFetcher interface { type HistoryFetcher interface {
start() error start() error
stop() stop()
@ -66,11 +59,13 @@ func NewOnDemandFetchStrategy(
tokenManager *token.Manager, tokenManager *token.Manager,
chainClients map[uint64]*chain.ClientWithFallback, chainClients map[uint64]*chain.ClientWithFallback,
accounts []common.Address, accounts []common.Address,
balanceCacher balance.Cacher,
) *OnDemandFetchStrategy { ) *OnDemandFetchStrategy {
strategy := &OnDemandFetchStrategy{ strategy := &OnDemandFetchStrategy{
db: db, db: db,
blockDAO: blockDAO, blockDAO: blockDAO,
feed: feed, feed: feed,
balanceCacher: balanceCacher,
transactionManager: transactionManager, transactionManager: transactionManager,
pendingTxManager: pendingTxManager, pendingTxManager: pendingTxManager,
tokenManager: tokenManager, tokenManager: tokenManager,
@ -87,7 +82,7 @@ type OnDemandFetchStrategy struct {
feed *event.Feed feed *event.Feed
mu sync.Mutex mu sync.Mutex
group *async.Group group *async.Group
balanceCache *balanceCache balanceCacher balance.Cacher
transactionManager *TransactionManager transactionManager *TransactionManager
pendingTxManager *transactions.PendingTxTracker pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager tokenManager *token.Manager
@ -114,6 +109,7 @@ func (s *OnDemandFetchStrategy) newControlCommand(chainClient *chain.ClientWithF
transactionManager: s.transactionManager, transactionManager: s.transactionManager,
pendingTxManager: s.pendingTxManager, pendingTxManager: s.pendingTxManager,
tokenManager: s.tokenManager, tokenManager: s.tokenManager,
balanceCacher: s.balanceCacher,
} }
return ctl return ctl
@ -207,14 +203,11 @@ func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chain
}} }}
toByAddress := map[common.Address]*big.Int{address: block} toByAddress := map[common.Address]*big.Int{address: block}
if s.balanceCache == nil {
s.balanceCache = newBalanceCache()
}
blocksCommand := &findAndCheckBlockRangeCommand{ blocksCommand := &findAndCheckBlockRangeCommand{
accounts: []common.Address{address}, accounts: []common.Address{address},
db: s.db, db: s.db,
chainClient: chainClient, chainClient: chainClient,
balanceCache: s.balanceCache, balanceCacher: s.balanceCacher,
feed: s.feed, feed: s.feed,
fromByAddress: fromByAddress, fromByAddress: fromByAddress,
toByAddress: toByAddress, toByAddress: toByAddress,
@ -223,7 +216,7 @@ func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chain
if err = blocksCommand.Command()(ctx); err != nil { if err = blocksCommand.Command()(ctx); err != nil {
return nil, err return nil, err
} }
s.balanceCache.Clear() s.balanceCacher.Clear()
blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration) blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration)
if err != nil { if err != nil {
@ -266,10 +259,12 @@ type Reactor struct {
pendingTxManager *transactions.PendingTxTracker pendingTxManager *transactions.PendingTxTracker
tokenManager *token.Manager tokenManager *token.Manager
strategy HistoryFetcher strategy HistoryFetcher
balanceCacher balance.Cacher
} }
func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager, func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *TransactionManager,
pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager) *Reactor { pendingTxManager *transactions.PendingTxTracker, tokenManager *token.Manager,
balanceCacher balance.Cacher) *Reactor {
return &Reactor{ return &Reactor{
db: db, db: db,
blockDAO: blockDAO, blockDAO: blockDAO,
@ -277,6 +272,7 @@ func NewReactor(db *Database, blockDAO *BlockDAO, feed *event.Feed, tm *Transact
transactionManager: tm, transactionManager: tm,
pendingTxManager: pendingTxManager, pendingTxManager: pendingTxManager,
tokenManager: tokenManager, tokenManager: tokenManager,
balanceCacher: balanceCacher,
} }
} }
@ -315,10 +311,11 @@ func (r *Reactor) createFetchStrategy(chainClients map[uint64]*chain.ClientWithF
r.tokenManager, r.tokenManager,
chainClients, chainClients,
accounts, accounts,
r.balanceCacher,
) )
} }
return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts) return NewOnDemandFetchStrategy(r.db, r.blockDAO, r.feed, r.transactionManager, r.pendingTxManager, r.tokenManager, chainClients, accounts, r.balanceCacher)
} }
func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int, func (r *Reactor) getTransfersByAddress(ctx context.Context, chainID uint64, address common.Address, toBlock *big.Int,

View File

@ -10,6 +10,7 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/token" "github.com/status-im/status-go/services/wallet/token"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
@ -19,7 +20,9 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Fe
transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker, transactionManager *TransactionManager, pendingTxManager *transactions.PendingTxTracker,
tokenManager *token.Manager, tokenManager *token.Manager,
chainClients map[uint64]*chain.ClientWithFallback, chainClients map[uint64]*chain.ClientWithFallback,
accounts []common.Address) *SequentialFetchStrategy { accounts []common.Address,
balanceCacher balance.Cacher,
) *SequentialFetchStrategy {
return &SequentialFetchStrategy{ return &SequentialFetchStrategy{
db: db, db: db,
@ -30,6 +33,7 @@ func NewSequentialFetchStrategy(db *Database, blockDAO *BlockDAO, feed *event.Fe
tokenManager: tokenManager, tokenManager: tokenManager,
chainClients: chainClients, chainClients: chainClients,
accounts: accounts, accounts: accounts,
balanceCacher: balanceCacher,
} }
} }
@ -44,13 +48,14 @@ type SequentialFetchStrategy struct {
tokenManager *token.Manager tokenManager *token.Manager
chainClients map[uint64]*chain.ClientWithFallback chainClients map[uint64]*chain.ClientWithFallback
accounts []common.Address accounts []common.Address
balanceCacher balance.Cacher
} }
func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback, func (s *SequentialFetchStrategy) newCommand(chainClient *chain.ClientWithFallback,
account common.Address) async.Commander { account common.Address) async.Commander {
return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed, return newLoadBlocksAndTransfersCommand(account, s.db, s.blockDAO, chainClient, s.feed,
s.transactionManager, s.pendingTxManager, s.tokenManager) s.transactionManager, s.pendingTxManager, s.tokenManager, s.balanceCacher)
} }
func (s *SequentialFetchStrategy) start() error { func (s *SequentialFetchStrategy) start() error {