fix(wallet)_: balances were not updated for accounts not checked during

initial balance fetch, as cache was considered valid and in case
balance was previously stored in DB for this account, it was returned
though it was not up to date.
This commit is contained in:
Ivan Belyakov 2024-04-11 17:33:07 +02:00 committed by IvanBelyakoff
parent 116fda7461
commit a549529637
1 changed files with 60 additions and 23 deletions

View File

@ -5,7 +5,6 @@ import (
"math" "math"
"math/big" "math/big"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -57,7 +56,6 @@ func NewReader(rpcClient *rpc.Client, tokenManager *token.Manager, marketManager
accountsDB: accountsDB, accountsDB: accountsDB,
persistence: persistence, persistence: persistence,
walletFeed: walletFeed, walletFeed: walletFeed,
lastWalletTokenUpdateTimestamp: atomic.Int64{},
refreshBalanceCache: true, refreshBalanceCache: true,
} }
} }
@ -72,7 +70,7 @@ type Reader struct {
walletFeed *event.Feed walletFeed *event.Feed
cancel context.CancelFunc cancel context.CancelFunc
walletEventsWatcher *walletevent.Watcher walletEventsWatcher *walletevent.Watcher
lastWalletTokenUpdateTimestamp atomic.Int64 lastWalletTokenUpdateTimestamp sync.Map
reloadDelayTimer *time.Timer reloadDelayTimer *time.Timer
refreshBalanceCache bool refreshBalanceCache bool
rw sync.RWMutex rw sync.RWMutex
@ -185,7 +183,7 @@ func (r *Reader) Stop() {
r.cancelDelayedWalletReload() r.cancelDelayedWalletReload()
r.lastWalletTokenUpdateTimestamp.Store(0) r.lastWalletTokenUpdateTimestamp = sync.Map{}
} }
func (r *Reader) triggerWalletReload() { func (r *Reader) triggerWalletReload() {
@ -222,13 +220,15 @@ func (r *Reader) startWalletEventsWatcher() {
return return
} }
timecheck := r.lastWalletTokenUpdateTimestamp.Load() - activityReloadMarginSeconds for _, address := range event.Accounts {
if event.At > timecheck { timestamp, ok := r.lastWalletTokenUpdateTimestamp.Load(address)
r.triggerDelayedWalletReload() timecheck := timestamp.(int64) - activityReloadMarginSeconds
}
if transfer.IsTransferDetectionEvent(event.Type) { if !ok || event.At > timecheck {
r.triggerDelayedWalletReload()
r.invalidateBalanceCache() r.invalidateBalanceCache()
break
}
} }
} }
@ -244,11 +244,42 @@ func (r *Reader) stopWalletEventsWatcher() {
} }
} }
func (r *Reader) isBalanceCacheValid() bool { func (r *Reader) tokensCachedForAddresses(addresses []common.Address) bool {
for _, address := range addresses {
cachedTokens, err := r.GetCachedWalletTokensWithoutMarketData()
if err != nil {
return false
}
_, ok := cachedTokens[address]
if !ok {
return false
}
}
return true
}
func (r *Reader) isCacheTimestampValidForAddress(address common.Address) bool {
_, ok := r.lastWalletTokenUpdateTimestamp.Load(address)
return ok
}
func (r *Reader) areCacheTimestampsValid(addresses []common.Address) bool {
for _, address := range addresses {
if !r.isCacheTimestampValidForAddress(address) {
return false
}
}
return true
}
func (r *Reader) isBalanceCacheValid(addresses []common.Address) bool {
r.rw.RLock() r.rw.RLock()
defer r.rw.RUnlock() defer r.rw.RUnlock()
return !r.refreshBalanceCache return !r.refreshBalanceCache && r.tokensCachedForAddresses(addresses) && r.areCacheTimestampsValid(addresses)
} }
func (r *Reader) balanceRefreshed() { func (r *Reader) balanceRefreshed() {
@ -266,7 +297,7 @@ func (r *Reader) invalidateBalanceCache() {
} }
func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, addresses []common.Address) (map[common.Address][]Token, error) { func (r *Reader) FetchOrGetCachedWalletBalances(ctx context.Context, addresses []common.Address) (map[common.Address][]Token, error) {
if !r.isBalanceCacheValid() { if !r.isBalanceCacheValid(addresses) {
balances, err := r.GetWalletTokenBalances(ctx, addresses) balances, err := r.GetWalletTokenBalances(ctx, addresses)
if err != nil { if err != nil {
return nil, err return nil, err
@ -458,7 +489,7 @@ func (r *Reader) getWalletTokenBalances(ctx context.Context, addresses []common.
} }
} }
r.lastWalletTokenUpdateTimestamp.Store(time.Now().Unix()) r.updateTokenUpdateTimestamp(addresses)
return result, r.persistence.SaveTokens(result) return result, r.persistence.SaveTokens(result)
} }
@ -655,7 +686,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, addresses []common.Address)
} }
} }
r.lastWalletTokenUpdateTimestamp.Store(time.Now().Unix()) r.updateTokenUpdateTimestamp(addresses)
return result, r.persistence.SaveTokens(result) return result, r.persistence.SaveTokens(result)
} }
@ -680,3 +711,9 @@ func (r *Reader) isCachedToken(cachedTokens map[common.Address][]Token, address
func (r *Reader) GetCachedWalletTokensWithoutMarketData() (map[common.Address][]Token, error) { func (r *Reader) GetCachedWalletTokensWithoutMarketData() (map[common.Address][]Token, error) {
return r.persistence.GetTokens() return r.persistence.GetTokens()
} }
func (r *Reader) updateTokenUpdateTimestamp(addresses []common.Address) {
for _, address := range addresses {
r.lastWalletTokenUpdateTimestamp.Store(address, time.Now().Unix())
}
}