chore(wallet)_: integrate health monitor to wallet/service

fixes status-im/status-mobile#21071
This commit is contained in:
Andrey Bocharnikov 2024-09-10 01:19:40 +04:00
parent 7f262312bd
commit e8e44ac525
9 changed files with 161 additions and 245 deletions

View File

@ -9,6 +9,12 @@ import (
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
) )
// Aggregated provider contains both aggregated status of providers and status per provider.
type AggregatedProviderStatus struct {
Status rpcstatus.ProviderStatus `json:"status"`
StatusPerProvider map[string]rpcstatus.ProviderStatus `json:"statusPerProvider"`
}
type ProvidersHealthManager struct { type ProvidersHealthManager struct {
mu sync.RWMutex mu sync.RWMutex
chainID uint64 chainID uint64
@ -108,6 +114,15 @@ func (p *ProvidersHealthManager) Status() rpcstatus.ProviderStatus {
return p.aggregator.GetAggregatedStatus() return p.aggregator.GetAggregatedStatus()
} }
func (p *ProvidersHealthManager) GetAggregatedProviderStatus() AggregatedProviderStatus {
p.mu.RLock()
defer p.mu.RUnlock()
return AggregatedProviderStatus{
Status: p.aggregator.GetAggregatedStatus(),
StatusPerProvider: p.aggregator.GetStatuses(),
}
}
// ChainID returns the ID of the chain. // ChainID returns the ID of the chain.
func (p *ProvidersHealthManager) ChainID() uint64 { func (p *ProvidersHealthManager) ChainID() uint64 {
return p.chainID return p.chainID

View File

@ -7,20 +7,17 @@ import (
"errors" "errors"
"fmt" "fmt"
"math/big" "math/big"
"strings"
"sync/atomic"
"time"
"github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/circuitbreaker"
"github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/provider_errors"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/rpc/chain/rpclimiter"
@ -33,8 +30,6 @@ type ClientInterface interface {
ethclient.EthClientInterface ethclient.EthClientInterface
NetworkID() uint64 NetworkID() uint64
ToBigInt() *big.Int ToBigInt() *big.Int
GetWalletNotifier() func(chainId uint64, message string)
SetWalletNotifier(notifier func(chainId uint64, message string))
connection.Connectable connection.Connectable
GetLimiter() rpclimiter.RequestLimiter GetLimiter() rpclimiter.RequestLimiter
SetLimiter(rpclimiter.RequestLimiter) SetLimiter(rpclimiter.RequestLimiter)
@ -71,11 +66,6 @@ type ClientWithFallback struct {
circuitbreaker *circuitbreaker.CircuitBreaker circuitbreaker *circuitbreaker.CircuitBreaker
providersHealthManager *healthmanager.ProvidersHealthManager providersHealthManager *healthmanager.ProvidersHealthManager
WalletNotifier func(chainId uint64, message string)
isConnected *atomic.Bool
LastCheckedAt int64
tag string // tag for the limiter tag string // tag for the limiter
groupTag string // tag for the limiter group groupTag string // tag for the limiter group
} }
@ -86,9 +76,6 @@ func (c *ClientWithFallback) Copy() interface{} {
ethClients: c.ethClients, ethClients: c.ethClients,
commonLimiter: c.commonLimiter, commonLimiter: c.commonLimiter,
circuitbreaker: c.circuitbreaker, circuitbreaker: c.circuitbreaker,
WalletNotifier: c.WalletNotifier,
isConnected: c.isConnected,
LastCheckedAt: c.LastCheckedAt,
tag: c.tag, tag: c.tag,
groupTag: c.groupTag, groupTag: c.groupTag,
} }
@ -122,14 +109,9 @@ func NewClient(ethClients []ethclient.RPSLimitedEthClientInterface, chainID uint
ErrorPercentThreshold: 25, ErrorPercentThreshold: 25,
} }
isConnected := &atomic.Bool{}
isConnected.Store(true)
return &ClientWithFallback{ return &ClientWithFallback{
ChainID: chainID, ChainID: chainID,
ethClients: ethClients, ethClients: ethClients,
isConnected: isConnected,
LastCheckedAt: time.Now().Unix(),
circuitbreaker: circuitbreaker.NewCircuitBreaker(cbConfig), circuitbreaker: circuitbreaker.NewCircuitBreaker(cbConfig),
providersHealthManager: providersHealthManager, providersHealthManager: providersHealthManager,
} }
@ -141,53 +123,12 @@ func (c *ClientWithFallback) Close() {
} }
} }
// Not found should not be cancelling the requests, as that's returned func (c *ClientWithFallback) SetIsConnected(bool) {
// when we are hitting a non archival node for example, it should continue the log.Warn("SetIsConnected shouldn't be invoked. ProviderHealthManager takes care about the status.")
// chain as the next provider might have archival support.
func isNotFoundError(err error) bool {
return strings.Contains(err.Error(), ethereum.NotFound.Error())
}
func isVMError(err error) bool {
if strings.Contains(err.Error(), core.ErrInsufficientFunds.Error()) {
return true
}
for _, vmError := range propagateErrors {
if strings.Contains(err.Error(), vmError.Error()) {
return true
}
}
return false
}
func isRPSLimitError(err error) bool {
return strings.Contains(err.Error(), "backoff_seconds") ||
strings.Contains(err.Error(), "has exceeded its throughput limit") ||
strings.Contains(err.Error(), "request rate exceeded")
}
func (c *ClientWithFallback) SetIsConnected(value bool) {
c.LastCheckedAt = time.Now().Unix()
if !value {
if c.isConnected.Load() {
if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "down")
}
c.isConnected.Store(false)
}
} else {
if !c.isConnected.Load() {
c.isConnected.Store(true)
if c.WalletNotifier != nil {
c.WalletNotifier(c.ChainID, "up")
}
}
}
} }
func (c *ClientWithFallback) IsConnected() bool { func (c *ClientWithFallback) IsConnected() bool {
return c.isConnected.Load() return c.providersHealthManager.Status().Status == rpcstatus.StatusUp
} }
func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclient.RPSLimitedEthClientInterface, f func(client ethclient.RPSLimitedEthClientInterface) (interface{}, error)) (interface{}, error) { func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclient.RPSLimitedEthClientInterface, f func(client ethclient.RPSLimitedEthClientInterface) (interface{}, error)) (interface{}, error) {
@ -201,8 +142,6 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien
} }
} }
c.LastCheckedAt = time.Now().Unix()
cmd := circuitbreaker.NewCommand(ctx, nil) cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range ethClients { for _, provider := range ethClients {
provider := provider provider := provider
@ -217,7 +156,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien
res, err := f(provider) res, err := f(provider)
if err != nil { if err != nil {
if limiter != nil && isRPSLimitError(err) { if limiter != nil && provider_errors.IsRateLimitError(err) {
provider.GetLimiter().ReduceLimit() provider.GetLimiter().ReduceLimit()
err = provider.GetLimiter().WaitForRequestsAvailability(1) err = provider.GetLimiter().WaitForRequestsAvailability(1)
@ -231,7 +170,7 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien
} }
} }
if isVMError(err) || errors.Is(err, context.Canceled) { if provider_errors.IsVMError(err) || errors.Is(err, context.Canceled) {
cmd.Cancel() cmd.Cancel()
} }
@ -249,7 +188,6 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []ethclien
if result.Error() != nil { if result.Error() != nil {
return nil, result.Error() return nil, result.Error()
} }
return result.Result()[0], nil return result.Result()[0], nil
} }
@ -262,8 +200,6 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash)
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -279,8 +215,6 @@ func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int)
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -297,8 +231,6 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) {
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -314,8 +246,6 @@ func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash)
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -331,8 +261,6 @@ func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -350,8 +278,6 @@ func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, false, err return nil, false, err
} }
@ -369,8 +295,6 @@ func (c *ClientWithFallback) TransactionSender(ctx context.Context, tx *types.Tr
}, },
) )
c.toggleConnectionState(err)
return res.(common.Address), err return res.(common.Address), err
} }
@ -383,8 +307,6 @@ func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash comm
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -401,8 +323,6 @@ func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncPr
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -423,8 +343,6 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -441,8 +359,6 @@ func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Addre
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -459,8 +375,6 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address,
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -477,8 +391,6 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -520,8 +432,6 @@ func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -538,8 +448,6 @@ func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account commo
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -556,8 +464,6 @@ func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account commo
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -574,8 +480,6 @@ func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.A
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -592,8 +496,6 @@ func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common.
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -610,8 +512,6 @@ func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint,
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -628,8 +528,6 @@ func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.Call
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -646,8 +544,6 @@ func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethere
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -664,8 +560,6 @@ func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, err
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -682,8 +576,6 @@ func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, er
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -700,8 +592,6 @@ func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64,
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -718,8 +608,6 @@ func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallM
}, },
) )
c.toggleConnectionState(err)
if err != nil { if err != nil {
return 0, err return 0, err
} }
@ -736,8 +624,6 @@ func (c *ClientWithFallback) SendTransaction(ctx context.Context, tx *types.Tran
}, },
) )
c.toggleConnectionState(err)
return err return err
} }
@ -750,8 +636,6 @@ func (c *ClientWithFallback) CallContext(ctx context.Context, result interface{}
}, },
) )
c.toggleConnectionState(err)
return err return err
} }
@ -764,8 +648,6 @@ func (c *ClientWithFallback) BatchCallContext(ctx context.Context, b []rpc.Batch
}, },
) )
c.toggleConnectionState(err)
return err return err
} }
@ -793,27 +675,6 @@ func (c *ClientWithFallback) GetBaseFeeFromBlock(ctx context.Context, blockNumbe
return baseGasFee, err return baseGasFee, err
} }
func (c *ClientWithFallback) GetWalletNotifier() func(chainId uint64, message string) {
return c.WalletNotifier
}
func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, message string)) {
c.WalletNotifier = notifier
}
func (c *ClientWithFallback) toggleConnectionState(err error) {
connected := true
if err != nil {
if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, rpclimiter.ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
log.Warn("Error not in chain call", "error", err, "chain", c.ChainID)
connected = false
} else {
log.Warn("Error in chain call", "error", err)
}
}
c.SetIsConnected(connected)
}
func (c *ClientWithFallback) Tag() string { func (c *ClientWithFallback) Tag() string {
return c.tag return c.tag
} }

View File

@ -52,6 +52,7 @@ const (
// if we see this user agent in the logs that means parts of the application are using a malconfigured http client // if we see this user agent in the logs that means parts of the application are using a malconfigured http client
rpcUserAgentUpstreamFormat = "procuratee-%s-upstream/%s" rpcUserAgentUpstreamFormat = "procuratee-%s-upstream/%s"
EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed" // deprecated
EventBlockchainHealthChanged walletevent.EventType = "wallet-blockchain-health-changed" // Full status of the blockchain (including provider statuses) EventBlockchainHealthChanged walletevent.EventType = "wallet-blockchain-health-changed" // Full status of the blockchain (including provider statuses)
) )
@ -114,7 +115,6 @@ type Client struct {
handlers map[string]Handler // locally registered handlers handlers map[string]Handler // locally registered handlers
log log.Logger log log.Logger
walletNotifier func(chainID uint64, message string)
providerConfigs []params.ProviderConfig providerConfigs []params.ProviderConfig
} }
@ -161,7 +161,7 @@ func NewClient(config ClientConfig) (*Client, error) {
walletFeed: config.WalletFeed, walletFeed: config.WalletFeed,
} }
c.UpstreamChainID = upstreamChainID c.UpstreamChainID = config.UpstreamChainID
c.router = newRouter(true) c.router = newRouter(true)
if verifProxyInitFn != nil { if verifProxyInitFn != nil {
@ -194,17 +194,42 @@ func (c *Client) Stop() {
func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) { func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) {
sendFullStatusEventFunc := func() { sendFullStatusEventFunc := func() {
if c.walletFeed == nil {
return
}
blockchainStatus := c.healthMgr.GetFullStatus() blockchainStatus := c.healthMgr.GetFullStatus()
encodedMessage, err := json.Marshal(blockchainStatus) encodedMessage, err := json.Marshal(blockchainStatus)
if err != nil { if err != nil {
c.log.Warn("could not marshal full blockchain status", "error", err) c.log.Warn("could not marshal full blockchain status", "error", err)
return return
} }
c.walletFeed.Send(walletevent.Event{
Type: EventBlockchainHealthChanged,
Message: string(encodedMessage),
At: time.Now().Unix(),
})
}
sendShortStatusEventFunc := func() {
if c.walletFeed == nil { if c.walletFeed == nil {
return return
} }
blockchainShortStatus := c.healthMgr.GetStatusPerChain()
result := make(map[uint64]string)
for chainID, status := range blockchainShortStatus.StatusPerChain {
result[chainID] = string(status.Status)
}
encodedMessage, err := json.Marshal(result)
if err != nil {
c.log.Warn("could not marshal blockchain status", "error", err)
return
}
c.walletFeed.Send(walletevent.Event{ c.walletFeed.Send(walletevent.Event{
Type: EventBlockchainHealthChanged, Type: EventBlockchainStatusChanged,
Message: string(encodedMessage), Message: string(encodedMessage),
At: time.Now().Unix(), At: time.Now().Unix(),
}) })
@ -215,6 +240,7 @@ func (c *Client) monitorHealth(ctx context.Context, statusCh chan struct{}) {
case <-ctx.Done(): case <-ctx.Done():
return return
case <-statusCh: case <-statusCh:
sendShortStatusEventFunc()
sendFullStatusEventFunc() sendFullStatusEventFunc()
} }
} }
@ -224,10 +250,6 @@ func (c *Client) GetNetworkManager() *network.Manager {
return c.NetworkManager return c.NetworkManager
} }
func (c *Client) SetWalletNotifier(notifier func(chainID uint64, message string)) {
c.walletNotifier = notifier
}
func extractHostFromURL(inputURL string) (string, error) { func extractHostFromURL(inputURL string) (string, error) {
parsedURL, err := url.Parse(inputURL) parsedURL, err := url.Parse(inputURL)
if err != nil { if err != nil {
@ -261,9 +283,6 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
c.rpcClientsMutex.Lock() c.rpcClientsMutex.Lock()
defer c.rpcClientsMutex.Unlock() defer c.rpcClientsMutex.Unlock()
if rpcClient, ok := c.rpcClients[chainID]; ok { if rpcClient, ok := c.rpcClients[chainID]; ok {
if rpcClient.GetWalletNotifier() == nil {
rpcClient.SetWalletNotifier(c.walletNotifier)
}
return rpcClient, nil return rpcClient, nil
} }
@ -453,8 +472,7 @@ func (c *Client) CallContextIgnoringLocalHandlers(ctx context.Context, result in
if c.router.routeRemote(method) { if c.router.routeRemote(method) {
client, err := c.getClientUsingCache(chainID) client, err := c.getClientUsingCache(chainID)
if err == nil { if err == nil {
return client.CallContext(ctx, r return client.CallContext(ctx, result, method, args...)
esult, method, args...)
} }
} }

View File

@ -155,10 +155,8 @@ func TestAPI_GetAddressDetails(t *testing.T) {
c, err := rpc.NewClient(config) c, err := rpc.NewClient(config)
require.NoError(t, err) require.NoError(t, err)
chainClient, err := c.EthClient(chainID) _, err = c.EthClient(chainID)
require.NoError(t, err) require.NoError(t, err)
chainClient.SetWalletNotifier(func(chainID uint64, message string) {})
c.SetWalletNotifier(func(chainID uint64, message string) {})
service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, nil, "") service := NewService(db, accountsDb, appDB, c, accountFeed, nil, nil, nil, &params.NodeConfig{}, nil, nil, nil, nil, nil, "")

View File

@ -2,7 +2,8 @@ package market
import ( import (
"context" "context"
"sync" "encoding/json"
"github.com/status-im/status-go/healthmanager/rpcstatus"
"time" "time"
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
@ -10,12 +11,14 @@ import (
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/circuitbreaker"
healthManager "github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/services/wallet/thirdparty" "github.com/status-im/status-go/services/wallet/thirdparty"
"github.com/status-im/status-go/services/wallet/walletevent" "github.com/status-im/status-go/services/wallet/walletevent"
) )
const ( const (
EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" EventMarketStatusChanged walletevent.EventType = "wallet-market-status-changed" // deprecated
EventMarketHealthChanged walletevent.EventType = "wallet-market-health-changed"
) )
const ( const (
@ -39,14 +42,13 @@ type TokenMarketCache MarketValuesPerCurrencyAndToken
type TokenPriceCache DataPerTokenAndCurrency type TokenPriceCache DataPerTokenAndCurrency
type Manager struct { type Manager struct {
feed *event.Feed feed *event.Feed
priceCache MarketCache[TokenPriceCache] priceCache MarketCache[TokenPriceCache]
marketCache MarketCache[TokenMarketCache] marketCache MarketCache[TokenMarketCache]
IsConnected bool circuitbreaker *circuitbreaker.CircuitBreaker
LastCheckedAt int64 healthManager *healthManager.ProvidersHealthManager
IsConnectedLock sync.RWMutex stopMonitoringFunc context.CancelFunc
circuitbreaker *circuitbreaker.CircuitBreaker providers []thirdparty.MarketDataProvider
providers []thirdparty.MarketDataProvider
} }
func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager { func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager {
@ -61,32 +63,12 @@ func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Ma
feed: feed, feed: feed,
priceCache: *NewCache(make(TokenPriceCache)), priceCache: *NewCache(make(TokenPriceCache)),
marketCache: *NewCache(make(TokenMarketCache)), marketCache: *NewCache(make(TokenMarketCache)),
IsConnected: true,
LastCheckedAt: time.Now().Unix(),
circuitbreaker: cb, circuitbreaker: cb,
providers: providers, providers: providers,
healthManager: healthManager.NewProvidersHealthManager(0),
} }
} }
func (pm *Manager) setIsConnected(value bool) {
pm.IsConnectedLock.Lock()
defer pm.IsConnectedLock.Unlock()
pm.LastCheckedAt = time.Now().Unix()
if value != pm.IsConnected {
message := "down"
if value {
message = "up"
}
pm.feed.Send(walletevent.Event{
Type: EventMarketStatusChanged,
Accounts: []common.Address{},
Message: message,
At: time.Now().Unix(),
})
}
pm.IsConnected = value
}
func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) { func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) {
cmd := circuitbreaker.NewCommand(context.Background(), nil) cmd := circuitbreaker.NewCommand(context.Background(), nil)
for _, provider := range providers { for _, provider := range providers {
@ -98,7 +80,10 @@ func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(pr
} }
result := pm.circuitbreaker.Execute(cmd) result := pm.circuitbreaker.Execute(cmd)
pm.setIsConnected(result.Error() == nil) if pm.healthManager != nil {
rpcCallStatuses := convertFunctorCallStatuses(result.FunctorCallStatuses())
pm.healthManager.Update(context.Background(), rpcCallStatuses)
}
if result.Error() != nil { if result.Error() != nil {
log.Error("Error fetching prices", "error", result.Error()) log.Error("Error fetching prices", "error", result.Error())
@ -108,6 +93,79 @@ func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(pr
return result.Result()[0], nil return result.Result()[0], nil
} }
func (pm *Manager) monitorHealth(ctx context.Context, statusCh chan struct{}) {
sendFullStatusEventFunc := func() {
if pm.feed == nil {
return
}
marketStatus := pm.healthManager.GetAggregatedProviderStatus()
encodedMessage, err := json.Marshal(marketStatus)
if err != nil {
log.Warn("could not marshal full market status", "error", err)
return
}
pm.feed.Send(walletevent.Event{
Type: EventMarketHealthChanged,
Message: string(encodedMessage),
At: time.Now().Unix(),
})
}
sendOldStatusEventFunc := func() {
if pm.feed == nil {
return
}
marketStatus := pm.healthManager.Status()
message := "down"
if marketStatus.Status == rpcstatus.StatusUp {
message = "up"
}
pm.feed.Send(walletevent.Event{
Type: EventMarketStatusChanged,
Accounts: []common.Address{},
Message: message,
At: time.Now().Unix(),
})
}
for {
select {
case <-ctx.Done():
return
case <-statusCh:
sendOldStatusEventFunc()
sendFullStatusEventFunc()
}
}
}
func (pm *Manager) Start(ctx context.Context) {
if pm.stopMonitoringFunc != nil {
log.Warn("Market health manager already started")
return
}
cancelableCtx, cancel := context.WithCancel(ctx)
pm.stopMonitoringFunc = cancel
statusCh := pm.healthManager.Subscribe()
go pm.monitorHealth(cancelableCtx, statusCh)
}
func (pm *Manager) Stop() {
if pm.stopMonitoringFunc == nil {
return
}
pm.stopMonitoringFunc()
pm.stopMonitoringFunc = nil
}
func (pm *Manager) IsConnected() bool {
return pm.healthManager.Status().Status == rpcstatus.StatusUp
}
func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) { func (pm *Manager) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]thirdparty.HistoricalPrice, error) {
result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) { result, err := pm.makeCall(pm.providers, func(provider thirdparty.MarketDataProvider) (interface{}, error) {
return provider.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate) return provider.FetchHistoricalDailyPrices(symbol, currency, limit, allData, aggregate)
@ -341,3 +399,14 @@ func (pm *Manager) GetOrFetchPrices(symbols []string, currencies []string, maxAg
return prices, nil return prices, nil
} }
func convertFunctorCallStatuses(statuses []circuitbreaker.FunctorCallStatus) (result []rpcstatus.RpcProviderCallStatus) {
for _, f := range statuses {
result = append(result, rpcstatus.RpcProviderCallStatus{
Name: f.Name,
Timestamp: f.Timestamp,
Err: f.Err,
})
}
return
}

View File

@ -1,6 +1,7 @@
package market package market
import ( import (
"context"
"errors" "errors"
"testing" "testing"
"time" "time"
@ -59,6 +60,7 @@ func (s *MarketTestSuite) TestEventOnNetworkError() {
customErr := errors.New("dial tcp: lookup optimism-goerli.infura.io: no such host") customErr := errors.New("dial tcp: lookup optimism-goerli.infura.io: no such host")
priceProviderWithError := mock_market.NewMockPriceProviderWithError(ctrl, customErr) priceProviderWithError := mock_market.NewMockPriceProviderWithError(ctrl, customErr)
manager := NewManager([]thirdparty.MarketDataProvider{priceProviderWithError}, s.feedSub.GetFeed()) manager := NewManager([]thirdparty.MarketDataProvider{priceProviderWithError}, s.feedSub.GetFeed())
manager.Start(context.Background())
_, err := manager.FetchPrices(s.symbols, s.currencies) _, err := manager.FetchPrices(s.symbols, s.currencies)
s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError") s.Require().Error(err, "expected error from FetchPrices due to MockPriceProviderWithError")

View File

@ -536,7 +536,7 @@ func (r *Reader) GetWalletToken(ctx context.Context, clients map[uint64]chain.Cl
ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR, ChangePct24hour: tokenMarketValues[tok.Symbol].CHANGEPCT24HOUR,
Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR, Change24hour: tokenMarketValues[tok.Symbol].CHANGE24HOUR,
Price: prices[tok.Symbol][currency].Price, Price: prices[tok.Symbol][currency].Price,
HasError: !r.marketManager.IsConnected, HasError: !r.marketManager.IsConnected(),
} }
} }

View File

@ -1,13 +1,12 @@
package wallet package wallet
import ( import (
"context"
"database/sql" "database/sql"
"encoding/json"
"fmt" "fmt"
"sync"
"time" "time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/log"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -42,10 +41,6 @@ import (
"github.com/status-im/status-go/transactions" "github.com/status-im/status-go/transactions"
) )
const (
EventBlockchainStatusChanged walletevent.EventType = "wallet-blockchain-status-changed"
)
// NewService initializes service instance. // NewService initializes service instance.
func NewService( func NewService(
db *sql.DB, db *sql.DB,
@ -67,38 +62,6 @@ func NewService(
signals := &walletevent.SignalsTransmitter{ signals := &walletevent.SignalsTransmitter{
Publisher: feed, Publisher: feed,
} }
blockchainStatus := make(map[uint64]string)
mutex := sync.Mutex{}
rpcClient.SetWalletNotifier(func(chainID uint64, message string) {
mutex.Lock()
defer mutex.Unlock()
if len(blockchainStatus) == 0 {
networks, err := rpcClient.NetworkManager.Get(false)
if err != nil {
return
}
for _, network := range networks {
blockchainStatus[network.ChainID] = "up"
}
}
blockchainStatus[chainID] = message
encodedmessage, err := json.Marshal(blockchainStatus)
if err != nil {
return
}
feed.Send(walletevent.Event{
Type: EventBlockchainStatusChanged,
Accounts: []common.Address{},
Message: string(encodedmessage),
At: time.Now().Unix(),
ChainID: chainID,
})
})
communityManager := community.NewManager(db, mediaServer, feed) communityManager := community.NewManager(db, mediaServer, feed)
balanceCacher := balance.NewCacherWithTTL(5 * time.Minute) balanceCacher := balance.NewCacherWithTTL(5 * time.Minute)
tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed, accountFeed, accountsDB, token.NewPersistence(db)) tokenManager := token.NewTokenManager(db, rpcClient, communityManager, rpcClient.NetworkManager, appDB, mediaServer, feed, accountFeed, accountsDB, token.NewPersistence(db))
@ -126,6 +89,8 @@ func NewService(
Password: config.WalletConfig.StatusProxyMarketPassword, Password: config.WalletConfig.StatusProxyMarketPassword,
}) })
marketManager := market.NewManager([]thirdparty.MarketDataProvider{cryptoCompare, coingecko, cryptoCompareProxy}, feed) marketManager := market.NewManager([]thirdparty.MarketDataProvider{cryptoCompare, coingecko, cryptoCompareProxy}, feed)
ctx := context.Background()
marketManager.Start(ctx)
reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed) reader := NewReader(tokenManager, marketManager, token.NewPersistence(db), feed)
history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache()) history := history.NewService(db, accountsDB, accountFeed, feed, rpcClient, tokenManager, marketManager, balanceCacher.Cache())
currency := currency.NewService(db, feed, tokenManager, marketManager) currency := currency.NewService(db, feed, tokenManager, marketManager)
@ -280,6 +245,7 @@ func (s *Service) Stop() error {
s.activity.Stop() s.activity.Stop()
s.collectibles.Stop() s.collectibles.Stop()
s.tokenManager.Stop() s.tokenManager.Stop()
s.marketManager.Stop()
s.started = false s.started = false
log.Info("wallet stopped") log.Info("wallet stopped")
return nil return nil

View File

@ -585,19 +585,6 @@ func (tc *TestClient) CallContext(ctx context.Context, result interface{}, metho
return err return err
} }
func (tc *TestClient) GetWalletNotifier() func(chainId uint64, message string) {
if tc.traceAPICalls {
tc.t.Log("GetWalletNotifier")
}
return nil
}
func (tc *TestClient) SetWalletNotifier(notifier func(chainId uint64, message string)) {
if tc.traceAPICalls {
tc.t.Log("SetWalletNotifier")
}
}
func (tc *TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) { func (tc *TestClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (gas uint64, err error) {
err = tc.countAndlog("EstimateGas") err = tc.countAndlog("EstimateGas")
return 0, err return 0, err