fix_: Don't count error rate for circuit breaker on cancel and handle
not found
This commit is contained in:
parent
c08d10b8ab
commit
605ceae989
|
@ -5,6 +5,8 @@ import (
|
|||
"fmt"
|
||||
|
||||
"github.com/afex/hystrix-go/hystrix"
|
||||
|
||||
"github.com/ethereum/go-ethereum/log"
|
||||
)
|
||||
|
||||
type FallbackFunc func() ([]any, error)
|
||||
|
@ -78,6 +80,16 @@ func NewFunctor(exec FallbackFunc, circuitName string) *Functor {
|
|||
}
|
||||
}
|
||||
|
||||
func accumulateCommandError(result CommandResult, circuitName string, err error) CommandResult {
|
||||
// Accumulate errors
|
||||
if result.err != nil {
|
||||
result.err = fmt.Errorf("%w, %s.error: %w", result.err, circuitName, err)
|
||||
} else {
|
||||
result.err = fmt.Errorf("%s.error: %w", circuitName, err)
|
||||
}
|
||||
return result
|
||||
}
|
||||
|
||||
// Executes the command in its circuit if set.
|
||||
// If the command's circuit is not configured, the circuit of the CircuitBreaker is used.
|
||||
// This is a blocking function.
|
||||
|
@ -127,6 +139,16 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
|
|||
if err == nil {
|
||||
result = CommandResult{res: res}
|
||||
}
|
||||
|
||||
// If the command has been cancelled, we don't count
|
||||
// the error towars breaking the circuit, and then we break
|
||||
if cmd.cancel {
|
||||
result = accumulateCommandError(result, f.circuitName, err)
|
||||
return nil
|
||||
}
|
||||
if err != nil {
|
||||
log.Warn("hystrix error", "error", err, "provider", circuitName)
|
||||
}
|
||||
return err
|
||||
}, nil)
|
||||
}
|
||||
|
@ -134,12 +156,8 @@ func (cb *CircuitBreaker) Execute(cmd *Command) CommandResult {
|
|||
break
|
||||
}
|
||||
|
||||
// Accumulate errors
|
||||
if result.err != nil {
|
||||
result.err = fmt.Errorf("%w, %s.error: %w", result.err, f.circuitName, err)
|
||||
} else {
|
||||
result.err = fmt.Errorf("%s.error: %w", f.circuitName, err)
|
||||
}
|
||||
result = accumulateCommandError(result, f.circuitName, err)
|
||||
|
||||
// Lets abuse every provider with the same amount of MaxConcurrentRequests,
|
||||
// keep iterating even in case of ErrMaxConcurrency error
|
||||
}
|
||||
|
|
|
@ -241,6 +241,11 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
|
|||
cb := NewCircuitBreaker(Config{})
|
||||
cmd := NewCommand(context.TODO(), nil)
|
||||
existCircuit := fmt.Sprintf("existing_%d", timestamp) // unique name to avoid conflicts with go tests `-count` option
|
||||
// We add it twice as otherwise it's only used for the fallback
|
||||
cmd.Add(NewFunctor(func() ([]interface{}, error) {
|
||||
return nil, nil
|
||||
}, existCircuit))
|
||||
|
||||
cmd.Add(NewFunctor(func() ([]interface{}, error) {
|
||||
return nil, nil
|
||||
}, existCircuit))
|
||||
|
@ -248,3 +253,51 @@ func TestCircuitBreaker_CircuitExistsAndClosed(t *testing.T) {
|
|||
require.True(t, CircuitExists(existCircuit))
|
||||
require.False(t, IsCircuitOpen(existCircuit))
|
||||
}
|
||||
|
||||
func TestCircuitBreaker_Fallback(t *testing.T) {
|
||||
cb := NewCircuitBreaker(Config{
|
||||
RequestVolumeThreshold: 1, // 1 failed request is enough to trip the circuit
|
||||
SleepWindow: 50000,
|
||||
ErrorPercentThreshold: 1, // Trip on first error
|
||||
})
|
||||
|
||||
circuitName := fmt.Sprintf("Fallback_%d", time.Now().Nanosecond()) // unique name to avoid conflicts with go tests `-count` option
|
||||
|
||||
prov1Called := 0
|
||||
|
||||
var ctx context.Context
|
||||
expectedErr := errors.New("provider 1 failed")
|
||||
|
||||
// we start with 2, and we open the first
|
||||
for {
|
||||
cmd := NewCommand(ctx, nil)
|
||||
cmd.Add(NewFunctor(func() ([]interface{}, error) {
|
||||
return nil, expectedErr
|
||||
}, circuitName+"1"))
|
||||
cmd.Add(NewFunctor(func() ([]interface{}, error) {
|
||||
return nil, errors.New("provider 2 failed")
|
||||
}, circuitName+"2"))
|
||||
|
||||
result := cb.Execute(cmd)
|
||||
require.NotNil(t, result.Error())
|
||||
if IsCircuitOpen(circuitName + "1") {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// Make sure circuit is open
|
||||
require.True(t, CircuitExists(circuitName+"1"))
|
||||
require.True(t, IsCircuitOpen(circuitName+"1"))
|
||||
|
||||
// we send a single request, it should hit the provider, at that's a fallback
|
||||
cmd := NewCommand(ctx, nil)
|
||||
cmd.Add(NewFunctor(func() ([]interface{}, error) {
|
||||
prov1Called++
|
||||
return nil, expectedErr
|
||||
}, circuitName+"1"))
|
||||
|
||||
result := cb.Execute(cmd)
|
||||
require.True(t, errors.Is(result.Error(), expectedErr))
|
||||
|
||||
assert.Equal(t, 1, prov1Called)
|
||||
}
|
||||
|
|
|
@ -158,7 +158,6 @@ var propagateErrors = []error{
|
|||
vm.ErrNonceUintOverflow,
|
||||
|
||||
// Used by balance history to check state
|
||||
ethereum.NotFound,
|
||||
bind.ErrNoCode,
|
||||
}
|
||||
|
||||
|
@ -207,6 +206,13 @@ func (c *ClientWithFallback) Close() {
|
|||
}
|
||||
}
|
||||
|
||||
// Not found should not be cancelling the requests, as that's returned
|
||||
// when we are hitting a non archival node for example, it should continue the
|
||||
// chain as the next provider might have archival support.
|
||||
func isNotFoundError(err error) bool {
|
||||
return strings.Contains(err.Error(), ethereum.NotFound.Error())
|
||||
}
|
||||
|
||||
func isVMError(err error) bool {
|
||||
if strings.Contains(err.Error(), core.ErrInsufficientFunds.Error()) {
|
||||
return true
|
||||
|
@ -972,7 +978,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes
|
|||
func (c *ClientWithFallback) toggleConnectionState(err error) {
|
||||
connected := true
|
||||
if err != nil {
|
||||
if !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
|
||||
if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) {
|
||||
log.Warn("Error not in chain call", "error", err, "chain", c.ChainID)
|
||||
connected = false
|
||||
} else {
|
||||
|
|
|
@ -61,9 +61,11 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) {
|
|||
rpcClient := mock_rpcclient.NewMockClientInterface(mockCtrl)
|
||||
rpcClient.EXPECT().EthClient(gomock.Any()).Return(chainClient, nil).AnyTimes()
|
||||
mockProvider1 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl)
|
||||
// We use 2 providers as the last one is not using hystrix
|
||||
mockProvider2 := mock_thirdparty.NewMockCollectibleAccountOwnershipProvider(mockCtrl)
|
||||
|
||||
mockProviders := thirdparty.CollectibleProviders{
|
||||
AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1},
|
||||
AccountOwnershipProviders: []thirdparty.CollectibleAccountOwnershipProvider{mockProvider1, mockProvider2},
|
||||
}
|
||||
|
||||
// Generate many collectibles, but none support toeknURI method, but circuit must not be tripped
|
||||
|
@ -91,6 +93,10 @@ func TestManager_FetchAllAssetsByOwner(t *testing.T) {
|
|||
mockProvider1.EXPECT().ID().Return(providerID).AnyTimes()
|
||||
mockProvider1.EXPECT().FetchAllAssetsByOwner(gomock.Any(), chainID, owner, cursor, limit).Return(mockAssetContainer, nil)
|
||||
|
||||
mockProvider2.EXPECT().IsChainSupported(chainID).Return(true).AnyTimes()
|
||||
mockProvider2.EXPECT().IsConnected().Return(true).AnyTimes()
|
||||
mockProvider2.EXPECT().ID().Return(providerID).AnyTimes()
|
||||
|
||||
manager := NewManager(nil, rpcClient, nil, mockProviders, nil, nil)
|
||||
manager.statuses = &sync.Map{}
|
||||
collectiblesDataDB := mock_collectibles.NewMockCollectibleDataStorage(mockCtrl)
|
||||
|
|
|
@ -8,9 +8,10 @@ import (
|
|||
big "math/big"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
|
||||
common "github.com/ethereum/go-ethereum/common"
|
||||
types "github.com/ethereum/go-ethereum/core/types"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
account "github.com/status-im/status-go/account"
|
||||
types0 "github.com/status-im/status-go/eth-node/types"
|
||||
pathprocessor "github.com/status-im/status-go/services/wallet/router/pathprocessor"
|
||||
|
|
|
@ -493,7 +493,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams)
|
|||
return nil, errors.CreateErrorResponseFromError(err)
|
||||
}
|
||||
|
||||
selectedFromChains, selectedTohains, err := r.getSelectedChains(input)
|
||||
selectedFromChains, selectedToChains, err := r.getSelectedChains(input)
|
||||
if err != nil {
|
||||
return nil, errors.CreateErrorResponseFromError(err)
|
||||
}
|
||||
|
@ -504,7 +504,7 @@ func (r *Router) SuggestedRoutesV2(ctx context.Context, input *RouteInputParams)
|
|||
return nil, errors.CreateErrorResponseFromError(err)
|
||||
}
|
||||
|
||||
candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedTohains, balanceMap)
|
||||
candidates, processorErrors, err := r.resolveCandidates(ctx, input, selectedFromChains, selectedToChains, balanceMap)
|
||||
if err != nil {
|
||||
return nil, errors.CreateErrorResponseFromError(err)
|
||||
}
|
||||
|
@ -744,7 +744,7 @@ func (r *Router) findOptionsForSendingAmount(input *RouteInputParams, selectedFr
|
|||
return crossChainAmountOptions, nil
|
||||
}
|
||||
|
||||
func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedTohains []*params.Network, err error) {
|
||||
func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains []*params.Network, selectedToChains []*params.Network, err error) {
|
||||
var networks []*params.Network
|
||||
networks, err = r.rpcClient.NetworkManager.Get(false)
|
||||
if err != nil {
|
||||
|
@ -761,15 +761,15 @@ func (r *Router) getSelectedChains(input *RouteInputParams) (selectedFromChains
|
|||
}
|
||||
|
||||
if !containsNetworkChainID(network.ChainID, input.DisabledToChainIDs) {
|
||||
selectedTohains = append(selectedTohains, network)
|
||||
selectedToChains = append(selectedToChains, network)
|
||||
}
|
||||
}
|
||||
|
||||
return selectedFromChains, selectedTohains, nil
|
||||
return selectedFromChains, selectedToChains, nil
|
||||
}
|
||||
|
||||
func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams, selectedFromChains []*params.Network,
|
||||
selectedTohains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) {
|
||||
selectedToChains []*params.Network, balanceMap map[string]*big.Int) (candidates []*PathV2, processorErrors []*ProcessorError, err error) {
|
||||
var (
|
||||
testsMode = input.testsMode && input.testParams != nil
|
||||
group = async.NewAtomicGroup(ctx)
|
||||
|
@ -859,7 +859,7 @@ func (r *Router) resolveCandidates(ctx context.Context, input *RouteInputParams,
|
|||
continue
|
||||
}
|
||||
|
||||
for _, dest := range selectedTohains {
|
||||
for _, dest := range selectedToChains {
|
||||
|
||||
if !input.SendType.isAvailableFor(network) {
|
||||
continue
|
||||
|
|
|
@ -8,9 +8,10 @@ import (
|
|||
big "math/big"
|
||||
reflect "reflect"
|
||||
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
|
||||
common "github.com/ethereum/go-ethereum/common"
|
||||
types "github.com/ethereum/go-ethereum/core/types"
|
||||
gomock "github.com/golang/mock/gomock"
|
||||
account "github.com/status-im/status-go/account"
|
||||
types0 "github.com/status-im/status-go/eth-node/types"
|
||||
params "github.com/status-im/status-go/params"
|
||||
|
|
Loading…
Reference in New Issue