feat(wallet)_: use CircuitBreaker for blockhain RPC calls

fix usage of circuit breaker for collectibles and market data to
match the implementation
This commit is contained in:
Ivan Belyakov 2024-07-02 19:58:55 +02:00 committed by Andrea Maria Piana
parent 0e10b38e4b
commit a009855bbb
8 changed files with 334 additions and 343 deletions

File diff suppressed because it is too large Load Diff

View File

@ -12,6 +12,7 @@ import (
"sync"
"time"
"github.com/ethereum/go-ethereum/ethclient"
"github.com/ethereum/go-ethereum/log"
gethrpc "github.com/ethereum/go-ethereum/rpc"
@ -112,7 +113,11 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U
if err != nil {
return nil, fmt.Errorf("get RPC limiter: %s", err)
}
c.upstream = chain.NewSimpleClient(limiter, upstreamClient, upstreamChainID)
hostPortUpstream, err := extractHostAndPortFromURL(c.upstreamURL)
if err != nil {
hostPortUpstream = "upstream"
}
c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, hostPortUpstream), upstreamChainID)
}
c.router = newRouter(c.upstreamEnabled)
@ -140,6 +145,15 @@ func extractLastParamFromURL(inputURL string) (string, error) {
return lastSegment, nil
}
func extractHostAndPortFromURL(inputURL string) (string, error) {
parsedURL, err := url.Parse(inputURL)
if err != nil {
return "", err
}
return parsedURL.Host, nil
}
func (c *Client) getRPCRpsLimiter(URL string) (*chain.RPCRpsLimiter, error) {
apiKey, err := extractLastParamFromURL(URL)
if err != nil {
@ -183,6 +197,15 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
return nil, fmt.Errorf("get RPC limiter: %s", err)
}
hostPortMain, err := extractHostAndPortFromURL(network.RPCURL)
if err != nil {
hostPortMain = "main"
}
ethClients := []*chain.EthClient{
chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, hostPortMain),
}
var (
rpcFallbackClient *gethrpc.Client
rpcFallbackLimiter *chain.RPCRpsLimiter
@ -197,9 +220,15 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err
if err != nil {
return nil, fmt.Errorf("get RPC fallback limiter: %s", err)
}
hostPortFallback, err := extractHostAndPortFromURL(network.FallbackURL)
if err != nil {
hostPortFallback = "fallback"
}
ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcFallbackClient), rpcFallbackLimiter, rpcFallbackClient, hostPortFallback))
}
client := chain.NewClient(rpcLimiter, rpcClient, rpcFallbackLimiter, rpcFallbackClient, chainID)
client := chain.NewClient(ethClients, chainID)
client.WalletNotifier = c.walletNotifier
c.rpcClients[chainID] = client
return client, nil
@ -260,7 +289,11 @@ func (c *Client) UpdateUpstreamURL(url string) error {
return err
}
c.Lock()
c.upstream = chain.NewSimpleClient(rpsLimiter, rpcClient, c.UpstreamChainID)
hostPortUpstream, err := extractHostAndPortFromURL(url)
if err != nil {
hostPortUpstream = "upstream"
}
c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(rpcClient), rpsLimiter, rpcClient, hostPortUpstream), c.UpstreamChainID)
c.upstreamURL = url
c.Unlock()

View File

@ -64,10 +64,10 @@ type Manager struct {
mediaServer *server.MediaServer
statuses *sync.Map
statusNotifier *connection.StatusNotifier
feed *event.Feed
circuitBreakers sync.Map
statuses *sync.Map
statusNotifier *connection.StatusNotifier
feed *event.Feed
circuitBreaker *circuitbreaker.CircuitBreaker
}
func NewManager(
@ -81,6 +81,14 @@ func NewManager(
ownershipDB := NewOwnershipDB(db)
statuses := initStatuses(ownershipDB)
cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{
Timeout: 10000,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 25,
SleepWindow: 300000,
ErrorPercentThreshold: 25,
})
return &Manager{
rpcClient: rpcClient,
providers: providers,
@ -95,6 +103,7 @@ func NewManager(
statuses: statuses,
statusNotifier: createStatusNotifier(statuses, feed),
feed: feed,
circuitBreaker: cb,
}
}
@ -202,7 +211,7 @@ func (o *Manager) FetchBalancesByOwnerAndContractAddress(ctx context.Context, ch
func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, contractAddresses []common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.AccountOwnershipProviders {
if !provider.IsChainSupported(chainID) {
continue
@ -219,7 +228,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c
log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
}
return []interface{}{assetContainer}, err
},
}, getCircuitName(provider, chainID),
)
cmd.Add(f)
}
@ -228,7 +237,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c
return nil, ErrNoProvidersAvailableForChainID
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
@ -246,7 +255,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c
func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.AccountOwnershipProviders {
if !provider.IsChainSupported(chainID) {
continue
@ -263,7 +272,7 @@ func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommo
log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
}
return []interface{}{assetContainer}, err
},
}, getCircuitName(provider, chainID),
)
cmd.Add(f)
}
@ -272,7 +281,7 @@ func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommo
return nil, ErrNoProvidersAvailableForChainID
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchAllAssetsByOwner failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
@ -439,7 +448,7 @@ func (o *Manager) FetchMissingAssetsByCollectibleUniqueID(ctx context.Context, u
}
func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Context, chainID walletCommon.ChainID, idsToFetch []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) {
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.CollectibleDataProviders {
if !provider.IsChainSupported(chainID) {
continue
@ -453,14 +462,14 @@ func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Co
}
return []any{fetchedAssets}, err
}))
}, getCircuitName(provider, chainID)))
}
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
@ -482,7 +491,7 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th
group.Add(func(ctx context.Context) error {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.CollectionDataProviders {
if !provider.IsChainSupported(chainID) {
continue
@ -492,14 +501,14 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th
cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) {
fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch)
return []any{fetchedCollections}, err
}))
}, getCircuitName(provider, chainID)))
}
if cmd.IsEmpty() {
return nil
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchCollectionsDataByContractID failed for", "chainID", chainID, "err", cmdRes.Error())
return cmdRes.Error()
@ -536,7 +545,7 @@ func (o *Manager) GetCollectibleOwnership(id thirdparty.CollectibleUniqueID) ([]
func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) {
defer o.checkConnectionStatus(chainID)
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.ContractOwnershipProviders {
if !provider.IsChainSupported(chainID) {
continue
@ -549,14 +558,14 @@ func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, c
log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err)
}
return []any{res}, err
}))
}, getCircuitName(provider, chainID)))
}
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID
}
cmdRes := o.getCircuitBreaker(chainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("FetchCollectibleOwnersByContractAddress failed for", "chainID", chainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
@ -969,22 +978,6 @@ func (o *Manager) signalUpdatedCollectiblesData(ids []thirdparty.CollectibleUniq
}
}
func (o *Manager) getCircuitBreaker(chainID walletCommon.ChainID) *circuitbreaker.CircuitBreaker {
cb, ok := o.circuitBreakers.Load(chainID.String())
if !ok {
cb = circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{
CommandName: chainID.String(),
Timeout: 10000,
MaxConcurrentRequests: 100,
RequestVolumeThreshold: 25,
SleepWindow: 300000,
ErrorPercentThreshold: 25,
})
o.circuitBreakers.Store(chainID.String(), cb)
}
return cb.(*circuitbreaker.CircuitBreaker)
}
func (o *Manager) SearchCollectibles(ctx context.Context, chainID walletCommon.ChainID, text string, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) {
defer o.checkConnectionStatus(chainID)
@ -1100,7 +1093,7 @@ func (o *Manager) getOrFetchSocialsForCollection(_ context.Context, contractID t
}
func (o *Manager) fetchSocialsForCollection(ctx context.Context, contractID thirdparty.ContractID) (*thirdparty.CollectionSocials, error) {
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(ctx, nil)
for _, provider := range o.providers.CollectibleDataProviders {
if !provider.IsChainSupported(contractID.ChainID) {
continue
@ -1113,14 +1106,14 @@ func (o *Manager) fetchSocialsForCollection(ctx context.Context, contractID thir
log.Error("FetchCollectionSocials failed for", "provider", provider.ID(), "chainID", contractID.ChainID, "err", err)
}
return []interface{}{socials}, err
}))
}, getCircuitName(provider, contractID.ChainID)))
}
if cmd.IsEmpty() {
return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain
}
cmdRes := o.getCircuitBreaker(contractID.ChainID).Execute(cmd)
cmdRes := o.circuitBreaker.Execute(cmd)
if cmdRes.Error() != nil {
log.Error("fetchSocialsForCollection failed for", "chainID", contractID.ChainID, "err", cmdRes.Error())
return nil, cmdRes.Error()
@ -1163,3 +1156,9 @@ func createStatusNotifier(statuses *sync.Map, feed *event.Feed) *connection.Stat
feed,
)
}
// Different providers have API keys per chain or per testnet/mainnet.
// Proper implementation should respect that. For now, the safest solution is to use the provider ID and chain ID as the key.
func getCircuitName(provider thirdparty.CollectibleProvider, chainID walletCommon.ChainID) string {
return provider.ID() + chainID.String()
}

View File

@ -1,6 +1,7 @@
package market
import (
"context"
"sync"
"time"
@ -37,7 +38,6 @@ type Manager struct {
func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager {
cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{
CommandName: "marketClient",
Timeout: 10000,
MaxConcurrentRequests: 100,
SleepWindow: 300000,
@ -74,13 +74,13 @@ func (pm *Manager) setIsConnected(value bool) {
}
func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) {
cmd := circuitbreaker.Command{}
cmd := circuitbreaker.NewCommand(context.Background(), nil)
for _, provider := range providers {
provider := provider
cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) {
result, err := f(provider)
return []interface{}{result}, err
}))
}, provider.ID()))
}
result := pm.circuitbreaker.Execute(cmd)

View File

@ -36,6 +36,10 @@ func (mpp *MockPriceProvider) FetchTokenDetails(symbols []string) (map[string]th
return nil, errors.New("not implmented")
}
func (mpp *MockPriceProvider) ID() string {
return "MockPriceProvider"
}
func (mpp *MockPriceProvider) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) {
res := make(map[string]map[string]float64)
for _, symbol := range symbols {

View File

@ -364,3 +364,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi
return result, nil
}
func (c *Client) ID() string {
return "coingecko"
}

View File

@ -198,3 +198,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi
return item, nil
}
func (c *Client) ID() string {
return "cryptocompare"
}

View File

@ -29,6 +29,7 @@ type TokenDetails struct {
}
type MarketDataProvider interface {
ID() string
FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error)
FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]HistoricalPrice, error)
FetchHistoricalHourlyPrices(symbol string, currency string, limit int, aggregate int) ([]HistoricalPrice, error)