diff --git a/healthmanager/providers_health_manager.go b/healthmanager/providers_health_manager.go index d864c88f0..09b69b21d 100644 --- a/healthmanager/providers_health_manager.go +++ b/healthmanager/providers_health_manager.go @@ -9,6 +9,12 @@ import ( "github.com/status-im/status-go/healthmanager/rpcstatus" ) +// Aggregated provider contains both aggregated status of providers and status per provider. +type AggregatedProviderStatus struct { + Status rpcstatus.ProviderStatus `json:"status"` + StatusPerProvider map[string]rpcstatus.ProviderStatus `json:"statusPerProvider"` +} + type ProvidersHealthManager struct { mu sync.RWMutex chainID uint64 @@ -108,6 +114,15 @@ func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus { return p.aggregator.GetAggregatedStatus() } +func (p *ProvidersHealthManager) GetAggregatedProviderStatus() AggregatedProviderStatus { + p.mu.RLock() + defer p.mu.RUnlock() + return AggregatedProviderStatus{ + Status: p.aggregator.GetAggregatedStatus(), + StatusPerProvider: p.aggregator.GetStatuses(), + } +} + // ChainID returns the ID of the chain. func (p *ProvidersHealthManager) ChainID() uint64 { return p.chainID diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 226c5aa73..bb3a85fea 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -7,20 +7,17 @@ import ( "errors" "fmt" "math/big" - "strings" - "sync/atomic" - "time" "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core" "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/rpc" "github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/healthmanager" + "github.com/status-im/status-go/healthmanager/provider_errors" "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/rpclimiter" @@ -33,8 +30,6 @@ type ClientInterface interface { ethclient.EthClientInterface NetworkID() uint64 ToBigInt() *big.Int - GetWalletNotifier() func(chainId uint64, message string) - SetWalletNotifier(notifier func(chainId uint64, message string)) connection.Connectable GetLimiter() rpclimiter.RequestLimiter SetLimiter(rpclimiter.RequestLimiter) @@ -71,11 +66,6 @@ type ClientWithFallback struct { circuitbreaker *circuitbreaker.CircuitBreaker providersHealthManager *healthmanager.ProvidersHealthManager - WalletNotifier func(chainId uint64, message string) - - isConnected *atomic.Bool - LastCheckedAt int64 - tag string // tag for the limiter groupTag string // tag for the limiter group } @@ -86,9 +76,6 @@ func (c *ClientWithFallback) Copy() interface{} { ethClients: c.ethClients, commonLimiter: c.commonLimiter, circuitbreaker: c.circuitbreaker, - WalletNotifier: c.WalletNotifier, - isConnected: c.isConnected, - LastCheckedAt: c.LastCheckedAt, tag: c.tag, groupTag: c.groupTag, } @@ -122,14 +109,9 @@ func NewClient(ethClients []ethclient.RPSLimitedEthClientInterface, chainID uint ErrorPercentThreshold: 25, } - isConnected := &atomic.Bool{} - isConnected.Store(true) - return &ClientWithFallback{ ChainID: chainID, ethClients: ethClients, - isConnected: isConnected, - LastCheckedAt: time.Now().Unix(), circuitbreaker: circuitbreaker.NewCircuitBreaker(cbConfig), providersHealthManager: providersHealthManager, } @@ -141,53 +123,12 @@ func (c *ClientWithFallback) Close() { } } -// Not found should not be cancelling the requests, as that's returned -// when we are hitting a non archival node for example, it should continue the -// chain as the next provider might have archival support. -func isNotFoundError(err error) bool { - return strings.Contains(err.Error(), ethereum.NotFound.Error()) -} - -func isVMError(err error) bool { - if strings.Contains(err.Error(), core.ErrInsufficientFunds.Error()) { - return true - } - for _, vmError := range propagateErrors { - if strings.Contains(err.Error(), vmError.Error()) { - return true - } - } - return false -} - -func isRPSLimitError(err error) bool { - return strings.Contains(err.Error(), "backoff_seconds") || - strings.Contains(err.Error(), "has exceeded its throughput limit") || - strings.Contains(err.Error(), "request rate exceeded") -} - -func (c *ClientWithFallback) SetIsConnected(value bool) { - c.LastCheckedAt = time.Now().Unix() - if !value { - if c.isConnected.Load() { - if c.WalletNotifier != nil { - c.WalletNotifier(c.ChainID, "down") - } - c.isConnected.Store(false) - } - - } else { - if !c.isConnected.Load() { - c.isConnected.Store(true) - if c.WalletNotifier != nil { - c.WalletNotifier(c.ChainID, "up") - } - } - } +func (c *ClientWithFallback) SetIsConnected(bool) { + log.Warn("SetIsConnected shouldn't be invoked. ProviderHealthManager takes care about the status.") } func (c *ClientWithFallback) IsConnected() bool { - return c.isConnected.Load() + return c.providersHealthManager.Status().Status == rpcstatus.StatusUp } func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclient.RPSLimitedEthClientInterface, f func(client ethclient.RPSLimitedEthClientInterface) (interface{}, error)) (interface{}, error) { @@ -201,8 +142,6 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien } } - c.LastCheckedAt = time.Now().Unix() - cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range ethClients { provider := provider @@ -217,7 +156,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien res, err := f(provider) if err != nil { - if limiter != nil && isRPSLimitError(err) { + if limiter != nil && provider_errors.IsRateLimitError(err) { provider.GetLimiter().ReduceLimit() err = provider.GetLimiter().WaitForRequestsAvailability(1) @@ -231,7 +170,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien } } - if isVMError(err) || errors.Is(err, context.Canceled) { + if provider_errors.IsVMError(err) || errors.Is(err, context.Canceled) { cmd.Cancel() } @@ -249,7 +188,6 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien if result.Error() != nil { return nil, result.Error() } - return result.Result()[0], nil } @@ -262,8 +200,6 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -279,8 +215,6 @@ func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -297,8 +231,6 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { }, ) - c.toggleConnectionState(err) - if err != nil { return 0, err } @@ -314,8 +246,6 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -331,8 +261,6 @@ func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -350,8 +278,6 @@ func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common. }, ) - c.toggleConnectionState(err) - if err != nil { return nil, false, err } @@ -369,8 +295,6 @@ func (c *ClientWithFallback) TransactionSender(ctx context.Context, tx *types.Tr }, ) - c.toggleConnectionState(err) - return res.(common.Address), err } @@ -383,8 +307,6 @@ func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash comm }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -401,8 +323,6 @@ func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncPr }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -423,8 +343,6 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -441,8 +359,6 @@ func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Addre }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -459,8 +375,6 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address, }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -477,8 +391,6 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address }, ) - c.toggleConnectionState(err) - if err != nil { return 0, err } @@ -520,8 +432,6 @@ func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -538,8 +448,6 @@ func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account commo }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -556,8 +464,6 @@ func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account commo }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -574,8 +480,6 @@ func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.A }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -592,8 +496,6 @@ func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common. }, ) - c.toggleConnectionState(err) - if err != nil { return 0, err } @@ -610,8 +512,6 @@ func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint, }, ) - c.toggleConnectionState(err) - if err != nil { return 0, err } @@ -628,8 +528,6 @@ func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.Call }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -646,8 +544,6 @@ func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethere }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -664,8 +560,6 @@ func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, err }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -682,8 +576,6 @@ func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, er }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -700,8 +592,6 @@ func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64, }, ) - c.toggleConnectionState(err) - if err != nil { return nil, err } @@ -718,8 +608,6 @@ func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallM }, ) - c.toggleConnectionState(err) - if err != nil { return 0, err } @@ -736,8 +624,6 @@ func (c *ClientWithFallback) SendTransaction(ctx context.Context, tx *types.Tran }, ) - c.toggleConnectionState(err) - return err } @@ -750,8 +636,6 @@ func (c *ClientWithFallback) CallContext(ctx context.Context, result interface{} }, ) - c.toggleConnectionState(err) - return err } @@ -764,8 +648,6 @@ func (c *ClientWithFallback) BatchCallContext(ctx context.Context, b []rpc.Batch }, ) - c.toggleConnectionState(err) - return err } @@ -793,27 +675,6 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumbe return baseGasFee, err } -func (c *ClientWithFallback) GetWalletNotifier() func(chainId uint64, message string) { - return c.WalletNotifier -} - -func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, message string)) { - c.WalletNotifier = notifier -} - -func (c *ClientWithFallback) toggleConnectionState(err error) { - connected := true - if err != nil { - if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, rpclimiter.ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { - log.Warn("Error not in chain call", "error", err, "chain", c.ChainID) - connected = false - } else { - log.Warn("Error in chain call", "error", err) - } - } - c.SetIsConnected(connected) -} - func (c *ClientWithFallback) Tag() string { return c.tag } diff --git a/rpc/client.go b/rpc/client.go index c335cd003..902af22b0 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -52,6 +52,7 @@ const ( // if we see this user agent in the logs that means parts of the application are using a malconfigured http client rpcUserAgentUpstreamFormat = "procuratee-%s-upstream/%s" + EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed" // deprecated EventBlockchainHealthChanged walletevent.EventType = "wallet-blockchain-health-changed" // Full status of the blockchain (including provider statuses) ) @@ -114,7 +115,6 @@ type Client struct { handlers map[string]Handler // locally registered handlers log log.Logger - walletNotifier func(chainID uint64, message string) providerConfigs []params.ProviderConfig } @@ -161,7 +161,7 @@ func NewClient(config ClientConfig) (*Client, error) { walletFeed: config.WalletFeed, } - c.UpstreamChainID = upstreamChainID + c.UpstreamChainID = config.UpstreamChainID c.router = newRouter(true) if verifProxyInitFn != nil { @@ -194,17 +194,42 @@ func (c *Client) Stop() { func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) { sendFullStatusEventFunc := func() { + if c.walletFeed == nil { + return + } + blockchainStatus := c.healthMgr.GetFullStatus() encodedMessage, err := json.Marshal(blockchainStatus) if err != nil { c.log.Warn("could not marshal full blockchain status", "error", err) return } + + c.walletFeed.Send(walletevent.Event{ + Type: EventBlockchainHealthChanged, + Message: string(encodedMessage), + At: time.Now().Unix(), + }) + } + + sendShortStatusEventFunc := func() { if c.walletFeed == nil { return } + + blockchainShortStatus := c.healthMgr.GetStatusPerChain() + result := make(map[uint64]string) + for chainID, status := range blockchainShortStatus.StatusPerChain { + result[chainID] = string(status.Status) + } + encodedMessage, err := json.Marshal(result) + if err != nil { + c.log.Warn("could not marshal blockchain status", "error", err) + return + } + c.walletFeed.Send(walletevent.Event{ - Type: EventBlockchainHealthChanged, + Type: EventBlockchainStatusChanged, Message: string(encodedMessage), At: time.Now().Unix(), }) @@ -215,6 +240,7 @@ func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) { case <-ctx.Done(): return case <-statusCh: + sendShortStatusEventFunc() sendFullStatusEventFunc() } } @@ -224,10 +250,6 @@ func (c *Client) GetNetworkManager() *network.Manager { return c.NetworkManager } -func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)) { - c.walletNotifier = notifier -} - func extractHostFromURL(inputURL string) (string, error) { parsedURL, err := url.Parse(inputURL) if err != nil { @@ -261,9 +283,6 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err c.rpcClientsMutex.Lock() defer c.rpcClientsMutex.Unlock() if rpcClient, ok := c.rpcClients[chainID]; ok { - if rpcClient.GetWalletNotifier() == nil { - rpcClient.SetWalletNotifier(c.walletNotifier) - } return rpcClient, nil } @@ -453,8 +472,7 @@ func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result in if c.router.routeRemote(method) { client, err := c.getClientUsingCache(chainID) if err == nil { - return client.CallContext(ctx, r - esult, method, args...) + return client.CallContext(ctx, result, method, args...) } } diff --git a/services/wallet/api_test.go b/services/wallet/api_test.go index 9b14daa79..39bd6b5fa 100644 --- a/services/wallet/api_test.go +++ b/services/wallet/api_test.go @@ -155,10 +155,8 @@ func TestAPI_GetAddressDetails(t *testing.T) { c, err := rpc.NewClient(config) require.NoError(t, err) - chainClient, err := c.EthClient(chainID) + _, err = c.EthClient(chainID) require.NoError(t, err) - chainClient.SetWalletNotifier(func(chainID uint64, message string) {}) - c.SetWalletNotifier(func(chainID uint64, message string) {}) service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, ¶ms.NodeConfig{}, nil, nil, nil, nil, nil, "") diff --git a/services/wallet/market/market.go b/services/wallet/market/market.go index bfea99d5a..c6ac4f053 100644 --- a/services/wallet/market/market.go +++ b/services/wallet/market/market.go @@ -2,7 +2,8 @@ package market import ( "context" - "sync" + "encoding/json" + "github.com/status-im/status-go/healthmanager/rpcstatus" "time" "github.com/ethereum/go-ethereum/common" @@ -10,12 +11,14 @@ import ( "github.com/ethereum/go-ethereum/log" "github.com/status-im/status-go/circuitbreaker" + healthManager "github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/walletevent" ) const ( - EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" + EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" // deprecated + EventMarketHealthChanged walletevent.EventType = "wallet-market-health-changed" ) const ( @@ -39,14 +42,13 @@ type TokenMarketCache MarketValuesPerCurrencyAndToken type TokenPriceCache DataPerTokenAndCurrency type Manager struct { - feed *event.Feed - priceCache MarketCache[TokenPriceCache] - marketCache MarketCache[TokenMarketCache] - IsConnected bool - LastCheckedAt int64 - IsConnectedLock sync.RWMutex - circuitbreaker *circuitbreaker.CircuitBreaker - providers []thirdparty.MarketDataProvider + feed *event.Feed + priceCache MarketCache[TokenPriceCache] + marketCache MarketCache[TokenMarketCache] + circuitbreaker *circuitbreaker.CircuitBreaker + healthManager *healthManager.ProvidersHealthManager + stopMonitoringFunc context.CancelFunc + providers []thirdparty.MarketDataProvider } func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager { @@ -61,32 +63,12 @@ func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Ma feed: feed, priceCache: *NewCache(make(TokenPriceCache)), marketCache: *NewCache(make(TokenMarketCache)), - IsConnected: true, - LastCheckedAt: time.Now().Unix(), circuitbreaker: cb, providers: providers, + healthManager: healthManager.NewProvidersHealthManager(0), } } -func (pm *Manager) setIsConnected(value bool) { - pm.IsConnectedLock.Lock() - defer pm.IsConnectedLock.Unlock() - pm.LastCheckedAt = time.Now().Unix() - if value != pm.IsConnected { - message := "down" - if value { - message = "up" - } - pm.feed.Send(walletevent.Event{ - Type: EventMarketStatusChanged, - Accounts: []common.Address{}, - Message: message, - At: time.Now().Unix(), - }) - } - pm.IsConnected = value -} - func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) { cmd := circuitbreaker.NewCommand(context.Background(), nil) for _, provider := range providers { @@ -98,7 +80,10 @@ func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(pr } result := pm.circuitbreaker.Execute(cmd) - pm.setIsConnected(result.Error() == nil) + if pm.healthManager != nil { + rpcCallStatuses := convertFunctorCallStatuses(result.FunctorCallStatuses()) + pm.healthManager.Update(context.Background(), rpcCallStatuses) + } if result.Error() != nil { log.Error("Error fetching prices", "error", result.Error()) @@ -108,6 +93,79 @@ func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(pr return result.Result()[0], nil } +func (pm *Manager) monitorHealth(ctx context.Context, statusCh chan struct{}) { + sendFullStatusEventFunc := func() { + if pm.feed == nil { + return + } + + marketStatus := pm.healthManager.GetAggregatedProviderStatus() + encodedMessage, err := json.Marshal(marketStatus) + if err != nil { + log.Warn("could not marshal full market status", "error", err) + return + } + + pm.feed.Send(walletevent.Event{ + Type: EventMarketHealthChanged, + Message: string(encodedMessage), + At: time.Now().Unix(), + }) + } + + sendOldStatusEventFunc := func() { + if pm.feed == nil { + return + } + marketStatus := pm.healthManager.Status() + message := "down" + if marketStatus.Status == rpcstatus.StatusUp { + message = "up" + } + + pm.feed.Send(walletevent.Event{ + Type: EventMarketStatusChanged, + Accounts: []common.Address{}, + Message: message, + At: time.Now().Unix(), + }) + } + + for { + select { + case <-ctx.Done(): + return + case <-statusCh: + sendOldStatusEventFunc() + sendFullStatusEventFunc() + } + } +} + +func (pm *Manager) Start(ctx context.Context) { + if pm.stopMonitoringFunc != nil { + log.Warn("Market health manager already started") + return + } + + cancelableCtx, cancel := context.WithCancel(ctx) + pm.stopMonitoringFunc = cancel + statusCh := pm.healthManager.Subscribe() + go pm.monitorHealth(cancelableCtx, statusCh) +} + +func (pm *Manager) Stop() { + if pm.stopMonitoringFunc == nil { + return + } + pm.stopMonitoringFunc() + pm.stopMonitoringFunc = nil +} + +func (pm *Manager) IsConnected() bool { + return pm.healthManager.Status().Status == rpcstatus.StatusUp +} + func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) { result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { return provider.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) @@ -341,3 +399,14 @@ func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAg return prices, nil } + +func convertFunctorCallStatuses(statuses []circuitbreaker.FunctorCallStatus) (result []rpcstatus.RpcProviderCallStatus) { + for _, f := range statuses { + result = append(result, rpcstatus.RpcProviderCallStatus{ + Name: f.Name, + Timestamp: f.Timestamp, + Err: f.Err, + }) + } + return +} diff --git a/services/wallet/market/market_feed_test.go b/services/wallet/market/market_feed_test.go index 15697040d..457a29c5d 100644 --- a/services/wallet/market/market_feed_test.go +++ b/services/wallet/market/market_feed_test.go @@ -1,6 +1,7 @@ package market import ( + "context" "errors" "testing" "time" @@ -59,6 +60,7 @@ func (s *MarketTestSuite) TestEventOnNetworkError() { customErr := errors.New("dial tcp: lookup optimism-goerli.infura.io: no such host") priceProviderWithError := mock_market.NewMockPriceProviderWithError(ctrl, customErr) manager := NewManager([]thirdparty.MarketDataProvider{priceProviderWithError}, s.feedSub.GetFeed()) + manager.Start(context.Background()) _, err := manager.FetchPrices(s.symbols, s.currencies) s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError") diff --git a/services/wallet/reader.go b/services/wallet/reader.go index b0c62551a..6bffcb4f4 100644 --- a/services/wallet/reader.go +++ b/services/wallet/reader.go @@ -536,7 +536,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR, Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR, Price: prices[tok.Symbol][currency].Price, - HasError: !r.marketManager.IsConnected, + HasError: !r.marketManager.IsConnected(), } } diff --git a/services/wallet/service.go b/services/wallet/service.go index 9ddeb597b..d38864fb4 100644 --- a/services/wallet/service.go +++ b/services/wallet/service.go @@ -1,13 +1,12 @@ package wallet import ( + "context" "database/sql" - "encoding/json" "fmt" - "sync" + "time" - "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/p2p" @@ -42,10 +41,6 @@ import ( "github.com/status-im/status-go/transactions" ) -const ( - EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed" -) - // NewService initializes service instance. func NewService( db *sql.DB, @@ -67,38 +62,6 @@ func NewService( signals := &walletevent.SignalsTransmitter{ Publisher: feed, } - blockchainStatus := make(map[uint64]string) - mutex := sync.Mutex{} - rpcClient.SetWalletNotifier(func(chainID uint64, message string) { - mutex.Lock() - defer mutex.Unlock() - - if len(blockchainStatus) == 0 { - networks, err := rpcClient.NetworkManager.Get(false) - if err != nil { - return - } - - for _, network := range networks { - blockchainStatus[network.ChainID] = "up" - } - } - - blockchainStatus[chainID] = message - encodedmessage, err := json.Marshal(blockchainStatus) - if err != nil { - return - } - - feed.Send(walletevent.Event{ - Type: EventBlockchainStatusChanged, - Accounts: []common.Address{}, - Message: string(encodedmessage), - At: time.Now().Unix(), - ChainID: chainID, - }) - }) - communityManager := community.NewManager(db, mediaServer, feed) balanceCacher := balance.NewCacherWithTTL(5 * time.Minute) tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed, accountFeed, accountsDB, token.NewPersistence(db)) @@ -126,6 +89,8 @@ func NewService( Password: config.WalletConfig.StatusProxyMarketPassword, }) marketManager := market.NewManager([]thirdparty.MarketDataProvider{cryptoCompare, coingecko, cryptoCompareProxy}, feed) + ctx := context.Background() + marketManager.Start(ctx) reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed) history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache()) currency := currency.NewService(db, feed, tokenManager, marketManager) @@ -280,6 +245,7 @@ func (s *Service) Stop() error { s.activity.Stop() s.collectibles.Stop() s.tokenManager.Stop() + s.marketManager.Stop() s.started = false log.Info("wallet stopped") return nil diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 426d14668..212921dab 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -585,19 +585,6 @@ func (tc *TestClient) CallContext(ctx context.Context, result interface{}, metho return err } -func (tc *TestClient) GetWalletNotifier() func(chainId uint64, message string) { - if tc.traceAPICalls { - tc.t.Log("GetWalletNotifier") - } - return nil -} - -func (tc *TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) { - if tc.traceAPICalls { - tc.t.Log("SetWalletNotifier") - } -} - func (tc *TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { err = tc.countAndlog("EstimateGas") return 0, err