From a009855bbb21c55025c6602a67ea7495a185bfd2 Mon Sep 17 00:00:00 2001 From: Ivan Belyakov Date: Tue, 2 Jul 2024 19:58:55 +0200 Subject: [PATCH] feat(wallet)_: use CircuitBreaker for blockhain RPC calls fix usage of circuit breaker for collectibles and market data to match the implementation --- rpc/chain/client.go | 544 ++++++++---------- rpc/client.go | 39 +- services/wallet/collectibles/manager.go | 75 ++- services/wallet/market/market.go | 6 +- services/wallet/market/market_test.go | 4 + .../wallet/thirdparty/coingecko/client.go | 4 + .../wallet/thirdparty/cryptocompare/client.go | 4 + services/wallet/thirdparty/types.go | 1 + 8 files changed, 334 insertions(+), 343 deletions(-) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index 401c7d25e..a1c4dfa01 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -9,8 +9,6 @@ import ( "sync/atomic" "time" - "github.com/afex/hystrix-go/hystrix" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/common" @@ -19,6 +17,7 @@ import ( "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" + "github.com/status-im/status-go/circuitbreaker" "github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/wallet/connection" ) @@ -87,25 +86,35 @@ func ClientWithTag(chainClient ClientInterface, tag, groupTag string) ClientInte return newClient } -type ClientWithFallback struct { - ChainID uint64 - main *ethclient.Client - fallback *ethclient.Client - mainLimiter *RPCRpsLimiter - fallbackLimiter *RPCRpsLimiter - commonLimiter RequestLimiter +type EthClient struct { + ethClient *ethclient.Client + limiter *RPCRpsLimiter + rpcClient *rpc.Client + name string +} - mainRPC *rpc.Client - fallbackRPC *rpc.Client +func NewEthClient(ethClient *ethclient.Client, limiter *RPCRpsLimiter, rpcClient *rpc.Client, name string) *EthClient { + return &EthClient{ + ethClient: ethClient, + limiter: limiter, + rpcClient: rpcClient, + name: name, + } +} + +type ClientWithFallback struct { + ChainID uint64 + ethClients []*EthClient + commonLimiter RequestLimiter + circuitbreaker *circuitbreaker.CircuitBreaker WalletNotifier func(chainId uint64, message string) isConnected *atomic.Bool LastCheckedAt int64 - circuitBreakerCmdName string - tag string // tag for the limiter - groupTag string // tag for the limiter group + tag string // tag for the limiter + groupTag string // tag for the limiter group } // Don't mark connection as failed if we get one of these errors @@ -129,70 +138,48 @@ var propagateErrors = []error{ bind.ErrNoCode, } -type CommandResult struct { - res []any - err error -} - -func NewSimpleClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, chainID uint64) *ClientWithFallback { - circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) - hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ - Timeout: 10000, - MaxConcurrentRequests: 100, - SleepWindow: 300000, - ErrorPercentThreshold: 25, - }) - - isConnected := &atomic.Bool{} - isConnected.Store(true) - return &ClientWithFallback{ - ChainID: chainID, - main: ethclient.NewClient(main), - fallback: nil, - mainLimiter: mainLimiter, - fallbackLimiter: nil, - mainRPC: main, - fallbackRPC: nil, - isConnected: isConnected, - LastCheckedAt: time.Now().Unix(), - circuitBreakerCmdName: circuitBreakerCmdName, - } -} - -func NewClient(mainLimiter *RPCRpsLimiter, main *rpc.Client, fallbackLimiter *RPCRpsLimiter, fallback *rpc.Client, chainID uint64) *ClientWithFallback { - circuitBreakerCmdName := fmt.Sprintf("ethClient_%d", chainID) - hystrix.ConfigureCommand(circuitBreakerCmdName, hystrix.CommandConfig{ +func NewSimpleClient(ethClient EthClient, chainID uint64) *ClientWithFallback { + cbConfig := circuitbreaker.Config{ Timeout: 20000, MaxConcurrentRequests: 100, SleepWindow: 300000, ErrorPercentThreshold: 25, - }) - - var fallbackEthClient *ethclient.Client - if fallback != nil { - fallbackEthClient = ethclient.NewClient(fallback) } + + isConnected := &atomic.Bool{} + isConnected.Store(true) + return &ClientWithFallback{ + ChainID: chainID, + ethClients: []*EthClient{ðClient}, + isConnected: isConnected, + LastCheckedAt: time.Now().Unix(), + circuitbreaker: circuitbreaker.NewCircuitBreaker(cbConfig), + } +} + +func NewClient(ethClients []*EthClient, chainID uint64) *ClientWithFallback { + cbConfig := circuitbreaker.Config{ + Timeout: 20000, + MaxConcurrentRequests: 100, + SleepWindow: 300000, + ErrorPercentThreshold: 25, + } + isConnected := &atomic.Bool{} isConnected.Store(true) return &ClientWithFallback{ - ChainID: chainID, - main: ethclient.NewClient(main), - fallback: fallbackEthClient, - mainLimiter: mainLimiter, - fallbackLimiter: fallbackLimiter, - mainRPC: main, - fallbackRPC: fallback, - isConnected: isConnected, - LastCheckedAt: time.Now().Unix(), - circuitBreakerCmdName: circuitBreakerCmdName, + ChainID: chainID, + ethClients: ethClients, + isConnected: isConnected, + LastCheckedAt: time.Now().Unix(), + circuitbreaker: circuitbreaker.NewCircuitBreaker(cbConfig), } } func (c *ClientWithFallback) Close() { - c.main.Close() - if c.fallback != nil { - c.fallback.Close() + for _, client := range c.ethClients { + client.ethClient.Close() } } @@ -238,7 +225,7 @@ func (c *ClientWithFallback) IsConnected() bool { return c.isConnected.Load() } -func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, error), fallback func() ([]any, error)) ([]any, error) { +func (c *ClientWithFallback) makeCall(ctx context.Context, ethClients []*EthClient, f func(client *EthClient) (interface{}, error)) (interface{}, error) { if c.commonLimiter != nil { if allow, err := c.commonLimiter.Allow(c.tag); !allow { return nil, fmt.Errorf("tag=%s, %w", c.tag, err) @@ -249,98 +236,58 @@ func (c *ClientWithFallback) makeCall(ctx context.Context, main func() ([]any, e } } - resultChan := make(chan CommandResult, 1) c.LastCheckedAt = time.Now().Unix() - errChan := hystrix.Go(c.circuitBreakerCmdName, func() error { - err := c.mainLimiter.WaitForRequestsAvailability(1) - if err != nil { - return err - } - - res, err := main() - if err != nil { - if isRPSLimitError(err) { - c.mainLimiter.ReduceLimit() - - err = c.mainLimiter.WaitForRequestsAvailability(1) - if err != nil { - return err - } - - res, err = main() - if err == nil { - resultChan <- CommandResult{res: res} - return nil - } + cmd := circuitbreaker.NewCommand(ctx, nil) + for _, provider := range ethClients { + provider := provider + cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) { + err := provider.limiter.WaitForRequestsAvailability(1) + if err != nil { + return nil, err } - if isVMError(err) { - resultChan <- CommandResult{err: err} - return nil - } + res, err := f(provider) + if err != nil { + if isRPSLimitError(err) { + provider.limiter.ReduceLimit() - return err - } - resultChan <- CommandResult{res: res} - return nil - }, func(err error) error { - if c.fallback == nil { - return err - } + err = provider.limiter.WaitForRequestsAvailability(1) + if err != nil { + return nil, err + } - err = c.fallbackLimiter.WaitForRequestsAvailability(1) - if err != nil { - return err - } - - res, err := fallback() - if err != nil { - if isRPSLimitError(err) { - c.fallbackLimiter.ReduceLimit() - - err = c.fallbackLimiter.WaitForRequestsAvailability(1) - if err != nil { - return err + res, err = f(provider) + if err == nil { + return []interface{}{res}, err + } } - res, err = fallback() - if err == nil { - resultChan <- CommandResult{res: res} - return nil + if isVMError(err) { + cmd.Cancel() } + return nil, err } - - if isVMError(err) { - resultChan <- CommandResult{err: err} - return nil - } - - return err - } - resultChan <- CommandResult{res: res} - return nil - }) - - select { - case result := <-resultChan: - if result.err != nil { - return nil, result.err - } - return result.res, nil - case err := <-errChan: - return nil, err + return []interface{}{res}, err + }, provider.name)) } + + result := c.circuitbreaker.Execute(cmd) + if result.Error() != nil { + return nil, result.Error() + } + + return result.Result()[0], nil } func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) (*types.Block, error) { rpcstats.CountCallWithTag("eth_BlockByHash", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.BlockByHash(ctx, hash); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.BlockByHash(ctx, hash); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.BlockByHash(ctx, hash) + }, ) c.toggleConnectionState(err) @@ -349,15 +296,15 @@ func (c *ClientWithFallback) BlockByHash(ctx context.Context, hash common.Hash) return nil, err } - return res[0].(*types.Block), nil + return res.(*types.Block), nil } func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { rpcstats.CountCallWithTag("eth_BlockByNumber", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.BlockByNumber(ctx, number); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.BlockByNumber(ctx, number); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.BlockByNumber(ctx, number) + }, ) c.toggleConnectionState(err) @@ -366,16 +313,16 @@ func (c *ClientWithFallback) BlockByNumber(ctx context.Context, number *big.Int) return nil, err } - return res[0].(*types.Block), nil + return res.(*types.Block), nil } func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { rpcstats.CountCallWithTag("eth_BlockNumber", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.BlockNumber(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.BlockNumber(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.BlockNumber(ctx) + }, ) c.toggleConnectionState(err) @@ -384,16 +331,16 @@ func (c *ClientWithFallback) BlockNumber(ctx context.Context) (uint64, error) { return 0, err } - return res[0].(uint64), nil + return res.(uint64), nil } func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { rpcstats.CountCallWithTag("eth_PeerCount", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PeerCount(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PeerCount(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PeerCount(ctx) + }, ) c.toggleConnectionState(err) @@ -402,79 +349,84 @@ func (c *ClientWithFallback) PeerCount(ctx context.Context) (uint64, error) { return 0, err } - return res[0].(uint64), nil + return res.(uint64), nil } func (c *ClientWithFallback) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { rpcstats.CountCallWithTag("eth_HeaderByHash", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.HeaderByHash(ctx, hash); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.HeaderByHash(ctx, hash); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.HeaderByHash(ctx, hash) + }, ) + c.toggleConnectionState(err) + if err != nil { return nil, err } - return res[0].(*types.Header), nil + return res.(*types.Header), nil } func (c *ClientWithFallback) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { rpcstats.CountCallWithTag("eth_HeaderByNumber", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.HeaderByNumber(ctx, number); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.HeaderByNumber(ctx, number); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.HeaderByNumber(ctx, number) + }, ) + c.toggleConnectionState(err) + if err != nil { return nil, err } - return res[0].(*types.Header), nil + return res.(*types.Header), nil } func (c *ClientWithFallback) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { rpcstats.CountCallWithTag("eth_TransactionByHash", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, b, err := c.main.TransactionByHash(ctx, hash); return []any{a, b}, err }, - func() ([]any, error) { a, b, err := c.fallback.TransactionByHash(ctx, hash); return []any{a, b}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + tx, isPending, err := client.ethClient.TransactionByHash(ctx, hash) + return []any{tx, isPending}, err + }, ) + c.toggleConnectionState(err) + if err != nil { return nil, false, err } - return res[0].(*types.Transaction), res[1].(bool), nil + resArr := res.([]any) + return resArr[0].(*types.Transaction), resArr[1].(bool), nil } func (c *ClientWithFallback) TransactionSender(ctx context.Context, tx *types.Transaction, block common.Hash, index uint) (common.Address, error) { rpcstats.CountCall("eth_TransactionSender") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.TransactionSender(ctx, tx, block, index); return []any{a}, err }, - func() ([]any, error) { - a, err := c.fallback.TransactionSender(ctx, tx, block, index) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.TransactionSender(ctx, tx, block, index) }, ) c.toggleConnectionState(err) - return res[0].(common.Address), err + return res.(common.Address), err } func (c *ClientWithFallback) TransactionCount(ctx context.Context, blockHash common.Hash) (uint, error) { rpcstats.CountCall("eth_TransactionCount") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.TransactionCount(ctx, blockHash); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.TransactionCount(ctx, blockHash); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.TransactionCount(ctx, blockHash) + }, ) c.toggleConnectionState(err) @@ -483,21 +435,15 @@ func (c *ClientWithFallback) TransactionCount(ctx context.Context, blockHash com return 0, err } - return res[0].(uint), nil + return res.(uint), nil } func (c *ClientWithFallback) TransactionInBlock(ctx context.Context, blockHash common.Hash, index uint) (*types.Transaction, error) { rpcstats.CountCall("eth_TransactionInBlock") res, err := c.makeCall( - ctx, - func() ([]any, error) { - a, err := c.main.TransactionInBlock(ctx, blockHash, index) - return []any{a}, err - }, - func() ([]any, error) { - a, err := c.fallback.TransactionInBlock(ctx, blockHash, index) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.TransactionInBlock(ctx, blockHash, index) }, ) @@ -507,16 +453,16 @@ func (c *ClientWithFallback) TransactionInBlock(ctx context.Context, blockHash c return nil, err } - return res[0].(*types.Transaction), nil + return res.(*types.Transaction), nil } func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { rpcstats.CountCall("eth_TransactionReceipt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.TransactionReceipt(ctx, txHash); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.TransactionReceipt(ctx, txHash); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.TransactionReceipt(ctx, txHash) + }, ) c.toggleConnectionState(err) @@ -525,16 +471,16 @@ func (c *ClientWithFallback) TransactionReceipt(ctx context.Context, txHash comm return nil, err } - return res[0].(*types.Receipt), nil + return res.(*types.Receipt), nil } func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { rpcstats.CountCall("eth_SyncProgress") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.SyncProgress(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.SyncProgress(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.SyncProgress(ctx) + }, ) c.toggleConnectionState(err) @@ -543,16 +489,16 @@ func (c *ClientWithFallback) SyncProgress(ctx context.Context) (*ethereum.SyncPr return nil, err } - return res[0].(*ethereum.SyncProgress), nil + return res.(*ethereum.SyncProgress), nil } func (c *ClientWithFallback) SubscribeNewHead(ctx context.Context, ch chan<- *types.Header) (ethereum.Subscription, error) { rpcstats.CountCall("eth_SubscribeNewHead") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.SubscribeNewHead(ctx, ch); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.SubscribeNewHead(ctx, ch); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.SubscribeNewHead(ctx, ch) + }, ) c.toggleConnectionState(err) @@ -561,7 +507,7 @@ func (c *ClientWithFallback) SubscribeNewHead(ctx context.Context, ch chan<- *ty return nil, err } - return res[0].(ethereum.Subscription), nil + return res.(ethereum.Subscription), nil } func (c *ClientWithFallback) NetworkID() uint64 { @@ -572,9 +518,9 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre rpcstats.CountCallWithTag("eth_BalanceAt", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.BalanceAt(ctx, account, blockNumber); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.BalanceAt(ctx, account, blockNumber); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.BalanceAt(ctx, account, blockNumber) + }, ) c.toggleConnectionState(err) @@ -583,21 +529,15 @@ func (c *ClientWithFallback) BalanceAt(ctx context.Context, account common.Addre return nil, err } - return res[0].(*big.Int), nil + return res.(*big.Int), nil } func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Address, key common.Hash, blockNumber *big.Int) ([]byte, error) { rpcstats.CountCall("eth_StorageAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { - a, err := c.main.StorageAt(ctx, account, key, blockNumber) - return []any{a}, err - }, - func() ([]any, error) { - a, err := c.fallback.StorageAt(ctx, account, key, blockNumber) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.StorageAt(ctx, account, key, blockNumber) }, ) @@ -607,16 +547,16 @@ func (c *ClientWithFallback) StorageAt(ctx context.Context, account common.Addre return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address, blockNumber *big.Int) ([]byte, error) { rpcstats.CountCall("eth_CodeAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.CodeAt(ctx, account, blockNumber); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.CodeAt(ctx, account, blockNumber); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.CodeAt(ctx, account, blockNumber) + }, ) c.toggleConnectionState(err) @@ -625,16 +565,16 @@ func (c *ClientWithFallback) CodeAt(ctx context.Context, account common.Address, return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { rpcstats.CountCallWithTag("eth_NonceAt", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.NonceAt(ctx, account, blockNumber); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.NonceAt(ctx, account, blockNumber); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.NonceAt(ctx, account, blockNumber) + }, ) c.toggleConnectionState(err) @@ -643,34 +583,46 @@ func (c *ClientWithFallback) NonceAt(ctx context.Context, account common.Address return 0, err } - return res[0].(uint64), nil + return res.(uint64), nil } func (c *ClientWithFallback) FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) { rpcstats.CountCallWithTag("eth_FilterLogs", c.tag) + // Override providers name to use a separate circuit for this command as it more often fails due to rate limiting + ethClients := make([]*EthClient, len(c.ethClients)) + for i, client := range c.ethClients { + ethClients[i] = &EthClient{ + ethClient: client.ethClient, + limiter: client.limiter, + rpcClient: client.rpcClient, + name: client.name + "_FilterLogs", + } + } + res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.FilterLogs(ctx, q); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.FilterLogs(ctx, q); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.FilterLogs(ctx, q) + }, ) - c.toggleConnectionState(err) + // No connection state toggling here, as it often mail fail due to archive node rate limiting + // which does not impact other calls if err != nil { return nil, err } - return res[0].([]types.Log), nil + return res.([]types.Log), nil } func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { rpcstats.CountCall("eth_SubscribeFilterLogs") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.SubscribeFilterLogs(ctx, q, ch); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.SubscribeFilterLogs(ctx, q, ch); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.SubscribeFilterLogs(ctx, q, ch) + }, ) c.toggleConnectionState(err) @@ -679,16 +631,16 @@ func (c *ClientWithFallback) SubscribeFilterLogs(ctx context.Context, q ethereum return nil, err } - return res[0].(ethereum.Subscription), nil + return res.(ethereum.Subscription), nil } func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account common.Address) (*big.Int, error) { rpcstats.CountCall("eth_PendingBalanceAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingBalanceAt(ctx, account); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingBalanceAt(ctx, account); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingBalanceAt(ctx, account) + }, ) c.toggleConnectionState(err) @@ -697,16 +649,16 @@ func (c *ClientWithFallback) PendingBalanceAt(ctx context.Context, account commo return nil, err } - return res[0].(*big.Int), nil + return res.(*big.Int), nil } func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account common.Address, key common.Hash) ([]byte, error) { rpcstats.CountCall("eth_PendingStorageAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingStorageAt(ctx, account, key); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingStorageAt(ctx, account, key); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingStorageAt(ctx, account, key) + }, ) c.toggleConnectionState(err) @@ -715,16 +667,16 @@ func (c *ClientWithFallback) PendingStorageAt(ctx context.Context, account commo return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { rpcstats.CountCall("eth_PendingCodeAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingCodeAt(ctx, account); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingCodeAt(ctx, account); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingCodeAt(ctx, account) + }, ) c.toggleConnectionState(err) @@ -733,16 +685,16 @@ func (c *ClientWithFallback) PendingCodeAt(ctx context.Context, account common.A return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { rpcstats.CountCall("eth_PendingNonceAt") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingNonceAt(ctx, account); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingNonceAt(ctx, account); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingNonceAt(ctx, account) + }, ) c.toggleConnectionState(err) @@ -751,16 +703,16 @@ func (c *ClientWithFallback) PendingNonceAt(ctx context.Context, account common. return 0, err } - return res[0].(uint64), nil + return res.(uint64), nil } func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint, error) { rpcstats.CountCall("eth_PendingTransactionCount") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingTransactionCount(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingTransactionCount(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingTransactionCount(ctx) + }, ) c.toggleConnectionState(err) @@ -769,16 +721,16 @@ func (c *ClientWithFallback) PendingTransactionCount(ctx context.Context) (uint, return 0, err } - return res[0].(uint), nil + return res.(uint), nil } func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { rpcstats.CountCall("eth_CallContract_" + msg.To.String()) res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.CallContract(ctx, msg, blockNumber); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.CallContract(ctx, msg, blockNumber); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.CallContract(ctx, msg, blockNumber) + }, ) c.toggleConnectionState(err) @@ -787,18 +739,15 @@ func (c *ClientWithFallback) CallContract(ctx context.Context, msg ethereum.Call return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) CallContractAtHash(ctx context.Context, msg ethereum.CallMsg, blockHash common.Hash) ([]byte, error) { rpcstats.CountCall("eth_CallContractAtHash") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.CallContractAtHash(ctx, msg, blockHash); return []any{a}, err }, - func() ([]any, error) { - a, err := c.fallback.CallContractAtHash(ctx, msg, blockHash) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.CallContractAtHash(ctx, msg, blockHash) }, ) @@ -808,16 +757,16 @@ func (c *ClientWithFallback) CallContractAtHash(ctx context.Context, msg ethereu return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethereum.CallMsg) ([]byte, error) { rpcstats.CountCall("eth_PendingCallContract") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.PendingCallContract(ctx, msg); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.PendingCallContract(ctx, msg); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.PendingCallContract(ctx, msg) + }, ) c.toggleConnectionState(err) @@ -826,16 +775,16 @@ func (c *ClientWithFallback) PendingCallContract(ctx context.Context, msg ethere return nil, err } - return res[0].([]byte), nil + return res.([]byte), nil } func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, error) { rpcstats.CountCall("eth_SuggestGasPrice") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.SuggestGasPrice(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.SuggestGasPrice(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.SuggestGasPrice(ctx) + }, ) c.toggleConnectionState(err) @@ -844,16 +793,16 @@ func (c *ClientWithFallback) SuggestGasPrice(ctx context.Context) (*big.Int, err return nil, err } - return res[0].(*big.Int), nil + return res.(*big.Int), nil } func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { rpcstats.CountCall("eth_SuggestGasTipCap") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.SuggestGasTipCap(ctx); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.SuggestGasTipCap(ctx); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.SuggestGasTipCap(ctx) + }, ) c.toggleConnectionState(err) @@ -862,21 +811,15 @@ func (c *ClientWithFallback) SuggestGasTipCap(ctx context.Context) (*big.Int, er return nil, err } - return res[0].(*big.Int), nil + return res.(*big.Int), nil } func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64, lastBlock *big.Int, rewardPercentiles []float64) (*ethereum.FeeHistory, error) { rpcstats.CountCall("eth_FeeHistory") res, err := c.makeCall( - ctx, - func() ([]any, error) { - a, err := c.main.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) - return []any{a}, err - }, - func() ([]any, error) { - a, err := c.fallback.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.FeeHistory(ctx, blockCount, lastBlock, rewardPercentiles) }, ) @@ -886,16 +829,16 @@ func (c *ClientWithFallback) FeeHistory(ctx context.Context, blockCount uint64, return nil, err } - return res[0].(*ethereum.FeeHistory), nil + return res.(*ethereum.FeeHistory), nil } func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallMsg) (uint64, error) { rpcstats.CountCall("eth_EstimateGas") res, err := c.makeCall( - ctx, - func() ([]any, error) { a, err := c.main.EstimateGas(ctx, msg); return []any{a}, err }, - func() ([]any, error) { a, err := c.fallback.EstimateGas(ctx, msg); return []any{a}, err }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return client.ethClient.EstimateGas(ctx, msg) + }, ) c.toggleConnectionState(err) @@ -904,17 +847,20 @@ func (c *ClientWithFallback) EstimateGas(ctx context.Context, msg ethereum.CallM return 0, err } - return res[0].(uint64), nil + return res.(uint64), nil } func (c *ClientWithFallback) SendTransaction(ctx context.Context, tx *types.Transaction) error { rpcstats.CountCall("eth_SendTransaction") _, err := c.makeCall( - ctx, - func() ([]any, error) { return nil, c.main.SendTransaction(ctx, tx) }, - func() ([]any, error) { return nil, c.fallback.SendTransaction(ctx, tx) }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return nil, client.ethClient.SendTransaction(ctx, tx) + }, ) + + c.toggleConnectionState(err) + return err } @@ -922,10 +868,13 @@ func (c *ClientWithFallback) CallContext(ctx context.Context, result interface{} rpcstats.CountCall("eth_CallContext") _, err := c.makeCall( - ctx, - func() ([]any, error) { return nil, c.mainRPC.CallContext(ctx, result, method, args...) }, - func() ([]any, error) { return nil, c.fallbackRPC.CallContext(ctx, result, method, args...) }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return nil, client.rpcClient.CallContext(ctx, result, method, args...) + }, ) + + c.toggleConnectionState(err) + return err } @@ -933,10 +882,13 @@ func (c *ClientWithFallback) BatchCallContext(ctx context.Context, b []rpc.Batch rpcstats.CountCall("eth_BatchCallContext") _, err := c.makeCall( - ctx, - func() ([]any, error) { return nil, c.mainRPC.BatchCallContext(ctx, b) }, - func() ([]any, error) { return nil, c.fallbackRPC.BatchCallContext(ctx, b) }, + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return nil, client.rpcClient.BatchCallContext(ctx, b) + }, ) + + c.toggleConnectionState(err) + return err } @@ -971,14 +923,8 @@ func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blo rpcstats.CountCallWithTag("eth_FullTransactionByBlockNumberAndIndex", c.tag) res, err := c.makeCall( - ctx, - func() ([]any, error) { - a, err := callBlockHashByTransaction(ctx, c.mainRPC, blockNumber, index) - return []any{a}, err - }, - func() ([]any, error) { - a, err := callBlockHashByTransaction(ctx, c.fallbackRPC, blockNumber, index) - return []any{a}, err + ctx, c.ethClients, func(client *EthClient) (interface{}, error) { + return callBlockHashByTransaction(ctx, client.rpcClient, blockNumber, index) }, ) @@ -988,7 +934,7 @@ func (c *ClientWithFallback) CallBlockHashByTransaction(ctx context.Context, blo return common.HexToHash(""), err } - return res[0].(common.Hash), nil + return res.(common.Hash), nil } func (c *ClientWithFallback) GetWalletNotifier() func(chainId uint64, message string) { diff --git a/rpc/client.go b/rpc/client.go index 857371d79..fd6d265ed 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -12,6 +12,7 @@ import ( "sync" "time" + "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/log" gethrpc "github.com/ethereum/go-ethereum/rpc" @@ -112,7 +113,11 @@ func NewClient(client *gethrpc.Client, upstreamChainID uint64, upstream params.U if err != nil { return nil, fmt.Errorf("get RPC limiter: %s", err) } - c.upstream = chain.NewSimpleClient(limiter, upstreamClient, upstreamChainID) + hostPortUpstream, err := extractHostAndPortFromURL(c.upstreamURL) + if err != nil { + hostPortUpstream = "upstream" + } + c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(upstreamClient), limiter, upstreamClient, hostPortUpstream), upstreamChainID) } c.router = newRouter(c.upstreamEnabled) @@ -140,6 +145,15 @@ func extractLastParamFromURL(inputURL string) (string, error) { return lastSegment, nil } +func extractHostAndPortFromURL(inputURL string) (string, error) { + parsedURL, err := url.Parse(inputURL) + if err != nil { + return "", err + } + + return parsedURL.Host, nil +} + func (c *Client) getRPCRpsLimiter(URL string) (*chain.RPCRpsLimiter, error) { apiKey, err := extractLastParamFromURL(URL) if err != nil { @@ -183,6 +197,15 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err return nil, fmt.Errorf("get RPC limiter: %s", err) } + hostPortMain, err := extractHostAndPortFromURL(network.RPCURL) + if err != nil { + hostPortMain = "main" + } + + ethClients := []*chain.EthClient{ + chain.NewEthClient(ethclient.NewClient(rpcClient), rpcLimiter, rpcClient, hostPortMain), + } + var ( rpcFallbackClient *gethrpc.Client rpcFallbackLimiter *chain.RPCRpsLimiter @@ -197,9 +220,15 @@ func (c *Client) getClientUsingCache(chainID uint64) (chain.ClientInterface, err if err != nil { return nil, fmt.Errorf("get RPC fallback limiter: %s", err) } + hostPortFallback, err := extractHostAndPortFromURL(network.FallbackURL) + if err != nil { + hostPortFallback = "fallback" + } + + ethClients = append(ethClients, chain.NewEthClient(ethclient.NewClient(rpcFallbackClient), rpcFallbackLimiter, rpcFallbackClient, hostPortFallback)) } - client := chain.NewClient(rpcLimiter, rpcClient, rpcFallbackLimiter, rpcFallbackClient, chainID) + client := chain.NewClient(ethClients, chainID) client.WalletNotifier = c.walletNotifier c.rpcClients[chainID] = client return client, nil @@ -260,7 +289,11 @@ func (c *Client) UpdateUpstreamURL(url string) error { return err } c.Lock() - c.upstream = chain.NewSimpleClient(rpsLimiter, rpcClient, c.UpstreamChainID) + hostPortUpstream, err := extractHostAndPortFromURL(url) + if err != nil { + hostPortUpstream = "upstream" + } + c.upstream = chain.NewSimpleClient(*chain.NewEthClient(ethclient.NewClient(rpcClient), rpsLimiter, rpcClient, hostPortUpstream), c.UpstreamChainID) c.upstreamURL = url c.Unlock() diff --git a/services/wallet/collectibles/manager.go b/services/wallet/collectibles/manager.go index d9ebcb902..97d4ae850 100644 --- a/services/wallet/collectibles/manager.go +++ b/services/wallet/collectibles/manager.go @@ -64,10 +64,10 @@ type Manager struct { mediaServer *server.MediaServer - statuses *sync.Map - statusNotifier *connection.StatusNotifier - feed *event.Feed - circuitBreakers sync.Map + statuses *sync.Map + statusNotifier *connection.StatusNotifier + feed *event.Feed + circuitBreaker *circuitbreaker.CircuitBreaker } func NewManager( @@ -81,6 +81,14 @@ func NewManager( ownershipDB := NewOwnershipDB(db) statuses := initStatuses(ownershipDB) + cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ + Timeout: 10000, + MaxConcurrentRequests: 100, + RequestVolumeThreshold: 25, + SleepWindow: 300000, + ErrorPercentThreshold: 25, + }) + return &Manager{ rpcClient: rpcClient, providers: providers, @@ -95,6 +103,7 @@ func NewManager( statuses: statuses, statusNotifier: createStatusNotifier(statuses, feed), feed: feed, + circuitBreaker: cb, } } @@ -202,7 +211,7 @@ func (o *Manager) FetchBalancesByOwnerAndContractAddress(ctx context.Context, ch func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, contractAddresses []common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.AccountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue @@ -219,7 +228,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []interface{}{assetContainer}, err - }, + }, getCircuitName(provider, chainID), ) cmd.Add(f) } @@ -228,7 +237,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c return nil, ErrNoProvidersAvailableForChainID } - cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchAllAssetsByOwnerAndContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() @@ -246,7 +255,7 @@ func (o *Manager) FetchAllAssetsByOwnerAndContractAddress(ctx context.Context, c func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommon.ChainID, owner common.Address, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.AccountOwnershipProviders { if !provider.IsChainSupported(chainID) { continue @@ -263,7 +272,7 @@ func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommo log.Error("FetchAllAssetsByOwner failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []interface{}{assetContainer}, err - }, + }, getCircuitName(provider, chainID), ) cmd.Add(f) } @@ -272,7 +281,7 @@ func (o *Manager) FetchAllAssetsByOwner(ctx context.Context, chainID walletCommo return nil, ErrNoProvidersAvailableForChainID } - cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchAllAssetsByOwner failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() @@ -439,7 +448,7 @@ func (o *Manager) FetchMissingAssetsByCollectibleUniqueID(ctx context.Context, u } func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Context, chainID walletCommon.ChainID, idsToFetch []thirdparty.CollectibleUniqueID) ([]thirdparty.FullCollectibleData, error) { - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectibleDataProviders { if !provider.IsChainSupported(chainID) { continue @@ -453,14 +462,14 @@ func (o *Manager) fetchMissingAssetsForChainByCollectibleUniqueID(ctx context.Co } return []any{fetchedAssets}, err - })) + }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain } - cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("fetchMissingAssetsForChainByCollectibleUniqueID failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() @@ -482,7 +491,7 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th group.Add(func(ctx context.Context) error { defer o.checkConnectionStatus(chainID) - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectionDataProviders { if !provider.IsChainSupported(chainID) { continue @@ -492,14 +501,14 @@ func (o *Manager) FetchCollectionsDataByContractID(ctx context.Context, ids []th cmd.Add(circuitbreaker.NewFunctor(func() ([]any, error) { fetchedCollections, err := provider.FetchCollectionsDataByContractID(ctx, idsToFetch) return []any{fetchedCollections}, err - })) + }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil } - cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchCollectionsDataByContractID failed for", "chainID", chainID, "err", cmdRes.Error()) return cmdRes.Error() @@ -536,7 +545,7 @@ func (o *Manager) GetCollectibleOwnership(id thirdparty.CollectibleUniqueID) ([] func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, chainID walletCommon.ChainID, contractAddress common.Address) (*thirdparty.CollectibleContractOwnership, error) { defer o.checkConnectionStatus(chainID) - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.ContractOwnershipProviders { if !provider.IsChainSupported(chainID) { continue @@ -549,14 +558,14 @@ func (o *Manager) FetchCollectibleOwnersByContractAddress(ctx context.Context, c log.Error("FetchCollectibleOwnersByContractAddress failed for", "provider", provider.ID(), "chainID", chainID, "err", err) } return []any{res}, err - })) + }, getCircuitName(provider, chainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID } - cmdRes := o.getCircuitBreaker(chainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("FetchCollectibleOwnersByContractAddress failed for", "chainID", chainID, "err", cmdRes.Error()) return nil, cmdRes.Error() @@ -969,22 +978,6 @@ func (o *Manager) signalUpdatedCollectiblesData(ids []thirdparty.CollectibleUniq } } -func (o *Manager) getCircuitBreaker(chainID walletCommon.ChainID) *circuitbreaker.CircuitBreaker { - cb, ok := o.circuitBreakers.Load(chainID.String()) - if !ok { - cb = circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ - CommandName: chainID.String(), - Timeout: 10000, - MaxConcurrentRequests: 100, - RequestVolumeThreshold: 25, - SleepWindow: 300000, - ErrorPercentThreshold: 25, - }) - o.circuitBreakers.Store(chainID.String(), cb) - } - return cb.(*circuitbreaker.CircuitBreaker) -} - func (o *Manager) SearchCollectibles(ctx context.Context, chainID walletCommon.ChainID, text string, cursor string, limit int, providerID string) (*thirdparty.FullCollectibleDataContainer, error) { defer o.checkConnectionStatus(chainID) @@ -1100,7 +1093,7 @@ func (o *Manager) getOrFetchSocialsForCollection(_ context.Context, contractID t } func (o *Manager) fetchSocialsForCollection(ctx context.Context, contractID thirdparty.ContractID) (*thirdparty.CollectionSocials, error) { - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(ctx, nil) for _, provider := range o.providers.CollectibleDataProviders { if !provider.IsChainSupported(contractID.ChainID) { continue @@ -1113,14 +1106,14 @@ func (o *Manager) fetchSocialsForCollection(ctx context.Context, contractID thir log.Error("FetchCollectionSocials failed for", "provider", provider.ID(), "chainID", contractID.ChainID, "err", err) } return []interface{}{socials}, err - })) + }, getCircuitName(provider, contractID.ChainID))) } if cmd.IsEmpty() { return nil, ErrNoProvidersAvailableForChainID // lets not stop the group if no providers are available for the chain } - cmdRes := o.getCircuitBreaker(contractID.ChainID).Execute(cmd) + cmdRes := o.circuitBreaker.Execute(cmd) if cmdRes.Error() != nil { log.Error("fetchSocialsForCollection failed for", "chainID", contractID.ChainID, "err", cmdRes.Error()) return nil, cmdRes.Error() @@ -1163,3 +1156,9 @@ func createStatusNotifier(statuses *sync.Map, feed *event.Feed) *connection.Stat feed, ) } + +// Different providers have API keys per chain or per testnet/mainnet. +// Proper implementation should respect that. For now, the safest solution is to use the provider ID and chain ID as the key. +func getCircuitName(provider thirdparty.CollectibleProvider, chainID walletCommon.ChainID) string { + return provider.ID() + chainID.String() +} diff --git a/services/wallet/market/market.go b/services/wallet/market/market.go index 23ce9e1bb..db40775e2 100644 --- a/services/wallet/market/market.go +++ b/services/wallet/market/market.go @@ -1,6 +1,7 @@ package market import ( + "context" "sync" "time" @@ -37,7 +38,6 @@ type Manager struct { func NewManager(providers []thirdparty.MarketDataProvider, feed *event.Feed) *Manager { cb := circuitbreaker.NewCircuitBreaker(circuitbreaker.Config{ - CommandName: "marketClient", Timeout: 10000, MaxConcurrentRequests: 100, SleepWindow: 300000, @@ -74,13 +74,13 @@ func (pm *Manager) setIsConnected(value bool) { } func (pm *Manager) makeCall(providers []thirdparty.MarketDataProvider, f func(provider thirdparty.MarketDataProvider) (interface{}, error)) (interface{}, error) { - cmd := circuitbreaker.Command{} + cmd := circuitbreaker.NewCommand(context.Background(), nil) for _, provider := range providers { provider := provider cmd.Add(circuitbreaker.NewFunctor(func() ([]interface{}, error) { result, err := f(provider) return []interface{}{result}, err - })) + }, provider.ID())) } result := pm.circuitbreaker.Execute(cmd) diff --git a/services/wallet/market/market_test.go b/services/wallet/market/market_test.go index 579629fa9..8ee6b542b 100644 --- a/services/wallet/market/market_test.go +++ b/services/wallet/market/market_test.go @@ -36,6 +36,10 @@ func (mpp *MockPriceProvider) FetchTokenDetails(symbols []string) (map[string]th return nil, errors.New("not implmented") } +func (mpp *MockPriceProvider) ID() string { + return "MockPriceProvider" +} + func (mpp *MockPriceProvider) FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) { res := make(map[string]map[string]float64) for _, symbol := range symbols { diff --git a/services/wallet/thirdparty/coingecko/client.go b/services/wallet/thirdparty/coingecko/client.go index 6f528c4c1..5d09b8866 100644 --- a/services/wallet/thirdparty/coingecko/client.go +++ b/services/wallet/thirdparty/coingecko/client.go @@ -364,3 +364,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi return result, nil } + +func (c *Client) ID() string { + return "coingecko" +} diff --git a/services/wallet/thirdparty/cryptocompare/client.go b/services/wallet/thirdparty/cryptocompare/client.go index e43ecbc56..a852da014 100644 --- a/services/wallet/thirdparty/cryptocompare/client.go +++ b/services/wallet/thirdparty/cryptocompare/client.go @@ -198,3 +198,7 @@ func (c *Client) FetchHistoricalDailyPrices(symbol string, currency string, limi return item, nil } + +func (c *Client) ID() string { + return "cryptocompare" +} diff --git a/services/wallet/thirdparty/types.go b/services/wallet/thirdparty/types.go index a72855117..b9c7d149c 100644 --- a/services/wallet/thirdparty/types.go +++ b/services/wallet/thirdparty/types.go @@ -29,6 +29,7 @@ type TokenDetails struct { } type MarketDataProvider interface { + ID() string FetchPrices(symbols []string, currencies []string) (map[string]map[string]float64, error) FetchHistoricalDailyPrices(symbol string, currency string, limit int, allData bool, aggregate int) ([]HistoricalPrice, error) FetchHistoricalHourlyPrices(symbol string, currency string, limit int, aggregate int) ([]HistoricalPrice, error)