feat(wallet)_: add CryptoCompare proxy to market manager as a last

resort
This commit is contained in:
Ivan Belyakov 2024-06-21 11:53:31 +02:00 committed by IvanBelyakoff
parent 13ade7ccff
commit ee2330fe5d
4 changed files with 130 additions and 116 deletions

View File

@ -4,11 +4,11 @@ import (
"sync" "sync"
"time" "time"
"github.com/afex/hystrix-go/hystrix"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/circuitbreaker"
"github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
) )
@ -25,18 +25,19 @@ type DataPoint struct {
type DataPerTokenAndCurrency = map[string]map[string]DataPoint type DataPerTokenAndCurrency = map[string]map[string]DataPoint
type Manager struct { type Manager struct {
main thirdparty.MarketDataProvider
fallback thirdparty.MarketDataProvider
feed *event.Feed feed *event.Feed
priceCache DataPerTokenAndCurrency priceCache DataPerTokenAndCurrency
priceCacheLock sync.RWMutex priceCacheLock sync.RWMutex
IsConnected bool IsConnected bool
LastCheckedAt int64 LastCheckedAt int64
IsConnectedLock sync.RWMutex IsConnectedLock sync.RWMutex
circuitbreaker *circuitbreaker.CircuitBreaker
providers []thirdparty.MarketDataProvider
} }
func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider, feed *event.Feed) *Manager { func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager {
hystrix.ConfigureCommand("marketClient", hystrix.CommandConfig{ cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{
CommandName: "marketClient",
Timeout: 10000, Timeout: 10000,
MaxConcurrentRequests: 100, MaxConcurrentRequests: 100,
SleepWindow: 300000, SleepWindow: 300000,
@ -44,12 +45,12 @@ func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDa
}) })
return &Manager{ return &Manager{
main: main,
fallback: fallback,
feed: feed, feed: feed,
priceCache: make(DataPerTokenAndCurrency), priceCache: make(DataPerTokenAndCurrency),
IsConnected: true, IsConnected: true,
LastCheckedAt: time.Now().Unix(), LastCheckedAt: time.Now().Unix(),
circuitbreaker: cb,
providers: providers,
} }
} }
@ -72,101 +73,81 @@ func (pm *Manager) setIsConnected(value bool) {
pm.IsConnected = value pm.IsConnected = value
} }
func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, error)) (any, error) { func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) {
resultChan := make(chan any, 1) cmd := circuitbreaker.Command{}
errChan := hystrix.Go("marketClient", func() error { for _, provider := range providers {
res, err := main() provider := provider
if err != nil { cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) {
return err result, err := f(provider)
} return []interface{}{result}, err
pm.setIsConnected(true) }))
resultChan <- res
return nil
}, func(err error) error {
if pm.fallback == nil {
return err
} }
res, err := fallback() result := pm.circuitbreaker.Execute(cmd)
if err != nil { pm.setIsConnected(result.Error() == nil)
pm.setIsConnected(false)
return err
}
pm.setIsConnected(true)
resultChan <- res
return nil
})
select {
case result := <-resultChan:
return result, nil
case err := <-errChan:
return nil, err if result.Error() != nil {
log.Error("Error fetching prices", "error", result.Error())
return nil, result.Error()
} }
return result.Result()[0], nil
} }
func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) { func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) {
prices, err := pm.makeCall( result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
func() (any, error) { return provider.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate)
return pm.main.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) })
},
func() (any, error) {
return pm.fallback.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate)
},
)
if err != nil { if err != nil {
log.Error("Error fetching prices", "error", err)
return nil, err return nil, err
} }
return prices.([]thirdparty.HistoricalPrice), nil prices := result.([]thirdparty.HistoricalPrice)
return prices, nil
} }
func (pm *Manager) FetchHistoricalHourlyPrices(symbol string, currency string, limit int, aggregate int) ([]thirdparty.HistoricalPrice, error) { func (pm *Manager) FetchHistoricalHourlyPrices(symbol string, currency string, limit int, aggregate int) ([]thirdparty.HistoricalPrice, error) {
prices, err := pm.makeCall( result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
func() (any, error) { return provider.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate)
return pm.main.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate) })
},
func() (any, error) {
return pm.fallback.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate)
},
)
if err != nil { if err != nil {
log.Error("Error fetching prices", "error", err)
return nil, err return nil, err
} }
return prices.([]thirdparty.HistoricalPrice), nil prices := result.([]thirdparty.HistoricalPrice)
return prices, nil
} }
func (pm *Manager) FetchTokenMarketValues(symbols []string, currency string) (map[string]thirdparty.TokenMarketValues, error) { func (pm *Manager) FetchTokenMarketValues(symbols []string, currency string) (map[string]thirdparty.TokenMarketValues, error) {
marketValues, err := pm.makeCall( result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
func() (any, error) { return provider.FetchTokenMarketValues(symbols, currency)
return pm.main.FetchTokenMarketValues(symbols, currency) })
},
func() (any, error) {
return pm.fallback.FetchTokenMarketValues(symbols, currency)
},
)
if err != nil { if err != nil {
log.Error("Error fetching prices", "error", err)
return nil, err return nil, err
} }
return marketValues.(map[string]thirdparty.TokenMarketValues), nil marketValues := result.(map[string]thirdparty.TokenMarketValues)
return marketValues, nil
} }
func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) {
tokenDetails, err := pm.makeCall( result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
func() (any, error) { return provider.FetchTokenDetails(symbols)
return pm.main.FetchTokenDetails(symbols) })
},
func() (any, error) {
return pm.fallback.FetchTokenDetails(symbols)
},
)
if err != nil { if err != nil {
log.Error("Error fetching prices", "error", err)
return nil, err return nil, err
} }
return tokenDetails.(map[string]thirdparty.TokenDetails), nil tokenDetails := result.(map[string]thirdparty.TokenDetails)
return tokenDetails, nil
} }
func (pm *Manager) FetchPrice(symbol string, currency string) (float64, error) { func (pm *Manager) FetchPrice(symbol string, currency string) (float64, error) {
@ -183,20 +164,16 @@ func (pm *Manager) FetchPrice(symbol string, currency string) (float64, error) {
} }
func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) { func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) {
result, err := pm.makeCall( response, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
func() (any, error) { return provider.FetchPrices(symbols, currencies)
return pm.main.FetchPrices(symbols, currencies) })
},
func() (any, error) {
return pm.fallback.FetchPrices(symbols, currencies)
},
)
if err != nil { if err != nil {
log.Error("Error fetching prices", "error", err)
return nil, err return nil, err
} }
prices := result.(map[string]map[string]float64) prices := response.(map[string]map[string]float64)
pm.updatePriceCache(prices) pm.updatePriceCache(prices)
return prices, nil return prices, nil
} }

View File

@ -47,13 +47,19 @@ func (mpp *MockPriceProvider) FetchPrices(symbols []string, currencies []string)
return res, nil return res, nil
} }
func setupTestPrice(t *testing.T, provider thirdparty.MarketDataProvider) *Manager { type MockPriceProviderWithError struct {
return NewManager(provider, provider, &event.Feed{}) MockPriceProvider
} }
func TestPrice(t *testing.T) { func (mpp *MockPriceProviderWithError) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) {
priceProvider := NewMockPriceProvider() return nil, errors.New("error")
mockPrices := map[string]map[string]float64{ }
func setupTestPrice(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager {
return NewManager(providers, &event.Feed{})
}
var mockPrices = map[string]map[string]float64{
"BTC": { "BTC": {
"USD": 1.23456, "USD": 1.23456,
"EUR": 2.34567, "EUR": 2.34567,
@ -73,9 +79,12 @@ func TestPrice(t *testing.T) {
"ARS": 0.0, "ARS": 0.0,
}, },
} }
func TestPrice(t *testing.T) {
priceProvider := NewMockPriceProvider()
priceProvider.setMockPrices(mockPrices) priceProvider.setMockPrices(mockPrices)
manager := setupTestPrice(t, priceProvider) manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider})
{ {
rst := manager.GetCachedPrices() rst := manager.GetCachedPrices()
@ -113,3 +122,20 @@ func TestPrice(t *testing.T) {
} }
} }
} }
func TestFetchPriceErrorFirstProvider(t *testing.T) {
priceProvider := NewMockPriceProvider()
priceProvider.setMockPrices(mockPrices)
priceProviderWithError := &MockPriceProviderWithError{}
symbols := []string{"BTC", "ETH"}
currencies := []string{"USD", "EUR"}
manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider})
rst, err := manager.FetchPrices(symbols, currencies)
require.NoError(t, err)
for _, symbol := range symbols {
for _, currency := range currencies {
require.Equal(t, rst[symbol][currency], mockPrices[symbol][currency])
}
}
}

View File

@ -113,7 +113,8 @@ func NewService(
transferController.Start() transferController.Start()
cryptoCompare := cryptocompare.NewClient() cryptoCompare := cryptocompare.NewClient()
coingecko := coingecko.NewClient() coingecko := coingecko.NewClient()
marketManager := market.NewManager(cryptoCompare, coingecko, feed) cryptoCompareProxy := cryptocompare.NewClientWithURL(cryptocompare.CryptoCompareStatusProxyURL)
marketManager := market.NewManager([]thirdparty.MarketDataProvider{cryptoCompare, coingecko, cryptoCompareProxy}, feed)
reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed) reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed)
history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache()) history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache())
currency := currency.NewService(db, feed, tokenManager, marketManager) currency := currency.NewService(db, feed, tokenManager, marketManager)

View File

@ -12,6 +12,7 @@ import (
) )
const baseURL = "https://min-api.cryptocompare.com" const baseURL = "https://min-api.cryptocompare.com"
const CryptoCompareStatusProxyURL = "https://cryptocompare.test.api.status.im"
const extraParamStatus = "Status.im" const extraParamStatus = "Status.im"
type HistoricalPricesContainer struct { type HistoricalPricesContainer struct {
@ -35,11 +36,20 @@ type MarketValuesContainer struct {
type Client struct { type Client struct {
httpClient *thirdparty.HTTPClient httpClient *thirdparty.HTTPClient
baseURL string
} }
func NewClient() *Client { func NewClient() *Client {
return &Client{ return &Client{
httpClient: thirdparty.NewHTTPClient(), httpClient: thirdparty.NewHTTPClient(),
baseURL: baseURL,
}
}
func NewClientWithURL(url string) *Client {
return &Client{
httpClient: thirdparty.NewHTTPClient(),
baseURL: url,
} }
} }
@ -55,7 +65,7 @@ func (c *Client) FetchPrices(symbols []string, currencies []string) (map[string]
params.Add("tsyms", strings.Join(realCurrencies, ",")) params.Add("tsyms", strings.Join(realCurrencies, ","))
params.Add("extraParams", extraParamStatus) params.Add("extraParams", extraParamStatus)
url := fmt.Sprintf("%s/data/pricemulti", baseURL) url := fmt.Sprintf("%s/data/pricemulti", c.baseURL)
response, err := c.httpClient.DoGetRequest(context.Background(), url, params) response, err := c.httpClient.DoGetRequest(context.Background(), url, params)
if err != nil { if err != nil {
return nil, err return nil, err
@ -78,7 +88,7 @@ func (c *Client) FetchPrices(symbols []string, currencies []string) (map[string]
} }
func (c *Client) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { func (c *Client) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) {
url := fmt.Sprintf("%s/data/all/coinlist", baseURL) url := fmt.Sprintf("%s/data/all/coinlist", c.baseURL)
response, err := c.httpClient.DoGetRequest(context.Background(), url, nil) response, err := c.httpClient.DoGetRequest(context.Background(), url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -111,7 +121,7 @@ func (c *Client) FetchTokenMarketValues(symbols []string, currency string) (map[
params.Add("tsyms", realCurrency) params.Add("tsyms", realCurrency)
params.Add("extraParams", extraParamStatus) params.Add("extraParams", extraParamStatus)
url := fmt.Sprintf("%s/data/pricemultifull", baseURL) url := fmt.Sprintf("%s/data/pricemultifull", c.baseURL)
response, err := c.httpClient.DoGetRequest(context.Background(), url, params) response, err := c.httpClient.DoGetRequest(context.Background(), url, params)
if err != nil { if err != nil {
return nil, err return nil, err
@ -144,7 +154,7 @@ func (c *Client) FetchHistoricalHourlyPrices(symbol string, currency string, lim
params.Add("limit", fmt.Sprintf("%d", limit)) params.Add("limit", fmt.Sprintf("%d", limit))
params.Add("extraParams", extraParamStatus) params.Add("extraParams", extraParamStatus)
url := fmt.Sprintf("%s/data/v2/histohour", baseURL) url := fmt.Sprintf("%s/data/v2/histohour", c.baseURL)
response, err := c.httpClient.DoGetRequest(context.Background(), url, params) response, err := c.httpClient.DoGetRequest(context.Background(), url, params)
if err != nil { if err != nil {
return item, err return item, err
@ -172,7 +182,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi
params.Add("allData", fmt.Sprintf("%v", allData)) params.Add("allData", fmt.Sprintf("%v", allData))
params.Add("extraParams", extraParamStatus) params.Add("extraParams", extraParamStatus)
url := fmt.Sprintf("%s/data/v2/histoday", baseURL) url := fmt.Sprintf("%s/data/v2/histoday", c.baseURL)
response, err := c.httpClient.DoGetRequest(context.Background(), url, params) response, err := c.httpClient.DoGetRequest(context.Background(), url, params)
if err != nil { if err != nil {
return item, err return item, err