diff --git a/services/wallet/market/market.go b/services/wallet/market/market.go index 956c4df92..23ce9e1bb 100644 --- a/services/wallet/market/market.go +++ b/services/wallet/market/market.go @@ -4,11 +4,11 @@ import ( "sync" "time" - "github.com/afex/hystrix-go/hystrix" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" + "github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/walletevent" ) @@ -25,18 +25,19 @@ type DataPoint struct { type DataPerTokenAndCurrency = map[string]map[string]DataPoint type Manager struct { - main thirdparty.MarketDataProvider - fallback thirdparty.MarketDataProvider feed *event.Feed priceCache DataPerTokenAndCurrency priceCacheLock sync.RWMutex IsConnected bool LastCheckedAt int64 IsConnectedLock sync.RWMutex + circuitbreaker *circuitbreaker.CircuitBreaker + providers []thirdparty.MarketDataProvider } -func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDataProvider, feed *event.Feed) *Manager { - hystrix.ConfigureCommand("marketClient", hystrix.CommandConfig{ +func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager { + cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ + CommandName: "marketClient", Timeout: 10000, MaxConcurrentRequests: 100, SleepWindow: 300000, @@ -44,12 +45,12 @@ func NewManager(main thirdparty.MarketDataProvider, fallback thirdparty.MarketDa }) return &Manager{ - main: main, - fallback: fallback, - feed: feed, - priceCache: make(DataPerTokenAndCurrency), - IsConnected: true, - LastCheckedAt: time.Now().Unix(), + feed: feed, + priceCache: make(DataPerTokenAndCurrency), + IsConnected: true, + LastCheckedAt: time.Now().Unix(), + circuitbreaker: cb, + providers: providers, } } @@ -72,101 +73,81 @@ func (pm *Manager) setIsConnected(value bool) { pm.IsConnected = value } -func (pm *Manager) makeCall(main func() (any, error), fallback func() (any, error)) (any, error) { - resultChan := make(chan any, 1) - errChan := hystrix.Go("marketClient", func() error { - res, err := main() - if err != nil { - return err - } - pm.setIsConnected(true) - resultChan <- res - return nil - }, func(err error) error { - if pm.fallback == nil { - return err - } - - res, err := fallback() - if err != nil { - pm.setIsConnected(false) - return err - } - pm.setIsConnected(true) - resultChan <- res - return nil - }) - select { - case result := <-resultChan: - return result, nil - case err := <-errChan: - - return nil, err +func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) { + cmd := circuitbreaker.Command{} + for _, provider := range providers { + provider := provider + cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) { + result, err := f(provider) + return []interface{}{result}, err + })) } + + result := pm.circuitbreaker.Execute(cmd) + pm.setIsConnected(result.Error() == nil) + + if result.Error() != nil { + log.Error("Error fetching prices", "error", result.Error()) + return nil, result.Error() + } + + return result.Result()[0], nil } func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) { - prices, err := pm.makeCall( - func() (any, error) { - return pm.main.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) - }, - func() (any, error) { - return pm.fallback.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) - }, - ) + result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { + return provider.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) + }) + if err != nil { + log.Error("Error fetching prices", "error", err) return nil, err } - return prices.([]thirdparty.HistoricalPrice), nil + prices := result.([]thirdparty.HistoricalPrice) + return prices, nil } func (pm *Manager) FetchHistoricalHourlyPrices(symbol string, currency string, limit int, aggregate int) ([]thirdparty.HistoricalPrice, error) { - prices, err := pm.makeCall( - func() (any, error) { - return pm.main.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate) - }, - func() (any, error) { - return pm.fallback.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate) - }, - ) + result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { + return provider.FetchHistoricalHourlyPrices(symbol, currency, limit, aggregate) + }) + if err != nil { + log.Error("Error fetching prices", "error", err) return nil, err } - return prices.([]thirdparty.HistoricalPrice), nil + prices := result.([]thirdparty.HistoricalPrice) + return prices, nil } func (pm *Manager) FetchTokenMarketValues(symbols []string, currency string) (map[string]thirdparty.TokenMarketValues, error) { - marketValues, err := pm.makeCall( - func() (any, error) { - return pm.main.FetchTokenMarketValues(symbols, currency) - }, - func() (any, error) { - return pm.fallback.FetchTokenMarketValues(symbols, currency) - }, - ) + result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { + return provider.FetchTokenMarketValues(symbols, currency) + }) + if err != nil { + log.Error("Error fetching prices", "error", err) return nil, err } - return marketValues.(map[string]thirdparty.TokenMarketValues), nil + marketValues := result.(map[string]thirdparty.TokenMarketValues) + return marketValues, nil } func (pm *Manager) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { - tokenDetails, err := pm.makeCall( - func() (any, error) { - return pm.main.FetchTokenDetails(symbols) - }, - func() (any, error) { - return pm.fallback.FetchTokenDetails(symbols) - }, - ) + result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { + return provider.FetchTokenDetails(symbols) + }) + if err != nil { + log.Error("Error fetching prices", "error", err) return nil, err } - return tokenDetails.(map[string]thirdparty.TokenDetails), nil + tokenDetails := result.(map[string]thirdparty.TokenDetails) + return tokenDetails, nil } func (pm *Manager) FetchPrice(symbol string, currency string) (float64, error) { @@ -183,20 +164,16 @@ func (pm *Manager) FetchPrice(symbol string, currency string) (float64, error) { } func (pm *Manager) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) { - result, err := pm.makeCall( - func() (any, error) { - return pm.main.FetchPrices(symbols, currencies) - }, - func() (any, error) { - return pm.fallback.FetchPrices(symbols, currencies) - }, - ) + response, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { + return provider.FetchPrices(symbols, currencies) + }) if err != nil { + log.Error("Error fetching prices", "error", err) return nil, err } - prices := result.(map[string]map[string]float64) + prices := response.(map[string]map[string]float64) pm.updatePriceCache(prices) return prices, nil } diff --git a/services/wallet/market/market_test.go b/services/wallet/market/market_test.go index 2f725f30c..579629fa9 100644 --- a/services/wallet/market/market_test.go +++ b/services/wallet/market/market_test.go @@ -47,35 +47,44 @@ func (mpp *MockPriceProvider) FetchPrices(symbols []string, currencies []string) return res, nil } -func setupTestPrice(t *testing.T, provider thirdparty.MarketDataProvider) *Manager { - return NewManager(provider, provider, &event.Feed{}) +type MockPriceProviderWithError struct { + MockPriceProvider +} + +func (mpp *MockPriceProviderWithError) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) { + return nil, errors.New("error") +} + +func setupTestPrice(t *testing.T, providers []thirdparty.MarketDataProvider) *Manager { + return NewManager(providers, &event.Feed{}) +} + +var mockPrices = map[string]map[string]float64{ + "BTC": { + "USD": 1.23456, + "EUR": 2.34567, + "DAI": 3.45678, + "ARS": 9.87654, + }, + "ETH": { + "USD": 4.56789, + "EUR": 5.67891, + "DAI": 6.78912, + "ARS": 8.76543, + }, + "SNT": { + "USD": 7.654, + "EUR": 6.0, + "DAI": 1455.12, + "ARS": 0.0, + }, } func TestPrice(t *testing.T) { priceProvider := NewMockPriceProvider() - mockPrices := map[string]map[string]float64{ - "BTC": { - "USD": 1.23456, - "EUR": 2.34567, - "DAI": 3.45678, - "ARS": 9.87654, - }, - "ETH": { - "USD": 4.56789, - "EUR": 5.67891, - "DAI": 6.78912, - "ARS": 8.76543, - }, - "SNT": { - "USD": 7.654, - "EUR": 6.0, - "DAI": 1455.12, - "ARS": 0.0, - }, - } priceProvider.setMockPrices(mockPrices) - manager := setupTestPrice(t, priceProvider) + manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProvider, priceProvider}) { rst := manager.GetCachedPrices() @@ -113,3 +122,20 @@ func TestPrice(t *testing.T) { } } } + +func TestFetchPriceErrorFirstProvider(t *testing.T) { + priceProvider := NewMockPriceProvider() + priceProvider.setMockPrices(mockPrices) + priceProviderWithError := &MockPriceProviderWithError{} + symbols := []string{"BTC", "ETH"} + currencies := []string{"USD", "EUR"} + + manager := setupTestPrice(t, []thirdparty.MarketDataProvider{priceProviderWithError, priceProvider}) + rst, err := manager.FetchPrices(symbols, currencies) + require.NoError(t, err) + for _, symbol := range symbols { + for _, currency := range currencies { + require.Equal(t, rst[symbol][currency], mockPrices[symbol][currency]) + } + } +} diff --git a/services/wallet/service.go b/services/wallet/service.go index ce74931f9..19e3a8d8c 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -113,7 +113,8 @@ func NewService( transferController.Start() cryptoCompare := cryptocompare.NewClient() coingecko := coingecko.NewClient() - marketManager := market.NewManager(cryptoCompare, coingecko, feed) + cryptoCompareProxy := cryptocompare.NewClientWithURL(cryptocompare.CryptoCompareStatusProxyURL) + marketManager := market.NewManager([]thirdparty.MarketDataProvider{cryptoCompare, coingecko, cryptoCompareProxy}, feed) reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed) history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache()) currency := currency.NewService(db, feed, tokenManager, marketManager) diff --git a/services/wallet/thirdparty/cryptocompare/client.go b/services/wallet/thirdparty/cryptocompare/client.go index eaa1f4db7..e43ecbc56 100644 --- a/services/wallet/thirdparty/cryptocompare/client.go +++ b/services/wallet/thirdparty/cryptocompare/client.go @@ -12,6 +12,7 @@ import ( ) const baseURL = "https://min-api.cryptocompare.com" +const CryptoCompareStatusProxyURL = "https://cryptocompare.test.api.status.im" const extraParamStatus = "Status.im" type HistoricalPricesContainer struct { @@ -35,11 +36,20 @@ type MarketValuesContainer struct { type Client struct { httpClient *thirdparty.HTTPClient + baseURL string } func NewClient() *Client { return &Client{ httpClient: thirdparty.NewHTTPClient(), + baseURL: baseURL, + } +} + +func NewClientWithURL(url string) *Client { + return &Client{ + httpClient: thirdparty.NewHTTPClient(), + baseURL: url, } } @@ -55,7 +65,7 @@ func (c *Client) FetchPrices(symbols []string, currencies []string) (map[string] params.Add("tsyms", strings.Join(realCurrencies, ",")) params.Add("extraParams", extraParamStatus) - url := fmt.Sprintf("%s/data/pricemulti", baseURL) + url := fmt.Sprintf("%s/data/pricemulti", c.baseURL) response, err := c.httpClient.DoGetRequest(context.Background(), url, params) if err != nil { return nil, err @@ -78,7 +88,7 @@ func (c *Client) FetchPrices(symbols []string, currencies []string) (map[string] } func (c *Client) FetchTokenDetails(symbols []string) (map[string]thirdparty.TokenDetails, error) { - url := fmt.Sprintf("%s/data/all/coinlist", baseURL) + url := fmt.Sprintf("%s/data/all/coinlist", c.baseURL) response, err := c.httpClient.DoGetRequest(context.Background(), url, nil) if err != nil { return nil, err @@ -111,7 +121,7 @@ func (c *Client) FetchTokenMarketValues(symbols []string, currency string) (map[ params.Add("tsyms", realCurrency) params.Add("extraParams", extraParamStatus) - url := fmt.Sprintf("%s/data/pricemultifull", baseURL) + url := fmt.Sprintf("%s/data/pricemultifull", c.baseURL) response, err := c.httpClient.DoGetRequest(context.Background(), url, params) if err != nil { return nil, err @@ -144,7 +154,7 @@ func (c *Client) FetchHistoricalHourlyPrices(symbol string, currency string, lim params.Add("limit", fmt.Sprintf("%d", limit)) params.Add("extraParams", extraParamStatus) - url := fmt.Sprintf("%s/data/v2/histohour", baseURL) + url := fmt.Sprintf("%s/data/v2/histohour", c.baseURL) response, err := c.httpClient.DoGetRequest(context.Background(), url, params) if err != nil { return item, err @@ -172,7 +182,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi params.Add("allData", fmt.Sprintf("%v", allData)) params.Add("extraParams", extraParamStatus) - url := fmt.Sprintf("%s/data/v2/histoday", baseURL) + url := fmt.Sprintf("%s/data/v2/histoday", c.baseURL) response, err := c.httpClient.DoGetRequest(context.Background(), url, params) if err != nil { return item, err