diff --git a/services/wallet/market/market.go b/services/wallet/market/market.go index db40775e2..bfea99d5a 100644 --- a/services/wallet/market/market.go +++ b/services/wallet/market/market.go @@ -18,17 +18,30 @@ const ( EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" ) +const ( + MaxAgeInSecondsForFresh int64 = -1 + MaxAgeInSecondsForBalances int64 = 60 +) + type DataPoint struct { Price float64 UpdatedAt int64 } +type MarketValuesSnapshot struct { + MarketValues thirdparty.TokenMarketValues + UpdatedAt int64 +} + type DataPerTokenAndCurrency = map[string]map[string]DataPoint +type MarketValuesPerCurrencyAndToken = map[string]map[string]MarketValuesSnapshot +type TokenMarketCache MarketValuesPerCurrencyAndToken +type TokenPriceCache DataPerTokenAndCurrency type Manager struct { feed *event.Feed - priceCache DataPerTokenAndCurrency - priceCacheLock sync.RWMutex + priceCache MarketCache[TokenPriceCache] + marketCache MarketCache[TokenMarketCache] IsConnected bool LastCheckedAt int64 IsConnectedLock sync.RWMutex @@ -46,7 +59,8 @@ func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Ma return &Manager{ feed: feed, - priceCache: make(DataPerTokenAndCurrency), + priceCache: *NewCache(make(TokenPriceCache)), + marketCache: *NewCache(make(TokenMarketCache)), IsConnected: true, LastCheckedAt: time.Now().Unix(), circuitbreaker: cb, @@ -136,6 +150,81 @@ func (pm *Manager) FetchTokenMarketValues(symbols []string, currency string) (ma return marketValues, nil } +func (pm *Manager) updateMarketCache(currency string, marketValues map[string]thirdparty.TokenMarketValues) { + Write(&pm.marketCache, func(tokenMarketCache TokenMarketCache) TokenMarketCache { + for token, tokenMarketValues := range marketValues { + if _, present := tokenMarketCache[currency]; !present { + tokenMarketCache[currency] = make(map[string]MarketValuesSnapshot) + } + + tokenMarketCache[currency][token] = MarketValuesSnapshot{ + UpdatedAt: time.Now().Unix(), + MarketValues: tokenMarketValues, + } + } + + return tokenMarketCache + }) +} + +func (pm *Manager) GetOrFetchTokenMarketValues(symbols []string, currency string, maxAgeInSeconds int64) (map[string]thirdparty.TokenMarketValues, error) { + // docs: Determine which token market data to fetch based on what's inside the cache and the last time the cache was updated + symbolsToFetch := Read(&pm.marketCache, func(marketCache TokenMarketCache) []string { + tokenMarketValuesCache, ok := marketCache[currency] + if !ok { + return symbols + } + + now := time.Now().Unix() + symbolsToFetchMap := make(map[string]bool) + symbolsToFetch := make([]string, 0, len(symbols)) + + for _, symbol := range symbols { + marketValueSnapshot, found := tokenMarketValuesCache[symbol] + if !found { + if !symbolsToFetchMap[symbol] { + symbolsToFetchMap[symbol] = true + symbolsToFetch = append(symbolsToFetch, symbol) + } + continue + } + if now-marketValueSnapshot.UpdatedAt > maxAgeInSeconds { + if !symbolsToFetchMap[symbol] { + symbolsToFetchMap[symbol] = true + symbolsToFetch = append(symbolsToFetch, symbol) + } + continue + } + } + + return symbolsToFetch + }) + + // docs: Fetch and cache the token market data for missing or stale token market data + if len(symbolsToFetch) > 0 { + marketValues, err := pm.FetchTokenMarketValues(symbolsToFetch, currency) + if err != nil { + return nil, err + } + pm.updateMarketCache(currency, marketValues) + } + + // docs: Extract token market data from populated cache + tokenMarketValues := Read(&pm.marketCache, func(tokenMarketCache TokenMarketCache) map[string]thirdparty.TokenMarketValues { + tokenMarketValuesPerSymbol := make(map[string]thirdparty.TokenMarketValues) + if cachedTokenMarketValues, ok := tokenMarketCache[currency]; ok { + for _, symbol := range symbols { + if marketValuesSnapshot, found := cachedTokenMarketValues[symbol]; found { + tokenMarketValuesPerSymbol[symbol] = marketValuesSnapshot.MarketValues + } + } + } + return tokenMarketValuesPerSymbol + }) + + return tokenMarketValues, nil +} + func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { return provider.FetchTokenDetails(symbols) @@ -179,69 +268,67 @@ func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[strin } func (pm *Manager) getCachedPricesFor(symbols []string, currencies []string) DataPerTokenAndCurrency { - prices := make(DataPerTokenAndCurrency) - - for _, symbol := range symbols { - prices[symbol] = make(map[string]DataPoint) - for _, currency := range currencies { - prices[symbol][currency] = pm.priceCache[symbol][currency] + return Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) DataPerTokenAndCurrency { + prices := make(DataPerTokenAndCurrency) + for _, symbol := range symbols { + prices[symbol] = make(map[string]DataPoint) + for _, currency := range currencies { + prices[symbol][currency] = tokenPriceCache[symbol][currency] + } } - } - - return prices + return prices + }) } func (pm *Manager) updatePriceCache(prices map[string]map[string]float64) { - pm.priceCacheLock.Lock() - defer pm.priceCacheLock.Unlock() - - for token, pricesPerCurrency := range prices { - _, present := pm.priceCache[token] - if !present { - pm.priceCache[token] = make(map[string]DataPoint) - } - for currency, price := range pricesPerCurrency { - pm.priceCache[token][currency] = DataPoint{ - Price: price, - UpdatedAt: time.Now().Unix(), + Write(&pm.priceCache, func(tokenPriceCache TokenPriceCache) TokenPriceCache { + for token, pricesPerCurrency := range prices { + _, present := tokenPriceCache[token] + if !present { + tokenPriceCache[token] = make(map[string]DataPoint) + } + for currency, price := range pricesPerCurrency { + tokenPriceCache[token][currency] = DataPoint{ + Price: price, + UpdatedAt: time.Now().Unix(), + } } } - } -} -func (pm *Manager) GetCachedPrices() DataPerTokenAndCurrency { - pm.priceCacheLock.RLock() - defer pm.priceCacheLock.RUnlock() - - return pm.priceCache + return tokenPriceCache + }) } // Return cached price if present in cache and age is less than maxAgeInSeconds. Fetch otherwise. func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAgeInSeconds int64) (DataPerTokenAndCurrency, error) { - symbolsToFetchMap := make(map[string]bool) - symbolsToFetch := make([]string, 0, len(symbols)) + symbolsToFetch := Read(&pm.priceCache, func(tokenPriceCache TokenPriceCache) []string { + symbolsToFetchMap := make(map[string]bool) + symbolsToFetch := make([]string, 0, len(symbols)) - now := time.Now().Unix() + now := time.Now().Unix() - for _, symbol := range symbols { - tokenPriceCache, ok := pm.GetCachedPrices()[symbol] - if !ok { - if !symbolsToFetchMap[symbol] { - symbolsToFetchMap[symbol] = true - symbolsToFetch = append(symbolsToFetch, symbol) - } - continue - } - for _, currency := range currencies { - if now-tokenPriceCache[currency].UpdatedAt > maxAgeInSeconds { + for _, symbol := range symbols { + tokenPriceCache, ok := tokenPriceCache[symbol] + if !ok { if !symbolsToFetchMap[symbol] { symbolsToFetchMap[symbol] = true symbolsToFetch = append(symbolsToFetch, symbol) } - break + continue + } + for _, currency := range currencies { + if now-tokenPriceCache[currency].UpdatedAt > maxAgeInSeconds { + if !symbolsToFetchMap[symbol] { + symbolsToFetchMap[symbol] = true + symbolsToFetch = append(symbolsToFetch, symbol) + } + break + } } } - } + + return symbolsToFetch + }) if len(symbolsToFetch) > 0 { _, err := pm.FetchPrices(symbolsToFetch, currencies) diff --git a/services/wallet/market/market_cache.go b/services/wallet/market/market_cache.go new file mode 100644 index 000000000..1c8c00efb --- /dev/null +++ b/services/wallet/market/market_cache.go @@ -0,0 +1,35 @@ +package market + +import ( + "sync" +) + +type MarketCache[T any] struct { + store T + lock sync.RWMutex +} + +func NewCache[T any](store T) *MarketCache[T] { + var cache MarketCache[T] + cache.store = store + return &cache +} + +func Read[T any, R any](cache *MarketCache[T], reader func(store T) R) R { + cache.lock.RLock() + defer cache.lock.RUnlock() + return reader(cache.store) +} + +func Write[T any](cache *MarketCache[T], writer func(store T) T) *MarketCache[T] { + cache.lock.Lock() + defer cache.lock.Unlock() + cache.store = writer(cache.store) + return cache +} + +func (cache *MarketCache[T]) Get() T { + cache.lock.RLock() + defer cache.lock.RUnlock() + return cache.store +} diff --git a/services/wallet/market/market_test.go b/services/wallet/market/market_test.go index 33113dfe6..a7d4f0660 100644 --- a/services/wallet/market/market_test.go +++ b/services/wallet/market/market_test.go @@ -52,7 +52,7 @@ func (mpp *MockPriceProviderWithError) FetchPrices(symbols []string, currencies return nil, errors.New("error") } -func setupTestPrice(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager { +func setupMarketManager(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager { return NewManager(providers, &event.Feed{}) } @@ -83,10 +83,10 @@ func TestPrice(t *testing.T) { priceProvider := NewMockPriceProvider(ctrl) priceProvider.setMockPrices(mockPrices) - manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider}) + manager := setupMarketManager(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider}) { - rst := manager.GetCachedPrices() + rst := manager.priceCache.Get() require.Empty(t, rst) } @@ -114,7 +114,7 @@ func TestPrice(t *testing.T) { } } - cache := manager.GetCachedPrices() + cache := manager.priceCache.Get() for symbol, pricePerCurrency := range mockPrices { for currency, price := range pricePerCurrency { require.Equal(t, price, cache[symbol][currency].Price) @@ -131,7 +131,7 @@ func TestFetchPriceErrorFirstProvider(t *testing.T) { symbols := []string{"BTC", "ETH"} currencies := []string{"USD", "EUR"} - manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider}) + manager := setupMarketManager(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider}) rst, err := manager.FetchPrices(symbols, currencies) require.NoError(t, err) for _, symbol := range symbols { @@ -141,13 +141,16 @@ func TestFetchPriceErrorFirstProvider(t *testing.T) { } } -func TestFetchTokenMarketValues(t *testing.T) { +func setMarketCacheForTesting(t *testing.T, manager *Manager, currency string, marketValues map[string]thirdparty.TokenMarketValues) { + t.Helper() + manager.updateMarketCache(currency, marketValues) +} + +func TestGetOrFetchTokenMarketValues(t *testing.T) { ctrl := gomock.NewController(t) defer ctrl.Finish() - symbols := []string{"BTC", "ETH"} - currency := "EUR" - expectedMarketValues := map[string]thirdparty.TokenMarketValues{ + initialTokenMarketValues := map[string]thirdparty.TokenMarketValues{ "BTC": { MKTCAP: 1000000000, HIGHDAY: 1.23456, @@ -167,19 +170,109 @@ func TestFetchTokenMarketValues(t *testing.T) { CHANGE24HOUR: 0.8, }, } + updatedTokenMarketValues := map[string]thirdparty.TokenMarketValues{ + "BTC": { + MKTCAP: 1000000000, + HIGHDAY: 2.23456, + LOWDAY: 1.00000, + CHANGEPCTHOUR: 0.1, + CHANGEPCTDAY: 0.2, + CHANGEPCT24HOUR: 0.3, + CHANGE24HOUR: 0.4, + }, + "ETH": { + MKTCAP: 2000000000, + HIGHDAY: 5.56789, + LOWDAY: 4.00000, + CHANGEPCTHOUR: 0.5, + CHANGEPCTDAY: 0.6, + CHANGEPCT24HOUR: 0.7, + CHANGE24HOUR: 0.8, + }, + } + requestCurrency := "EUR" + requestSymbols := []string{"BTC", "ETH"} + testCases := []struct { + description string + requestMaxCachedAgeSeconds int64 - // Can't use fake provider, because the key {receiver, method} will be different, no match - provider := mock_thirdparty.NewMockMarketDataProvider(ctrl) - provider.EXPECT().ID().Return("MockPriceProvider").AnyTimes() - provider.EXPECT().FetchTokenMarketValues(symbols, currency).Return(expectedMarketValues, nil) - manager := setupTestPrice(t, []thirdparty.MarketDataProvider{provider}) - marketValues, err := manager.FetchTokenMarketValues(symbols, currency) - require.NoError(t, err) - require.Equal(t, expectedMarketValues, marketValues) + cachedTokenMarketValues map[string]thirdparty.TokenMarketValues + fetchTokenMarketValues map[string]thirdparty.TokenMarketValues + fetchErr error - // Test error - provider.EXPECT().FetchTokenMarketValues(symbols, currency).Return(nil, errors.New("error")) - marketValues, err = manager.FetchTokenMarketValues(symbols, currency) - require.Error(t, err) - require.Nil(t, marketValues) + wantFetchSymbols []string + wantValues map[string]thirdparty.TokenMarketValues + wantErr error + }{ + { + description: "fetch errors are propagated", + requestMaxCachedAgeSeconds: 0, + cachedTokenMarketValues: nil, + fetchTokenMarketValues: nil, + fetchErr: errors.New("explosion"), + + wantFetchSymbols: requestSymbols, + wantValues: nil, + wantErr: errors.New("explosion"), + }, + { + description: "token values fetched if not cached", + requestMaxCachedAgeSeconds: 10, + cachedTokenMarketValues: nil, + fetchTokenMarketValues: initialTokenMarketValues, + fetchErr: nil, + + wantFetchSymbols: requestSymbols, + wantValues: initialTokenMarketValues, + wantErr: nil, + }, + { + description: "token values returned from cache if fresh", + requestMaxCachedAgeSeconds: 10, + cachedTokenMarketValues: initialTokenMarketValues, + fetchTokenMarketValues: nil, + fetchErr: nil, + + wantFetchSymbols: requestSymbols, + wantValues: initialTokenMarketValues, + wantErr: nil, + }, + { + description: "token values fetched if fetch forced", + requestMaxCachedAgeSeconds: MaxAgeInSecondsForFresh, // N.B. Force a fetch + cachedTokenMarketValues: initialTokenMarketValues, + fetchTokenMarketValues: updatedTokenMarketValues, + fetchErr: nil, + + wantFetchSymbols: requestSymbols, + wantValues: updatedTokenMarketValues, + wantErr: nil, + + // TODO: Implement more test cases + // Test Case: There's cache, but we want fresh data, but fetch fails, we should fallback to cache + }, + } + + for _, tc := range testCases { + provider := mock_thirdparty.NewMockMarketDataProvider(ctrl) + provider.EXPECT().ID().Return("MockMarketProvider").AnyTimes() + manager := setupMarketManager(t, []thirdparty.MarketDataProvider{provider}) + t.Run(tc.description, func(t *testing.T) { + if tc.cachedTokenMarketValues != nil { + setMarketCacheForTesting(t, manager, requestCurrency, tc.cachedTokenMarketValues) + } + + if tc.fetchTokenMarketValues != nil || tc.fetchErr != nil { + provider.EXPECT().FetchTokenMarketValues(tc.wantFetchSymbols, requestCurrency).Return(tc.fetchTokenMarketValues, tc.fetchErr) + } + + gotValues, gotErr := manager.GetOrFetchTokenMarketValues(requestSymbols, requestCurrency, tc.requestMaxCachedAgeSeconds) + if tc.wantErr != nil { + require.ErrorContains(t, gotErr, tc.wantErr.Error()) + } else { + require.NoError(t, gotErr) + } + require.Equal(t, tc.wantValues, gotValues) + }) + } } diff --git a/services/wallet/reader.go b/services/wallet/reader.go index 06a2719b3..08f2af241 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -479,15 +479,15 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl var ( group = async.NewAtomicGroup(ctx) - prices = map[string]map[string]float64{} + prices = map[string]map[string]market.DataPoint{} tokenDetails = map[string]thirdparty.TokenDetails{} tokenMarketValues = map[string]thirdparty.TokenMarketValues{} ) group.Add(func(parent context.Context) error { - prices, err = r.marketManager.FetchPrices(tokenSymbols, currencies) + prices, err = r.marketManager.GetOrFetchPrices(tokenSymbols, currencies, market.MaxAgeInSecondsForBalances) if err != nil { - log.Info("marketManager.FetchPrices err", err) + log.Info("marketManager.GetOrFetchPrices err", err) } return nil }) @@ -501,9 +501,9 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl }) group.Add(func(parent context.Context) error { - tokenMarketValues, err = r.marketManager.FetchTokenMarketValues(tokenSymbols, currency) + tokenMarketValues, err = r.marketManager.GetOrFetchTokenMarketValues(tokenSymbols, currency, market.MaxAgeInSecondsForBalances) if err != nil { - log.Info("marketManager.FetchTokenMarketValues err", err) + log.Info("marketManager.GetOrFetchTokenMarketValues err", err) } return nil }) @@ -533,7 +533,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl ChangePctDay: tokenMarketValues[tok.Symbol].CHANGEPCTDAY, ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR, Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR, - Price: prices[tok.Symbol][currency], + Price: prices[tok.Symbol][currency].Price, HasError: !r.marketManager.IsConnected, } } diff --git a/services/wallet/router/sendtype/send_type.go b/services/wallet/router/sendtype/send_type.go index 4cdc0446c..6018db658 100644 --- a/services/wallet/router/sendtype/send_type.go +++ b/services/wallet/router/sendtype/send_type.go @@ -49,13 +49,14 @@ func (s SendType) FetchPrices(marketManager *market.Manager, tokenIDs []string) symbols = []string{"ETH"} } - pricesMap, err := marketManager.FetchPrices(symbols, []string{"USD"}) + pricesMap, err := marketManager.GetOrFetchPrices(symbols, []string{"USD"}, market.MaxAgeInSecondsForFresh) + if err != nil { return nil, err } prices := make(map[string]float64, 0) for symbol, pricePerCurrency := range pricesMap { - prices[symbol] = pricePerCurrency["USD"] + prices[symbol] = pricePerCurrency["USD"].Price } if s.IsCollectiblesTransfer() { for _, tokenID := range tokenIDs {