fix_: Don't count error rate for circuit breaker on cancel and handle

not found
This commit is contained in:
Andrea Maria Piana 2024-08-19 13:44:46 +01:00
parent 7c9de0429e
commit e07182b3f3
5 changed files with 99 additions and 16 deletions

View File

@ -5,6 +5,8 @@ import (
"fmt"
"github.com/afex/hystrix-go/hystrix"
"github.com/ethereum/go-ethereum/log"
)
type FallbackFunc func() ([]any, error)
@ -78,6 +80,16 @@ func NewFunctor(exec FallbackFunc, circuitName string) *Functor {
}
}
func accumulateCommandError(result CommandResult, circuitName string, err error) CommandResult {
// Accumulate errors
if result.err != nil {
result.err = fmt.Errorf("%w, %s.error: %w", result.err, circuitName, err)
} else {
result.err = fmt.Errorf("%s.error: %w", circuitName, err)
}
return result
}
// Executes the command in its circuit if set.
// If the command's circuit is not configured, the circuit of the CircuitBreaker is used.
// This is a blocking function.
@ -127,6 +139,16 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
if err == nil {
result = CommandResult{res: res}
}
// If the command has been cancelled, we don't count
// the error towars breaking the circuit, and then we break
if cmd.cancel {
result = accumulateCommandError(result, f.circuitName, err)
return nil
}
if err != nil {
log.Warn("hystrix error", "error", err, "provider", circuitName)
}
return err
}, nil)
}
@ -134,12 +156,8 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
break
}
// Accumulate errors
if result.err != nil {
result.err = fmt.Errorf("%w, %s.error: %w", result.err, f.circuitName, err)
} else {
result.err = fmt.Errorf("%s.error: %w", f.circuitName, err)
}
result = accumulateCommandError(result, f.circuitName, err)
// Lets abuse every provider with the same amount of MaxConcurrentRequests,
// keep iterating even in case of ErrMaxConcurrency error
}

View File

@ -241,6 +241,11 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
cb := NewCircuitBreaker(Config{})
cmd := NewCommand(context.TODO(), nil)
existCircuit := fmt.Sprintf("existing_%d", timestamp) // unique name to avoid conflicts with go tests `-count` option
// We add it twice as otherwise it's only used for the fallback
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, nil
}, existCircuit))
@ -248,3 +253,51 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
require.True(t, CircuitExists(existCircuit))
require.False(t, IsCircuitOpen(existCircuit))
}
func TestCircuitBreaker_Fallback(t *testing.T) {
cb := NewCircuitBreaker(Config{
RequestVolumeThreshold: 1, // 1 failed request is enough to trip the circuit
SleepWindow: 50000,
ErrorPercentThreshold: 1, // Trip on first error
})
circuitName := fmt.Sprintf("Fallback_%d", time.Now().Nanosecond()) // unique name to avoid conflicts with go tests `-count` option
prov1Called := 0
var ctx context.Context
expectedErr := errors.New("provider 1 failed")
// we start with 2, and we open the first
for {
cmd := NewCommand(ctx, nil)
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, expectedErr
}, circuitName+"1"))
cmd.Add(NewFunctor(func() ([]interface{}, error) {
return nil, errors.New("provider 2 failed")
}, circuitName+"2"))
result := cb.Execute(cmd)
require.NotNil(t, result.Error())
if IsCircuitOpen(circuitName + "1") {
break
}
}
// Make sure circuit is open
require.True(t, CircuitExists(circuitName+"1"))
require.True(t, IsCircuitOpen(circuitName+"1"))
// we send a single request, it should hit the provider, at that's a fallback
cmd := NewCommand(ctx, nil)
cmd.Add(NewFunctor(func() ([]interface{}, error) {
prov1Called++
return nil, expectedErr
}, circuitName+"1"))
result := cb.Execute(cmd)
require.True(t, errors.Is(result.Error(), expectedErr))
assert.Equal(t, 1, prov1Called)
}

View File

@ -158,7 +158,6 @@ var propagateErrors = []error{
vm.ErrNonceUintOverflow,
// Used by balance history to check state
ethereum.NotFound,
bind.ErrNoCode,
}
@ -207,6 +206,13 @@ func (c *ClientWithFallback) Close() {
}
}
// Not found should not be cancelling the requests, as that's returned
// when we are hitting a non archival node for example, it should continue the
// chain as the next provider might have archival support.
func isNotFoundError(err error) bool {
return strings.Contains(err.Error(), ethereum.NotFound.Error())
}
func isVMError(err error) bool {
if strings.Contains(err.Error(), core.ErrInsufficientFunds.Error()) {
return true
@ -972,7 +978,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes
func (c *ClientWithFallback) toggleConnectionState(err error) {
connected := true
if err != nil {
if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
log.Warn("Error not in chain call", "error", err, "chain", c.ChainID)
connected = false
} else {

View File

@ -61,9 +61,11 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) {
rpcClient := mock_rpcclient.NewMockClientInterface(mockCtrl)
rpcClient.EXPECT().EthClient(gomock.Any()).Return(chainClient, nil).AnyTimes()
mockProvider1 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl)
// We use 2 providers as the last one is not using hystrix
mockProvider2 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl)
mockProviders := thirdparty.CollectibleProviders{
AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1},
AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1, mockProvider2},
}
// Generate many collectibles, but none support toeknURI method, but circuit must not be tripped
@ -91,6 +93,10 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) {
mockProvider1.EXPECT().ID().Return(providerID).AnyTimes()
mockProvider1.EXPECT().FetchAllAssetsByOwner(gomock.Any(), chainID, owner, cursor, limit).Return(mockAssetContainer, nil)
mockProvider2.EXPECT().IsChainSupported(chainID).Return(true).AnyTimes()
mockProvider2.EXPECT().IsConnected().Return(true).AnyTimes()
mockProvider2.EXPECT().ID().Return(providerID).AnyTimes()
manager := NewManager(nil, rpcClient, nil, mockProviders, nil, nil)
manager.statuses = &sync.Map{}
collectiblesDataDB := mock_collectibles.NewMockCollectibleDataStorage(mockCtrl)

View File

@ -493,7 +493,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams)
return nil, errors.CreateErrorResponseFromError(err)
}
selectedFromChains, selectedTohains, err := r.getSelectedChains(input)
selectedFromChains, selectedToChains, err := r.getSelectedChains(input)
if err != nil {
return nil, errors.CreateErrorResponseFromError(err)
}
@ -504,7 +504,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams)
return nil, errors.CreateErrorResponseFromError(err)
}
candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedTohains, balanceMap)
candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedToChains, balanceMap)
if err != nil {
return nil, errors.CreateErrorResponseFromError(err)
}
@ -744,7 +744,7 @@ func (r *Router) findOptionsForSendingAmount(input *RouteInputParams, selectedFr
return crossChainAmountOptions, nil
}
func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedTohains []*params.Network, err error) {
func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedToChains []*params.Network, err error) {
var networks []*params.Network
networks, err = r.rpcClient.NetworkManager.Get(false)
if err != nil {
@ -761,15 +761,15 @@ func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains
}
if !containsNetworkChainID(network.ChainID, input.DisabledToChainIDs) {
selectedTohains = append(selectedTohains, network)
selectedToChains = append(selectedToChains, network)
}
}
return selectedFromChains, selectedTohains, nil
return selectedFromChains, selectedToChains, nil
}
func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams, selectedFromChains []*params.Network,
selectedTohains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) {
selectedToChains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) {
var (
testsMode = input.testsMode && input.testParams != nil
group = async.NewAtomicGroup(ctx)
@ -859,7 +859,7 @@ func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams,
continue
}
for _, dest := range selectedTohains {
for _, dest := range selectedToChains {
if !input.SendType.isAvailableFor(network) {
continue