From 23492467b91578b0526ce4024d421b7a2136ffdf Mon Sep 17 00:00:00 2001 From: Anthony Laibe Date: Mon, 5 Jun 2023 14:33:51 +0200 Subject: [PATCH] feat: clear balance cache after usage --- services/wallet/api.go | 10 --- services/wallet/transfer/balance_cache.go | 71 +++++++++++++++---- services/wallet/transfer/commands.go | 2 +- .../wallet/transfer/commands_sequential.go | 1 + services/wallet/transfer/reactor.go | 1 + 5 files changed, 61 insertions(+), 24 deletions(-) diff --git a/services/wallet/api.go b/services/wallet/api.go index e1a61d708..a68df669b 100644 --- a/services/wallet/api.go +++ b/services/wallet/api.go @@ -119,16 +119,6 @@ func (api *API) FetchDecodedTxData(ctx context.Context, data string) (*thirdpart return api.s.decoder.Decode(data) } -// Deprecated: GetCachedBalances is deprecated. Use GetTokensBalances instead -func (api *API) GetCachedBalances(ctx context.Context, addresses []common.Address) ([]transfer.BlockView, error) { - return api.s.transferController.GetCachedBalances(ctx, api.s.rpcClient.UpstreamChainID, addresses) -} - -// Deprecated: GetCachedBalances is deprecated. Use GetTokensBalancesForChainIDs instead -func (api *API) GetCachedBalancesbyChainID(ctx context.Context, chainID uint64, addresses []common.Address) ([]transfer.BlockView, error) { - return api.s.transferController.GetCachedBalances(ctx, chainID, addresses) -} - // GetTokensBalances return mapping of token balances for every account. func (api *API) GetTokensBalances(ctx context.Context, accounts, addresses []common.Address) (map[common.Address]map[common.Address]*hexutil.Big, error) { chainClients, err := api.s.rpcClient.EthClients([]uint64{api.s.rpcClient.UpstreamChainID}) diff --git a/services/wallet/transfer/balance_cache.go b/services/wallet/transfer/balance_cache.go index 909374034..a9faa18dc 100644 --- a/services/wallet/transfer/balance_cache.go +++ b/services/wallet/transfer/balance_cache.go @@ -15,6 +15,12 @@ type nonceRange struct { min *big.Int } +type BalanceCache interface { + BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) + NonceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*int64, error) + Clear() +} + type balanceCache struct { // balances maps an address to a map of a block number and the balance of this particular address balances map[common.Address]map[uint64]*big.Int // we don't care about block number overflow as we use cache only for comparing balances when fetching, not for UI @@ -24,9 +30,58 @@ type balanceCache struct { rw sync.RWMutex } -type BalanceCache interface { - BalanceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*big.Int, error) - NonceAt(ctx context.Context, client BalanceReader, account common.Address, blockNumber *big.Int) (*int64, error) +func newBalanceCache() *balanceCache { + return &balanceCache{ + balances: make(map[common.Address]map[uint64]*big.Int), + nonces: make(map[common.Address]map[uint64]*int64), + nonceRanges: make(map[common.Address]map[int64]nonceRange), + sortedRanges: make(map[common.Address][]nonceRange), + } +} + +func (b *balanceCache) Clear() { + for address, cache := range b.balances { + if len(cache) == 0 { + continue + } + + var maxBlock uint64 = 0 + var minBlock uint64 = 18446744073709551615 + for key := range cache { + if key > maxBlock { + maxBlock = key + } + if key < minBlock { + minBlock = key + } + } + newCache := make(map[uint64]*big.Int) + newCache[maxBlock] = cache[maxBlock] + newCache[minBlock] = cache[minBlock] + b.balances[address] = newCache + } + for address, cache := range b.nonces { + if len(cache) == 0 { + continue + } + + var maxBlock uint64 = 0 + var minBlock uint64 = 18446744073709551615 + for key := range cache { + if key > maxBlock { + maxBlock = key + } + if key < minBlock { + minBlock = key + } + } + newCache := make(map[uint64]*int64) + newCache[maxBlock] = cache[maxBlock] + newCache[minBlock] = cache[minBlock] + b.nonces[address] = newCache + } + b.nonceRanges = make(map[common.Address]map[int64]nonceRange) + b.sortedRanges = make(map[common.Address][]nonceRange) } func (b *balanceCache) ReadCachedBalance(account common.Address, blockNumber *big.Int) *big.Int { @@ -151,7 +206,6 @@ func (b *balanceCache) NonceAt(ctx context.Context, client BalanceReader, accoun if cachedNonce != nil { return cachedNonce, nil } - rangeNonce := b.findNonceInRange(account, blockNumber) if rangeNonce != nil { return rangeNonce, nil @@ -166,12 +220,3 @@ func (b *balanceCache) NonceAt(ctx context.Context, client BalanceReader, accoun return &int64Nonce, nil } - -func newBalanceCache() *balanceCache { - return &balanceCache{ - balances: make(map[common.Address]map[uint64]*big.Int), - nonces: make(map[common.Address]map[uint64]*int64), - nonceRanges: make(map[common.Address]map[int64]nonceRange), - sortedRanges: make(map[common.Address][]nonceRange), - } -} diff --git a/services/wallet/transfer/commands.go b/services/wallet/transfer/commands.go index fbf3376b5..014e6c395 100644 --- a/services/wallet/transfer/commands.go +++ b/services/wallet/transfer/commands.go @@ -279,6 +279,7 @@ func (c *controlCommand) Run(parent context.Context) error { return cmnd.error } + bCache.Clear() err = c.LoadTransfers(parent, numberOfBlocksCheckedPerIteration) if err != nil { if c.NewError(err) { @@ -314,7 +315,6 @@ func (c *controlCommand) Run(parent context.Context) error { BlockNumber: target, }) } - log.Info("end control command") return err } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index bfe9b1ca7..42d5c3a13 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -414,6 +414,7 @@ func (c *loadBlocksAndTransfersCommand) Run(parent context.Context) error { select { case <-ctx.Done(): + c.balanceCache.Clear() return ctx.Err() case <-group.WaitAsync(): log.Debug("end loadBlocksAndTransfers command", "chain", c.chainClient.ChainID, "account", c.account) diff --git a/services/wallet/transfer/reactor.go b/services/wallet/transfer/reactor.go index ce8911a0e..f20847067 100644 --- a/services/wallet/transfer/reactor.go +++ b/services/wallet/transfer/reactor.go @@ -199,6 +199,7 @@ func (s *OnDemandFetchStrategy) getTransfersByAddress(ctx context.Context, chain if err = blocksCommand.Command()(ctx); err != nil { return nil, err } + s.balanceCache.Clear() blocks, err := s.blockDAO.GetBlocksToLoadByAddress(chainID, address, numberOfBlocksCheckedPerIteration) if err != nil {