diff --git a/circuitbreaker/circuit_breaker.go b/circuitbreaker/circuit_breaker.go index 10aa11042..a1dfda3a2 100644 --- a/circuitbreaker/circuit_breaker.go +++ b/circuitbreaker/circuit_breaker.go @@ -5,6 +5,8 @@ import ( "fmt" "github.com/afex/hystrix-go/hystrix" + + "github.com/ethereum/go-ethereum/log" ) type FallbackFunc func() ([]any, error) @@ -78,6 +80,16 @@ func NewFunctor(exec FallbackFunc, circuitName string) *Functor { } } +func accumulateCommandError(result CommandResult, circuitName string, err error) CommandResult { + // Accumulate errors + if result.err != nil { + result.err = fmt.Errorf("%w, %s.error: %w", result.err, circuitName, err) + } else { + result.err = fmt.Errorf("%s.error: %w", circuitName, err) + } + return result +} + // Executes the command in its circuit if set. // If the command's circuit is not configured, the circuit of the CircuitBreaker is used. // This is a blocking function. @@ -127,6 +139,16 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult { if err == nil { result = CommandResult{res: res} } + + // If the command has been cancelled, we don't count + // the error towars breaking the circuit, and then we break + if cmd.cancel { + result = accumulateCommandError(result, f.circuitName, err) + return nil + } + if err != nil { + log.Warn("hystrix error", "error", err, "provider", circuitName) + } return err }, nil) } @@ -134,12 +156,8 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult { break } - // Accumulate errors - if result.err != nil { - result.err = fmt.Errorf("%w, %s.error: %w", result.err, f.circuitName, err) - } else { - result.err = fmt.Errorf("%s.error: %w", f.circuitName, err) - } + result = accumulateCommandError(result, f.circuitName, err) + // Lets abuse every provider with the same amount of MaxConcurrentRequests, // keep iterating even in case of ErrMaxConcurrency error } diff --git a/circuitbreaker/circuit_breaker_test.go b/circuitbreaker/circuit_breaker_test.go index 9699ffad5..42d35d0c3 100644 --- a/circuitbreaker/circuit_breaker_test.go +++ b/circuitbreaker/circuit_breaker_test.go @@ -241,6 +241,11 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) { cb := NewCircuitBreaker(Config{}) cmd := NewCommand(context.TODO(), nil) existCircuit := fmt.Sprintf("existing_%d", timestamp) // unique name to avoid conflicts with go tests `-count` option + // We add it twice as otherwise it's only used for the fallback + cmd.Add(NewFunctor(func() ([]interface{}, error) { + return nil, nil + }, existCircuit)) + cmd.Add(NewFunctor(func() ([]interface{}, error) { return nil, nil }, existCircuit)) @@ -248,3 +253,51 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) { require.True(t, CircuitExists(existCircuit)) require.False(t, IsCircuitOpen(existCircuit)) } + +func TestCircuitBreaker_Fallback(t *testing.T) { + cb := NewCircuitBreaker(Config{ + RequestVolumeThreshold: 1, // 1 failed request is enough to trip the circuit + SleepWindow: 50000, + ErrorPercentThreshold: 1, // Trip on first error + }) + + circuitName := fmt.Sprintf("Fallback_%d", time.Now().Nanosecond()) // unique name to avoid conflicts with go tests `-count` option + + prov1Called := 0 + + var ctx context.Context + expectedErr := errors.New("provider 1 failed") + + // we start with 2, and we open the first + for { + cmd := NewCommand(ctx, nil) + cmd.Add(NewFunctor(func() ([]interface{}, error) { + return nil, expectedErr + }, circuitName+"1")) + cmd.Add(NewFunctor(func() ([]interface{}, error) { + return nil, errors.New("provider 2 failed") + }, circuitName+"2")) + + result := cb.Execute(cmd) + require.NotNil(t, result.Error()) + if IsCircuitOpen(circuitName + "1") { + break + } + } + + // Make sure circuit is open + require.True(t, CircuitExists(circuitName+"1")) + require.True(t, IsCircuitOpen(circuitName+"1")) + + // we send a single request, it should hit the provider, at that's a fallback + cmd := NewCommand(ctx, nil) + cmd.Add(NewFunctor(func() ([]interface{}, error) { + prov1Called++ + return nil, expectedErr + }, circuitName+"1")) + + result := cb.Execute(cmd) + require.True(t, errors.Is(result.Error(), expectedErr)) + + assert.Equal(t, 1, prov1Called) +} diff --git a/rpc/chain/client.go b/rpc/chain/client.go index baad43139..da8c24a3e 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -158,7 +158,6 @@ var propagateErrors = []error{ vm.ErrNonceUintOverflow, // Used by balance history to check state - ethereum.NotFound, bind.ErrNoCode, } @@ -207,6 +206,13 @@ func (c *ClientWithFallback) Close() { } } +// Not found should not be cancelling the requests, as that's returned +// when we are hitting a non archival node for example, it should continue the +// chain as the next provider might have archival support. +func isNotFoundError(err error) bool { + return strings.Contains(err.Error(), ethereum.NotFound.Error()) +} + func isVMError(err error) bool { if strings.Contains(err.Error(), core.ErrInsufficientFunds.Error()) { return true @@ -972,7 +978,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes func (c *ClientWithFallback) toggleConnectionState(err error) { connected := true if err != nil { - if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { + if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { log.Warn("Error not in chain call", "error", err, "chain", c.ChainID) connected = false } else { diff --git a/services/wallet/collectibles/manager_test.go b/services/wallet/collectibles/manager_test.go index 74742fdab..2343aa700 100644 --- a/services/wallet/collectibles/manager_test.go +++ b/services/wallet/collectibles/manager_test.go @@ -61,9 +61,11 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) { rpcClient := mock_rpcclient.NewMockClientInterface(mockCtrl) rpcClient.EXPECT().EthClient(gomock.Any()).Return(chainClient, nil).AnyTimes() mockProvider1 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl) + // We use 2 providers as the last one is not using hystrix + mockProvider2 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl) mockProviders := thirdparty.CollectibleProviders{ - AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1}, + AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1, mockProvider2}, } // Generate many collectibles, but none support toeknURI method, but circuit must not be tripped @@ -91,6 +93,10 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) { mockProvider1.EXPECT().ID().Return(providerID).AnyTimes() mockProvider1.EXPECT().FetchAllAssetsByOwner(gomock.Any(), chainID, owner, cursor, limit).Return(mockAssetContainer, nil) + mockProvider2.EXPECT().IsChainSupported(chainID).Return(true).AnyTimes() + mockProvider2.EXPECT().IsConnected().Return(true).AnyTimes() + mockProvider2.EXPECT().ID().Return(providerID).AnyTimes() + manager := NewManager(nil, rpcClient, nil, mockProviders, nil, nil) manager.statuses = &sync.Map{} collectiblesDataDB := mock_collectibles.NewMockCollectibleDataStorage(mockCtrl) diff --git a/services/wallet/router/router_v2.go b/services/wallet/router/router_v2.go index c7f3a43bd..10f2d33f3 100644 --- a/services/wallet/router/router_v2.go +++ b/services/wallet/router/router_v2.go @@ -493,7 +493,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams) return nil, errors.CreateErrorResponseFromError(err) } - selectedFromChains, selectedTohains, err := r.getSelectedChains(input) + selectedFromChains, selectedToChains, err := r.getSelectedChains(input) if err != nil { return nil, errors.CreateErrorResponseFromError(err) } @@ -504,7 +504,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams) return nil, errors.CreateErrorResponseFromError(err) } - candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedTohains, balanceMap) + candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedToChains, balanceMap) if err != nil { return nil, errors.CreateErrorResponseFromError(err) } @@ -744,7 +744,7 @@ func (r *Router) findOptionsForSendingAmount(input *RouteInputParams, selectedFr return crossChainAmountOptions, nil } -func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedTohains []*params.Network, err error) { +func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedToChains []*params.Network, err error) { var networks []*params.Network networks, err = r.rpcClient.NetworkManager.Get(false) if err != nil { @@ -761,15 +761,15 @@ func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains } if !containsNetworkChainID(network.ChainID, input.DisabledToChainIDs) { - selectedTohains = append(selectedTohains, network) + selectedToChains = append(selectedToChains, network) } } - return selectedFromChains, selectedTohains, nil + return selectedFromChains, selectedToChains, nil } func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams, selectedFromChains []*params.Network, - selectedTohains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) { + selectedToChains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) { var ( testsMode = input.testsMode && input.testParams != nil group = async.NewAtomicGroup(ctx) @@ -859,7 +859,7 @@ func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams, continue } - for _, dest := range selectedTohains { + for _, dest := range selectedToChains { if !input.SendType.isAvailableFor(network) { continue