feat: fallback to cache for prices and token market values when calling GetWalletToken (#5832)

This change adapts the market manager to cache the token-market-cap, and modifies the existing `GetWalletToken` function to use the token-price and token-market cache data. Additionally, we choose to use the cached price and market data for roughly 60 seconds when calling the `GetWalletToken` function.
This commit is contained in:
Sean Hagstrom 2024-09-20 12:24:43 +01:00 committed by GitHub
parent f165103f66
commit 385933a60d
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 293 additions and 77 deletions

View File

@ -18,17 +18,30 @@ const (
EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed"
) )
const (
MaxAgeInSecondsForFresh int64 = -1
MaxAgeInSecondsForBalances int64 = 60
)
type DataPoint struct { type DataPoint struct {
Price float64 Price float64
UpdatedAt int64 UpdatedAt int64
} }
type MarketValuesSnapshot struct {
MarketValues thirdparty.TokenMarketValues
UpdatedAt int64
}
type DataPerTokenAndCurrency = map[string]map[string]DataPoint type DataPerTokenAndCurrency = map[string]map[string]DataPoint
type MarketValuesPerCurrencyAndToken = map[string]map[string]MarketValuesSnapshot
type TokenMarketCache MarketValuesPerCurrencyAndToken
type TokenPriceCache DataPerTokenAndCurrency
type Manager struct { type Manager struct {
feed *event.Feed feed *event.Feed
priceCache DataPerTokenAndCurrency priceCache MarketCache[TokenPriceCache]
priceCacheLock sync.RWMutex marketCache MarketCache[TokenMarketCache]
IsConnected bool IsConnected bool
LastCheckedAt int64 LastCheckedAt int64
IsConnectedLock sync.RWMutex IsConnectedLock sync.RWMutex
@ -46,7 +59,8 @@ func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Ma
return &Manager{ return &Manager{
feed: feed, feed: feed,
priceCache: make(DataPerTokenAndCurrency), priceCache: *NewCache(make(TokenPriceCache)),
marketCache: *NewCache(make(TokenMarketCache)),
IsConnected: true, IsConnected: true,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitbreaker: cb, circuitbreaker: cb,
@ -136,6 +150,81 @@ func (pm *Manager) FetchTokenMarketValues(symbols []string, currency string) (ma
return marketValues, nil return marketValues, nil
} }
func (pm *Manager) updateMarketCache(currency string, marketValues map[string]thirdparty.TokenMarketValues) {
Write(&pm.marketCache, func(tokenMarketCache TokenMarketCache) TokenMarketCache {
for token, tokenMarketValues := range marketValues {
if _, present := tokenMarketCache[currency]; !present {
tokenMarketCache[currency] = make(map[string]MarketValuesSnapshot)
}
tokenMarketCache[currency][token] = MarketValuesSnapshot{
UpdatedAt: time.Now().Unix(),
MarketValues: tokenMarketValues,
}
}
return tokenMarketCache
})
}
func (pm *Manager) GetOrFetchTokenMarketValues(symbols []string, currency string, maxAgeInSeconds int64) (map[string]thirdparty.TokenMarketValues, error) {
// docs: Determine which token market data to fetch based on what's inside the cache and the last time the cache was updated
symbolsToFetch := Read(&pm.marketCache, func(marketCache TokenMarketCache) []string {
tokenMarketValuesCache, ok := marketCache[currency]
if !ok {
return symbols
}
now := time.Now().Unix()
symbolsToFetchMap := make(map[string]bool)
symbolsToFetch := make([]string, 0, len(symbols))
for _, symbol := range symbols {
marketValueSnapshot, found := tokenMarketValuesCache[symbol]
if !found {
if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true
symbolsToFetch = append(symbolsToFetch, symbol)
}
continue
}
if now-marketValueSnapshot.UpdatedAt > maxAgeInSeconds {
if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true
symbolsToFetch = append(symbolsToFetch, symbol)
}
continue
}
}
return symbolsToFetch
})
// docs: Fetch and cache the token market data for missing or stale token market data
if len(symbolsToFetch) > 0 {
marketValues, err := pm.FetchTokenMarketValues(symbolsToFetch, currency)
if err != nil {
return nil, err
}
pm.updateMarketCache(currency, marketValues)
}
// docs: Extract token market data from populated cache
tokenMarketValues := Read(&pm.marketCache, func(tokenMarketCache TokenMarketCache) map[string]thirdparty.TokenMarketValues {
tokenMarketValuesPerSymbol := make(map[string]thirdparty.TokenMarketValues)
if cachedTokenMarketValues, ok := tokenMarketCache[currency]; ok {
for _, symbol := range symbols {
if marketValuesSnapshot, found := cachedTokenMarketValues[symbol]; found {
tokenMarketValuesPerSymbol[symbol] = marketValuesSnapshot.MarketValues
}
}
}
return tokenMarketValuesPerSymbol
})
return tokenMarketValues, nil
}
func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) {
result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
return provider.FetchTokenDetails(symbols) return provider.FetchTokenDetails(symbols)
@ -179,52 +268,47 @@ func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[strin
} }
func (pm *Manager) getCachedPricesFor(symbols []string, currencies []string) DataPerTokenAndCurrency { func (pm *Manager) getCachedPricesFor(symbols []string, currencies []string) DataPerTokenAndCurrency {
return Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) DataPerTokenAndCurrency {
prices := make(DataPerTokenAndCurrency) prices := make(DataPerTokenAndCurrency)
for _, symbol := range symbols { for _, symbol := range symbols {
prices[symbol] = make(map[string]DataPoint) prices[symbol] = make(map[string]DataPoint)
for _, currency := range currencies { for _, currency := range currencies {
prices[symbol][currency] = pm.priceCache[symbol][currency] prices[symbol][currency] = tokenPriceCache[symbol][currency]
} }
} }
return prices return prices
})
} }
func (pm *Manager) updatePriceCache(prices map[string]map[string]float64) { func (pm *Manager) updatePriceCache(prices map[string]map[string]float64) {
pm.priceCacheLock.Lock() Write(&pm.priceCache, func(tokenPriceCache TokenPriceCache) TokenPriceCache {
defer pm.priceCacheLock.Unlock()
for token, pricesPerCurrency := range prices { for token, pricesPerCurrency := range prices {
_, present := pm.priceCache[token] _, present := tokenPriceCache[token]
if !present { if !present {
pm.priceCache[token] = make(map[string]DataPoint) tokenPriceCache[token] = make(map[string]DataPoint)
} }
for currency, price := range pricesPerCurrency { for currency, price := range pricesPerCurrency {
pm.priceCache[token][currency] = DataPoint{ tokenPriceCache[token][currency] = DataPoint{
Price: price, Price: price,
UpdatedAt: time.Now().Unix(), UpdatedAt: time.Now().Unix(),
} }
} }
} }
}
func (pm *Manager) GetCachedPrices() DataPerTokenAndCurrency { return tokenPriceCache
pm.priceCacheLock.RLock() })
defer pm.priceCacheLock.RUnlock()
return pm.priceCache
} }
// Return cached price if present in cache and age is less than maxAgeInSeconds. Fetch otherwise. // Return cached price if present in cache and age is less than maxAgeInSeconds. Fetch otherwise.
func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAgeInSeconds int64) (DataPerTokenAndCurrency, error) { func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAgeInSeconds int64) (DataPerTokenAndCurrency, error) {
symbolsToFetch := Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) []string {
symbolsToFetchMap := make(map[string]bool) symbolsToFetchMap := make(map[string]bool)
symbolsToFetch := make([]string, 0, len(symbols)) symbolsToFetch := make([]string, 0, len(symbols))
now := time.Now().Unix() now := time.Now().Unix()
for _, symbol := range symbols { for _, symbol := range symbols {
tokenPriceCache, ok := pm.GetCachedPrices()[symbol] tokenPriceCache, ok := tokenPriceCache[symbol]
if !ok { if !ok {
if !symbolsToFetchMap[symbol] { if !symbolsToFetchMap[symbol] {
symbolsToFetchMap[symbol] = true symbolsToFetchMap[symbol] = true
@ -243,6 +327,9 @@ func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAg
} }
} }
return symbolsToFetch
})
if len(symbolsToFetch) > 0 { if len(symbolsToFetch) > 0 {
_, err := pm.FetchPrices(symbolsToFetch, currencies) _, err := pm.FetchPrices(symbolsToFetch, currencies)
if err != nil { if err != nil {

View File

@ -0,0 +1,35 @@
package market
import (
"sync"
)
type MarketCache[T any] struct {
store T
lock sync.RWMutex
}
func NewCache[T any](store T) *MarketCache[T] {
var cache MarketCache[T]
cache.store = store
return &cache
}
func Read[T any, R any](cache *MarketCache[T], reader func(store T) R) R {
cache.lock.RLock()
defer cache.lock.RUnlock()
return reader(cache.store)
}
func Write[T any](cache *MarketCache[T], writer func(store T) T) *MarketCache[T] {
cache.lock.Lock()
defer cache.lock.Unlock()
cache.store = writer(cache.store)
return cache
}
func (cache *MarketCache[T]) Get() T {
cache.lock.RLock()
defer cache.lock.RUnlock()
return cache.store
}

View File

@ -52,7 +52,7 @@ func (mpp *MockPriceProviderWithError) FetchPrices(symbols []string, currencies
return nil, errors.New("error") return nil, errors.New("error")
} }
func setupTestPrice(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager { func setupMarketManager(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager {
return NewManager(providers, &event.Feed{}) return NewManager(providers, &event.Feed{})
} }
@ -83,10 +83,10 @@ func TestPrice(t *testing.T) {
priceProvider := NewMockPriceProvider(ctrl) priceProvider := NewMockPriceProvider(ctrl)
priceProvider.setMockPrices(mockPrices) priceProvider.setMockPrices(mockPrices)
manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider}) manager := setupMarketManager(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider})
{ {
rst := manager.GetCachedPrices() rst := manager.priceCache.Get()
require.Empty(t, rst) require.Empty(t, rst)
} }
@ -114,7 +114,7 @@ func TestPrice(t *testing.T) {
} }
} }
cache := manager.GetCachedPrices() cache := manager.priceCache.Get()
for symbol, pricePerCurrency := range mockPrices { for symbol, pricePerCurrency := range mockPrices {
for currency, price := range pricePerCurrency { for currency, price := range pricePerCurrency {
require.Equal(t, price, cache[symbol][currency].Price) require.Equal(t, price, cache[symbol][currency].Price)
@ -131,7 +131,7 @@ func TestFetchPriceErrorFirstProvider(t *testing.T) {
symbols := []string{"BTC", "ETH"} symbols := []string{"BTC", "ETH"}
currencies := []string{"USD", "EUR"} currencies := []string{"USD", "EUR"}
manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider}) manager := setupMarketManager(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider})
rst, err := manager.FetchPrices(symbols, currencies) rst, err := manager.FetchPrices(symbols, currencies)
require.NoError(t, err) require.NoError(t, err)
for _, symbol := range symbols { for _, symbol := range symbols {
@ -141,13 +141,16 @@ func TestFetchPriceErrorFirstProvider(t *testing.T) {
} }
} }
func TestFetchTokenMarketValues(t *testing.T) { func setMarketCacheForTesting(t *testing.T, manager *Manager, currency string, marketValues map[string]thirdparty.TokenMarketValues) {
t.Helper()
manager.updateMarketCache(currency, marketValues)
}
func TestGetOrFetchTokenMarketValues(t *testing.T) {
ctrl := gomock.NewController(t) ctrl := gomock.NewController(t)
defer ctrl.Finish() defer ctrl.Finish()
symbols := []string{"BTC", "ETH"} initialTokenMarketValues := map[string]thirdparty.TokenMarketValues{
currency := "EUR"
expectedMarketValues := map[string]thirdparty.TokenMarketValues{
"BTC": { "BTC": {
MKTCAP: 1000000000, MKTCAP: 1000000000,
HIGHDAY: 1.23456, HIGHDAY: 1.23456,
@ -167,19 +170,109 @@ func TestFetchTokenMarketValues(t *testing.T) {
CHANGE24HOUR: 0.8, CHANGE24HOUR: 0.8,
}, },
} }
updatedTokenMarketValues := map[string]thirdparty.TokenMarketValues{
"BTC": {
MKTCAP: 1000000000,
HIGHDAY: 2.23456,
LOWDAY: 1.00000,
CHANGEPCTHOUR: 0.1,
CHANGEPCTDAY: 0.2,
CHANGEPCT24HOUR: 0.3,
CHANGE24HOUR: 0.4,
},
"ETH": {
MKTCAP: 2000000000,
HIGHDAY: 5.56789,
LOWDAY: 4.00000,
CHANGEPCTHOUR: 0.5,
CHANGEPCTDAY: 0.6,
CHANGEPCT24HOUR: 0.7,
CHANGE24HOUR: 0.8,
},
}
requestCurrency := "EUR"
requestSymbols := []string{"BTC", "ETH"}
testCases := []struct {
description string
requestMaxCachedAgeSeconds int64
// Can't use fake provider, because the key {receiver, method} will be different, no match cachedTokenMarketValues map[string]thirdparty.TokenMarketValues
fetchTokenMarketValues map[string]thirdparty.TokenMarketValues
fetchErr error
wantFetchSymbols []string
wantValues map[string]thirdparty.TokenMarketValues
wantErr error
}{
{
description: "fetch errors are propagated",
requestMaxCachedAgeSeconds: 0,
cachedTokenMarketValues: nil,
fetchTokenMarketValues: nil,
fetchErr: errors.New("explosion"),
wantFetchSymbols: requestSymbols,
wantValues: nil,
wantErr: errors.New("explosion"),
},
{
description: "token values fetched if not cached",
requestMaxCachedAgeSeconds: 10,
cachedTokenMarketValues: nil,
fetchTokenMarketValues: initialTokenMarketValues,
fetchErr: nil,
wantFetchSymbols: requestSymbols,
wantValues: initialTokenMarketValues,
wantErr: nil,
},
{
description: "token values returned from cache if fresh",
requestMaxCachedAgeSeconds: 10,
cachedTokenMarketValues: initialTokenMarketValues,
fetchTokenMarketValues: nil,
fetchErr: nil,
wantFetchSymbols: requestSymbols,
wantValues: initialTokenMarketValues,
wantErr: nil,
},
{
description: "token values fetched if fetch forced",
requestMaxCachedAgeSeconds: MaxAgeInSecondsForFresh, // N.B. Force a fetch
cachedTokenMarketValues: initialTokenMarketValues,
fetchTokenMarketValues: updatedTokenMarketValues,
fetchErr: nil,
wantFetchSymbols: requestSymbols,
wantValues: updatedTokenMarketValues,
wantErr: nil,
// TODO: Implement more test cases
// Test Case: There's cache, but we want fresh data, but fetch fails, we should fallback to cache
},
}
for _, tc := range testCases {
provider := mock_thirdparty.NewMockMarketDataProvider(ctrl) provider := mock_thirdparty.NewMockMarketDataProvider(ctrl)
provider.EXPECT().ID().Return("MockPriceProvider").AnyTimes() provider.EXPECT().ID().Return("MockMarketProvider").AnyTimes()
provider.EXPECT().FetchTokenMarketValues(symbols, currency).Return(expectedMarketValues, nil) manager := setupMarketManager(t, []thirdparty.MarketDataProvider{provider})
manager := setupTestPrice(t, []thirdparty.MarketDataProvider{provider}) t.Run(tc.description, func(t *testing.T) {
marketValues, err := manager.FetchTokenMarketValues(symbols, currency) if tc.cachedTokenMarketValues != nil {
require.NoError(t, err) setMarketCacheForTesting(t, manager, requestCurrency, tc.cachedTokenMarketValues)
require.Equal(t, expectedMarketValues, marketValues) }
// Test error if tc.fetchTokenMarketValues != nil || tc.fetchErr != nil {
provider.EXPECT().FetchTokenMarketValues(symbols, currency).Return(nil, errors.New("error")) provider.EXPECT().FetchTokenMarketValues(tc.wantFetchSymbols, requestCurrency).Return(tc.fetchTokenMarketValues, tc.fetchErr)
marketValues, err = manager.FetchTokenMarketValues(symbols, currency) }
require.Error(t, err)
require.Nil(t, marketValues) gotValues, gotErr := manager.GetOrFetchTokenMarketValues(requestSymbols, requestCurrency, tc.requestMaxCachedAgeSeconds)
if tc.wantErr != nil {
require.ErrorContains(t, gotErr, tc.wantErr.Error())
} else {
require.NoError(t, gotErr)
}
require.Equal(t, tc.wantValues, gotValues)
})
}
} }

View File

@ -479,15 +479,15 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl
var ( var (
group = async.NewAtomicGroup(ctx) group = async.NewAtomicGroup(ctx)
prices = map[string]map[string]float64{} prices = map[string]map[string]market.DataPoint{}
tokenDetails = map[string]thirdparty.TokenDetails{} tokenDetails = map[string]thirdparty.TokenDetails{}
tokenMarketValues = map[string]thirdparty.TokenMarketValues{} tokenMarketValues = map[string]thirdparty.TokenMarketValues{}
) )
group.Add(func(parent context.Context) error { group.Add(func(parent context.Context) error {
prices, err = r.marketManager.FetchPrices(tokenSymbols, currencies) prices, err = r.marketManager.GetOrFetchPrices(tokenSymbols, currencies, market.MaxAgeInSecondsForBalances)
if err != nil { if err != nil {
log.Info("marketManager.FetchPrices err", err) log.Info("marketManager.GetOrFetchPrices err", err)
} }
return nil return nil
}) })
@ -501,9 +501,9 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl
}) })
group.Add(func(parent context.Context) error { group.Add(func(parent context.Context) error {
tokenMarketValues, err = r.marketManager.FetchTokenMarketValues(tokenSymbols, currency) tokenMarketValues, err = r.marketManager.GetOrFetchTokenMarketValues(tokenSymbols, currency, market.MaxAgeInSecondsForBalances)
if err != nil { if err != nil {
log.Info("marketManager.FetchTokenMarketValues err", err) log.Info("marketManager.GetOrFetchTokenMarketValues err", err)
} }
return nil return nil
}) })
@ -533,7 +533,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl
ChangePctDay: tokenMarketValues[tok.Symbol].CHANGEPCTDAY, ChangePctDay: tokenMarketValues[tok.Symbol].CHANGEPCTDAY,
ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR, ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR,
Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR, Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR,
Price: prices[tok.Symbol][currency], Price: prices[tok.Symbol][currency].Price,
HasError: !r.marketManager.IsConnected, HasError: !r.marketManager.IsConnected,
} }
} }

View File

@ -49,13 +49,14 @@ func (s SendType) FetchPrices(marketManager *market.Manager, tokenIDs []string)
symbols = []string{"ETH"} symbols = []string{"ETH"}
} }
pricesMap, err := marketManager.FetchPrices(symbols, []string{"USD"}) pricesMap, err := marketManager.GetOrFetchPrices(symbols, []string{"USD"}, market.MaxAgeInSecondsForFresh)
if err != nil { if err != nil {
return nil, err return nil, err
} }
prices := make(map[string]float64, 0) prices := make(map[string]float64, 0)
for symbol, pricePerCurrency := range pricesMap { for symbol, pricePerCurrency := range pricesMap {
prices[symbol] = pricePerCurrency["USD"] prices[symbol] = pricePerCurrency["USD"].Price
} }
if s.IsCollectiblesTransfer() { if s.IsCollectiblesTransfer() {
for _, tokenID := range tokenIDs { for _, tokenID := range tokenIDs {