chore_: remove rpclimiter

fixes #5942
This commit is contained in:
Andrey Bocharnikov 2024-10-16 16:48:23 +04:00
parent 72a4734935
commit b1c4588a8c
15 changed files with 23 additions and 850 deletions

View File

@ -11,7 +11,6 @@ import (
"github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/core/vm"
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
) )
// ProviderErrorType defines the type of non-RPC error for JSON serialization. // ProviderErrorType defines the type of non-RPC error for JSON serialization.
@ -106,10 +105,6 @@ func IsRateLimitError(err error) bool {
return true return true
} }
if errors.Is(err, rpclimiter.ErrRequestsOverLimit) {
return true
}
errMsg := strings.ToLower(err.Error()) errMsg := strings.ToLower(err.Error())
if strings.Contains(errMsg, "backoff_seconds") || if strings.Contains(errMsg, "backoff_seconds") ||
strings.Contains(errMsg, "has exceeded its throughput limit") || strings.Contains(errMsg, "has exceeded its throughput limit") ||

View File

@ -4,8 +4,6 @@ import (
"errors" "errors"
"testing" "testing"
"time" "time"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
) )
func TestNewRpcProviderStatus(t *testing.T) { func TestNewRpcProviderStatus(t *testing.T) {
@ -39,18 +37,6 @@ func TestNewRpcProviderStatus(t *testing.T) {
Status: StatusDown, Status: StatusDown,
}, },
}, },
{
name: "Non-critical RPC error, should be up",
res: RpcProviderCallStatus{
Name: "Provider2",
Timestamp: time.Now(),
Err: rpclimiter.ErrRequestsOverLimit, // Assuming this is non-critical
},
expected: ProviderStatus{
Name: "Provider2",
Status: StatusUp,
},
},
} }
for _, tt := range tests { for _, tt := range tests {

View File

@ -51,7 +51,6 @@ func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) {
for _, chainID := range chainIDs { for _, chainID := range chainIDs {
mockEthClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl) mockEthClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl)
mockEthClient.EXPECT().GetName().AnyTimes().Return(fmt.Sprintf("test_client_chain_%d", chainID)) mockEthClient.EXPECT().GetName().AnyTimes().Return(fmt.Sprintf("test_client_chain_%d", chainID))
mockEthClient.EXPECT().GetLimiter().AnyTimes().Return(nil)
phm := healthmanager.NewProvidersHealthManager(chainID) phm := healthmanager.NewProvidersHealthManager(chainID)
client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm) client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm)

View File

@ -22,7 +22,6 @@ import (
"github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/status-im/status-go/rpc/chain/tagger" "github.com/status-im/status-go/rpc/chain/tagger"
"github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/wallet/connection" "github.com/status-im/status-go/services/wallet/connection"
@ -35,8 +34,6 @@ type ClientInterface interface {
GetWalletNotifier() func(chainId uint64, message string) GetWalletNotifier() func(chainId uint64, message string)
SetWalletNotifier(notifier func(chainId uint64, message string)) SetWalletNotifier(notifier func(chainId uint64, message string))
connection.Connectable connection.Connectable
GetLimiter() rpclimiter.RequestLimiter
SetLimiter(rpclimiter.RequestLimiter)
} }
type HealthMonitor interface { type HealthMonitor interface {
@ -66,7 +63,6 @@ func ClientWithTag(chainClient ClientInterface, tag, groupTag string) ClientInte
type ClientWithFallback struct { type ClientWithFallback struct {
ChainID uint64 ChainID uint64
ethClients []ethclient.RPSLimitedEthClientInterface ethClients []ethclient.RPSLimitedEthClientInterface
commonLimiter rpclimiter.RequestLimiter // FIXME: remove from RPC client https://github.com/status-im/status-go/issues/5942
circuitbreaker *circuitbreaker.CircuitBreaker circuitbreaker *circuitbreaker.CircuitBreaker
providersHealthManager *healthmanager.ProvidersHealthManager providersHealthManager *healthmanager.ProvidersHealthManager
@ -83,7 +79,6 @@ func (c *ClientWithFallback) Copy() interface{} {
return &ClientWithFallback{ return &ClientWithFallback{
ChainID: c.ChainID, ChainID: c.ChainID,
ethClients: c.ethClients, ethClients: c.ethClients,
commonLimiter: c.commonLimiter,
circuitbreaker: c.circuitbreaker, circuitbreaker: c.circuitbreaker,
WalletNotifier: c.WalletNotifier, WalletNotifier: c.WalletNotifier,
isConnected: c.isConnected, isConnected: c.isConnected,
@ -765,7 +760,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes
func (c *ClientWithFallback) toggleConnectionState(err error) { func (c *ClientWithFallback) toggleConnectionState(err error) {
connected := true connected := true
if err != nil { if err != nil {
if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, rpclimiter.ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, context.Canceled) {
log.Warn("Error not in chain call", "error", err, "chain", c.ChainID) log.Warn("Error not in chain call", "error", err, "chain", c.ChainID)
connected = false connected = false
} else { } else {
@ -796,14 +791,6 @@ func (c *ClientWithFallback) DeepCopyTag() tagger.Tagger {
return &copy return &copy
} }
func (c *ClientWithFallback) GetLimiter() rpclimiter.RequestLimiter {
return c.commonLimiter
}
func (c *ClientWithFallback) SetLimiter(limiter rpclimiter.RequestLimiter) {
c.commonLimiter = limiter
}
func (c *ClientWithFallback) GetCircuitBreaker() *circuitbreaker.CircuitBreaker { func (c *ClientWithFallback) GetCircuitBreaker() *circuitbreaker.CircuitBreaker {
return c.circuitbreaker return c.circuitbreaker
} }

View File

@ -17,7 +17,6 @@ import (
healthManager "github.com/status-im/status-go/healthmanager" healthManager "github.com/status-im/status-go/healthmanager"
"github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/healthmanager/rpcstatus"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient"
) )
@ -45,7 +44,6 @@ func (s *ClientWithFallbackSuite) setupClients(numClients int) {
for i := 0; i < numClients; i++ { for i := 0; i < numClients; i++ {
ethClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl) ethClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl)
ethClient.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i)) ethClient.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i))
ethClient.EXPECT().GetLimiter().AnyTimes().Return(nil)
s.mockEthClients = append(s.mockEthClients, ethClient) s.mockEthClients = append(s.mockEthClients, ethClient)
ethClients = append(ethClients, ethClient) ethClients = append(ethClients, ethClient)
@ -104,7 +102,8 @@ func (s *ClientWithFallbackSuite) TestRPSLimitErrorDoesNotMarkChainDown() {
hash := common.HexToHash("0x1234") hash := common.HexToHash("0x1234")
// WHEN // WHEN
s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, rpclimiter.ErrRequestsOverLimit).Times(1) rpsError := errors.New("Request rate exceeded")
s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, rpsError).Times(1)
_, err := s.client.BlockByHash(ctx, hash) _, err := s.client.BlockByHash(ctx, hash)
require.Error(s.T(), err) require.Error(s.T(), err)
@ -186,7 +185,7 @@ func (s *ClientWithFallbackSuite) TestAllClientsDifferentErrors() {
// GIVEN // GIVEN
s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, errors.New("no such host")).Times(1) s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, errors.New("no such host")).Times(1)
s.mockEthClients[1].EXPECT().BlockByHash(ctx, hash).Return(nil, rpclimiter.ErrRequestsOverLimit).Times(1) s.mockEthClients[1].EXPECT().BlockByHash(ctx, hash).Return(nil, errors.New("request rate exceeded")).Times(1)
s.mockEthClients[2].EXPECT().BlockByHash(ctx, hash).Return(nil, vm.ErrOutOfGas).Times(1) s.mockEthClients[2].EXPECT().BlockByHash(ctx, hash).Return(nil, vm.ErrOutOfGas).Times(1)
// WHEN // WHEN

View File

@ -26,7 +26,6 @@ func setupClientTest(t *testing.T) (*ClientWithFallback, []*mock_ethclient.MockR
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
ethCl := mock_ethclient.NewMockRPSLimitedEthClientInterface(mockCtrl) ethCl := mock_ethclient.NewMockRPSLimitedEthClientInterface(mockCtrl)
ethCl.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i)) ethCl.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i))
ethCl.EXPECT().GetLimiter().AnyTimes().Return(nil)
mockEthClients = append(mockEthClients, ethCl) mockEthClients = append(mockEthClients, ethCl)
ethClients = append(ethClients, ethCl) ethClients = append(ethClients, ethCl)

View File

@ -4,7 +4,6 @@ package ethclient
import ( import (
"github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/rpc"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
) )
// RPSLimitedEthClientInterface extends EthClientInterface with additional // RPSLimitedEthClientInterface extends EthClientInterface with additional
@ -14,33 +13,26 @@ import (
// PRS limiting. fallback mechanisms or caching. // PRS limiting. fallback mechanisms or caching.
type RPSLimitedEthClientInterface interface { type RPSLimitedEthClientInterface interface {
EthClientInterface EthClientInterface
GetLimiter() *rpclimiter.RPCRpsLimiter
GetName() string GetName() string
CopyWithName(name string) RPSLimitedEthClientInterface CopyWithName(name string) RPSLimitedEthClientInterface
} }
type RPSLimitedEthClient struct { type RPSLimitedEthClient struct {
*EthClient *EthClient
limiter *rpclimiter.RPCRpsLimiter name string
name string
} }
func NewRPSLimitedEthClient(rpcClient *rpc.Client, limiter *rpclimiter.RPCRpsLimiter, name string) *RPSLimitedEthClient { func NewRPSLimitedEthClient(rpcClient *rpc.Client, name string) *RPSLimitedEthClient {
return &RPSLimitedEthClient{ return &RPSLimitedEthClient{
EthClient: NewEthClient(rpcClient), EthClient: NewEthClient(rpcClient),
limiter: limiter,
name: name, name: name,
} }
} }
func (c *RPSLimitedEthClient) GetLimiter() *rpclimiter.RPCRpsLimiter {
return c.limiter
}
func (c *RPSLimitedEthClient) GetName() string { func (c *RPSLimitedEthClient) GetName() string {
return c.name return c.name
} }
func (c *RPSLimitedEthClient) CopyWithName(name string) RPSLimitedEthClientInterface { func (c *RPSLimitedEthClient) CopyWithName(name string) RPSLimitedEthClientInterface {
return NewRPSLimitedEthClient(c.rpcClient, c.limiter, name) return NewRPSLimitedEthClient(c.rpcClient, name)
} }

View File

@ -1,356 +0,0 @@
package rpclimiter
import (
"database/sql"
"errors"
"fmt"
"sync"
"time"
"github.com/google/uuid"
"github.com/ethereum/go-ethereum/log"
gocommon "github.com/status-im/status-go/common"
)
const (
defaultMaxRequestsPerSecond = 50
minRequestsPerSecond = 20
requestsPerSecondStep = 10
tickerInterval = 1 * time.Second
LimitInfinitely = 0
)
var (
ErrRequestsOverLimit = errors.New("number of requests over limit")
)
type callerOnWait struct {
requests int
ch chan bool
}
type LimitsStorage interface {
Get(tag string) (*LimitData, error)
Set(data *LimitData) error
Delete(tag string) error
}
type InMemRequestsMapStorage struct {
data sync.Map
}
func NewInMemRequestsMapStorage() *InMemRequestsMapStorage {
return &InMemRequestsMapStorage{}
}
func (s *InMemRequestsMapStorage) Get(tag string) (*LimitData, error) {
data, ok := s.data.Load(tag)
if !ok {
return nil, nil
}
return data.(*LimitData), nil
}
func (s *InMemRequestsMapStorage) Set(data *LimitData) error {
if data == nil {
return fmt.Errorf("data is nil")
}
s.data.Store(data.Tag, data)
return nil
}
func (s *InMemRequestsMapStorage) Delete(tag string) error {
s.data.Delete(tag)
return nil
}
type LimitsDBStorage struct {
db *RPCLimiterDB
}
func NewLimitsDBStorage(db *sql.DB) *LimitsDBStorage {
return &LimitsDBStorage{
db: NewRPCLimiterDB(db),
}
}
func (s *LimitsDBStorage) Get(tag string) (*LimitData, error) {
return s.db.GetRPCLimit(tag)
}
func (s *LimitsDBStorage) Set(data *LimitData) error {
if data == nil {
return fmt.Errorf("data is nil")
}
limit, err := s.db.GetRPCLimit(data.Tag)
if err != nil && err != sql.ErrNoRows {
return err
}
if limit == nil {
return s.db.CreateRPCLimit(*data)
}
return s.db.UpdateRPCLimit(*data)
}
func (s *LimitsDBStorage) Delete(tag string) error {
return s.db.DeleteRPCLimit(tag)
}
type LimitData struct {
Tag string
CreatedAt time.Time
Period time.Duration
MaxReqs int
NumReqs int
}
type RequestLimiter interface {
SetLimit(tag string, maxRequests int, interval time.Duration) error
GetLimit(tag string) (*LimitData, error)
DeleteLimit(tag string) error
Allow(tag string) (bool, error)
}
type RPCRequestLimiter struct {
storage LimitsStorage
mu sync.Mutex
}
func NewRequestLimiter(storage LimitsStorage) *RPCRequestLimiter {
return &RPCRequestLimiter{
storage: storage,
}
}
func (rl *RPCRequestLimiter) SetLimit(tag string, maxRequests int, interval time.Duration) error {
err := rl.saveToStorage(tag, maxRequests, interval, 0, time.Now())
if err != nil {
log.Error("Failed to save request data to storage", "error", err)
return err
}
return nil
}
func (rl *RPCRequestLimiter) GetLimit(tag string) (*LimitData, error) {
data, err := rl.storage.Get(tag)
if err != nil {
return nil, err
}
return data, nil
}
func (rl *RPCRequestLimiter) DeleteLimit(tag string) error {
err := rl.storage.Delete(tag)
if err != nil {
log.Error("Failed to delete request data from storage", "error", err)
return err
}
return nil
}
func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error {
data := &LimitData{
Tag: tag,
CreatedAt: timestamp,
Period: interval,
MaxReqs: maxRequests,
NumReqs: numReqs,
}
err := rl.storage.Set(data)
if err != nil {
log.Error("Failed to save request data to storage", "error", err)
return err
}
return nil
}
func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) {
rl.mu.Lock()
defer rl.mu.Unlock()
data, err := rl.storage.Get(tag)
if err != nil {
return true, err
}
if data == nil {
return true, nil
}
// Check if the interval has passed and reset the number of requests
if time.Since(data.CreatedAt) >= data.Period && data.Period.Milliseconds() != LimitInfinitely {
err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now())
if err != nil {
return true, err
}
return true, nil
}
// Check if a number of requests is over the limit within the interval
if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely {
if data.NumReqs >= data.MaxReqs {
log.Info("Number of requests over limit",
"tag", tag,
"numReqs", data.NumReqs,
"maxReqs", data.MaxReqs,
"period", data.Period,
"createdAt", data.CreatedAt.UTC())
return false, ErrRequestsOverLimit
}
return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt)
}
// Reset the number of requests if the interval has passed
return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) // still allow the request if failed to save as not critical
}
type RPCRpsLimiter struct {
uuid uuid.UUID
maxRequestsPerSecond int
maxRequestsPerSecondMutex sync.RWMutex
requestsMadeWithinSecond int
requestsMadeWithinSecondMutex sync.RWMutex
callersOnWaitForRequests []callerOnWait
callersOnWaitForRequestsMutex sync.RWMutex
quit chan bool
}
func NewRPCRpsLimiter() *RPCRpsLimiter {
limiter := RPCRpsLimiter{
uuid: uuid.New(),
maxRequestsPerSecond: defaultMaxRequestsPerSecond,
quit: make(chan bool),
}
limiter.start()
return &limiter
}
func (rl *RPCRpsLimiter) ReduceLimit() {
rl.maxRequestsPerSecondMutex.Lock()
defer rl.maxRequestsPerSecondMutex.Unlock()
if rl.maxRequestsPerSecond <= minRequestsPerSecond {
return
}
rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep
}
func (rl *RPCRpsLimiter) start() {
ticker := time.NewTicker(tickerInterval)
go func() {
defer gocommon.LogOnPanic()
for {
select {
case <-ticker.C:
{
rl.requestsMadeWithinSecondMutex.Lock()
oldrequestsMadeWithinSecond := rl.requestsMadeWithinSecond
if rl.requestsMadeWithinSecond != 0 {
rl.requestsMadeWithinSecond = 0
}
rl.requestsMadeWithinSecondMutex.Unlock()
if oldrequestsMadeWithinSecond == 0 {
continue
}
}
rl.callersOnWaitForRequestsMutex.Lock()
numOfRequestsToMakeAvailable := rl.maxRequestsPerSecond
for {
if numOfRequestsToMakeAvailable == 0 || len(rl.callersOnWaitForRequests) == 0 {
break
}
var index = -1
for i := 0; i < len(rl.callersOnWaitForRequests); i++ {
if rl.callersOnWaitForRequests[i].requests <= numOfRequestsToMakeAvailable {
index = i
break
}
}
if index == -1 {
break
}
callerOnWait := rl.callersOnWaitForRequests[index]
numOfRequestsToMakeAvailable -= callerOnWait.requests
rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests[:index], rl.callersOnWaitForRequests[index+1:]...)
callerOnWait.ch <- true
}
rl.callersOnWaitForRequestsMutex.Unlock()
case <-rl.quit:
ticker.Stop()
return
}
}
}()
}
func (rl *RPCRpsLimiter) Stop() {
rl.quit <- true
close(rl.quit)
for _, callerOnWait := range rl.callersOnWaitForRequests {
close(callerOnWait.ch)
}
rl.callersOnWaitForRequests = nil
}
func (rl *RPCRpsLimiter) WaitForRequestsAvailability(requests int) error {
if requests > rl.maxRequestsPerSecond {
return ErrRequestsOverLimit
}
{
rl.requestsMadeWithinSecondMutex.Lock()
if rl.requestsMadeWithinSecond+requests <= rl.maxRequestsPerSecond {
rl.requestsMadeWithinSecond += requests
rl.requestsMadeWithinSecondMutex.Unlock()
return nil
}
rl.requestsMadeWithinSecondMutex.Unlock()
}
callerOnWait := callerOnWait{
requests: requests,
ch: make(chan bool),
}
{
rl.callersOnWaitForRequestsMutex.Lock()
rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests, callerOnWait)
rl.callersOnWaitForRequestsMutex.Unlock()
}
<-callerOnWait.ch
close(callerOnWait.ch)
rl.requestsMadeWithinSecondMutex.Lock()
rl.requestsMadeWithinSecond += requests
rl.requestsMadeWithinSecondMutex.Unlock()
return nil
}

View File

@ -1,57 +0,0 @@
package rpclimiter
import (
"database/sql"
"time"
)
type RPCLimiterDB struct {
db *sql.DB
}
func NewRPCLimiterDB(db *sql.DB) *RPCLimiterDB {
return &RPCLimiterDB{
db: db,
}
}
func (r *RPCLimiterDB) CreateRPCLimit(limit LimitData) error {
query := `INSERT INTO rpc_limits (tag, created_at, period, max_requests, counter) VALUES (?, ?, ?, ?, ?)`
_, err := r.db.Exec(query, limit.Tag, limit.CreatedAt.Unix(), limit.Period, limit.MaxReqs, limit.NumReqs)
if err != nil {
return err
}
return nil
}
func (r *RPCLimiterDB) GetRPCLimit(tag string) (*LimitData, error) {
query := `SELECT tag, created_at, period, max_requests, counter FROM rpc_limits WHERE tag = ?`
row := r.db.QueryRow(query, tag)
limit := &LimitData{}
createdAtSecs := int64(0)
err := row.Scan(&limit.Tag, &createdAtSecs, &limit.Period, &limit.MaxReqs, &limit.NumReqs)
if err != nil {
return nil, err
}
limit.CreatedAt = time.Unix(createdAtSecs, 0)
return limit, nil
}
func (r *RPCLimiterDB) UpdateRPCLimit(limit LimitData) error {
query := `UPDATE rpc_limits SET created_at = ?, period = ?, max_requests = ?, counter = ? WHERE tag = ?`
_, err := r.db.Exec(query, limit.CreatedAt.Unix(), limit.Period, limit.MaxReqs, limit.NumReqs, limit.Tag)
if err != nil {
return err
}
return nil
}
func (r *RPCLimiterDB) DeleteRPCLimit(tag string) error {
query := `DELETE FROM rpc_limits WHERE tag = ?`
_, err := r.db.Exec(query, tag)
if err != nil && err != sql.ErrNoRows {
return err
}
return nil
}

View File

@ -1,195 +0,0 @@
package rpclimiter
import (
"testing"
"time"
"github.com/stretchr/testify/require"
)
func setupTest() (*InMemRequestsMapStorage, RequestLimiter) {
storage := NewInMemRequestsMapStorage()
rl := NewRequestLimiter(storage)
return storage, rl
}
func TestSetLimit(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
maxRequests := 10
interval := time.Second
// Call the SetLimit method
err := rl.SetLimit(tag, maxRequests, interval)
require.NoError(t, err)
// Verify that the data was saved to storage correctly
data, err := storage.Get(tag)
require.NoError(t, err)
require.Equal(t, tag, data.Tag)
require.Equal(t, interval, data.Period)
require.Equal(t, maxRequests, data.MaxReqs)
require.Equal(t, 0, data.NumReqs)
}
func TestGetLimit(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
data := &LimitData{
Tag: "testTag",
Period: time.Second,
MaxReqs: 10,
NumReqs: 1,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the GetLimit method
ret, err := rl.GetLimit(data.Tag)
require.NoError(t, err)
// Verify the returned data
require.Equal(t, data, ret)
}
func TestDeleteLimit(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
data := &LimitData{
Tag: tag,
Period: time.Second,
MaxReqs: 10,
NumReqs: 1,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the DeleteLimit method
err = rl.DeleteLimit(tag)
require.NoError(t, err)
// Verify that the data was deleted from storage
limit, _ := storage.Get(tag)
require.Nil(t, limit)
// Test double delete
err = rl.DeleteLimit(tag)
require.NoError(t, err)
}
func TestAllowWithinPeriod(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
maxRequests := 10
interval := time.Second
// Set up the storage with test data
data := &LimitData{
Tag: tag,
Period: interval,
CreatedAt: time.Now(),
MaxReqs: maxRequests,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the Allow method
for i := 0; i < maxRequests; i++ {
allow, err := rl.Allow(tag)
require.NoError(t, err)
// Verify the result
require.True(t, allow)
}
// Call the Allow method again
allow, err := rl.Allow(tag)
require.ErrorIs(t, err, ErrRequestsOverLimit)
require.False(t, allow)
}
func TestAllowWhenPeriodPassed(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
maxRequests := 10
interval := time.Second
// Set up the storage with test data
data := &LimitData{
Tag: tag,
Period: interval,
CreatedAt: time.Now().Add(-interval),
MaxReqs: maxRequests,
NumReqs: maxRequests,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the Allow method
allow, err := rl.Allow(tag)
require.NoError(t, err)
// Verify the result
require.True(t, allow)
}
func TestAllowRestrictInfinitelyWhenLimitReached(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
maxRequests := 10
// Set up the storage with test data
data := &LimitData{
Tag: tag,
Period: LimitInfinitely,
CreatedAt: time.Now(),
MaxReqs: maxRequests,
NumReqs: maxRequests,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the Allow method
allow, err := rl.Allow(tag)
require.ErrorIs(t, err, ErrRequestsOverLimit)
// Verify the result
require.False(t, allow)
}
func TestAllowWhenLimitNotReachedForInfinitePeriod(t *testing.T) {
storage, rl := setupTest()
// Define test inputs
tag := "testTag"
maxRequests := 10
// Set up the storage with test data
data := &LimitData{
Tag: tag,
Period: LimitInfinitely,
CreatedAt: time.Now(),
MaxReqs: maxRequests,
NumReqs: maxRequests - 1,
}
err := storage.Set(data)
require.NoError(t, err)
// Call the Allow method
allow, err := rl.Allow(tag)
require.NoError(t, err)
// Verify the result
require.True(t, allow)
}

View File

@ -25,7 +25,6 @@ import (
"github.com/status-im/status-go/params" "github.com/status-im/status-go/params"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/rpcstats"
"github.com/status-im/status-go/services/wallet/common" "github.com/status-im/status-go/services/wallet/common"
@ -97,11 +96,10 @@ type Client struct {
UpstreamChainID uint64 UpstreamChainID uint64
local *gethrpc.Client local *gethrpc.Client
rpcClientsMutex sync.RWMutex rpcClientsMutex sync.RWMutex
rpcClients map[uint64]chain.ClientInterface rpcClients map[uint64]chain.ClientInterface
rpsLimiterMutex sync.RWMutex rpsLimiterMutex sync.RWMutex
limiterPerProvider map[string]*rpclimiter.RPCRpsLimiter
router *router router *router
NetworkManager *network.Manager NetworkManager *network.Manager
@ -150,15 +148,14 @@ func NewClient(config ClientConfig) (*Client, error) {
} }
c := Client{ c := Client{
local: config.Client, local: config.Client,
NetworkManager: networkManager, NetworkManager: networkManager,
handlers: make(map[string]Handler), handlers: make(map[string]Handler),
rpcClients: make(map[uint64]chain.ClientInterface), rpcClients: make(map[uint64]chain.ClientInterface),
limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter), log: log,
log: log, providerConfigs: config.ProviderConfigs,
providerConfigs: config.ProviderConfigs, healthMgr: healthmanager.NewBlockchainHealthManager(),
healthMgr: healthmanager.NewBlockchainHealthManager(), walletFeed: config.WalletFeed,
walletFeed: config.WalletFeed,
} }
c.UpstreamChainID = config.UpstreamChainID c.UpstreamChainID = config.UpstreamChainID
@ -239,17 +236,6 @@ func extractHostFromURL(inputURL string) (string, error) {
return parsedURL.Host, nil return parsedURL.Host, nil
} }
func (c *Client) getRPCRpsLimiter(key string) (*rpclimiter.RPCRpsLimiter, error) {
c.rpsLimiterMutex.Lock()
defer c.rpsLimiterMutex.Unlock()
if limiter, ok := c.limiterPerProvider[key]; ok {
return limiter, nil
}
limiter := rpclimiter.NewRPCRpsLimiter()
c.limiterPerProvider[key] = limiter
return limiter, nil
}
func getProviderConfig(providerConfigs []params.ProviderConfig, providerName string) (params.ProviderConfig, error) { func getProviderConfig(providerConfigs []params.ProviderConfig, providerName string) (params.ProviderConfig, error) {
for _, providerConfig := range providerConfigs { for _, providerConfig := range providerConfigs {
if providerConfig.Name == providerName { if providerConfig.Name == providerName {
@ -321,7 +307,6 @@ func (c *Client) getEthClients(network *params.Network) []ethclient.RPSLimitedEt
ethClients := make([]ethclient.RPSLimitedEthClientInterface, 0) ethClients := make([]ethclient.RPSLimitedEthClientInterface, 0)
for index, key := range keys { for index, key := range keys {
var rpcClient *gethrpc.Client var rpcClient *gethrpc.Client
var rpcLimiter *rpclimiter.RPCRpsLimiter
var err error var err error
var hostPort string var hostPort string
url := urls[key] url := urls[key]
@ -355,12 +340,11 @@ func (c *Client) getEthClients(network *params.Network) []ethclient.RPSLimitedEt
} }
} }
rpcLimiter, err = c.getRPCRpsLimiter(circuitKey)
if err != nil { if err != nil {
c.log.Error("get RPC limiter "+key, "error", err) c.log.Error("get RPC limiter "+key, "error", err)
} }
ethClients = append(ethClients, ethclient.NewRPSLimitedEthClient(rpcClient, rpcLimiter, circuitKey)) ethClients = append(ethClients, ethclient.NewRPSLimitedEthClient(rpcClient, circuitKey))
} }
} }

View File

@ -16,7 +16,6 @@ import (
nodetypes "github.com/status-im/status-go/eth-node/types" nodetypes "github.com/status-im/status-go/eth-node/types"
"github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/async"
"github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/blockchainstate"
@ -1124,13 +1123,6 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
} }
if len(ranges) > 0 { if len(ranges) > 0 {
storage := rpclimiter.NewLimitsDBStorage(c.db.client)
limiter := rpclimiter.NewRequestLimiter(storage)
chainClient, _ := createChainClientWithLimiter(c.chainClient, account, limiter)
if chainClient == nil {
chainClient = c.chainClient
}
for _, rangeItem := range ranges { for _, rangeItem := range ranges {
log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account)
@ -1139,7 +1131,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn
db: c.db, db: c.db,
accountsDB: c.accountsDB, accountsDB: c.accountsDB,
blockRangeDAO: c.blockRangeDAO, blockRangeDAO: c.blockRangeDAO,
chainClient: chainClient, chainClient: c.chainClient,
balanceCacher: c.balanceCacher, balanceCacher: c.balanceCacher,
feed: c.feed, feed: c.feed,
noLimit: false, noLimit: false,
@ -1311,42 +1303,3 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int,
return from, to return from, to
} }
func accountLimiterTag(account common.Address) string {
return transferHistoryTag + "_" + account.String()
}
func createChainClientWithLimiter(client chain.ClientInterface, account common.Address, limiter rpclimiter.RequestLimiter) (chain.ClientInterface, error) {
// Each account has its own limit and a global limit for all accounts
accountTag := accountLimiterTag(account)
chainClient := chain.ClientWithTag(client, accountTag, transferHistoryTag)
// Check if limit is already reached, then skip the comamnd
if allow, err := limiter.Allow(accountTag); !allow {
log.Info("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", chainClient.NetworkID(), "error", err)
return nil, err
}
if allow, err := limiter.Allow(transferHistoryTag); !allow {
log.Info("fetchHistoryBlocksForAccount common limit reached", "chain", chainClient.NetworkID(), "error", err)
return nil, err
}
limit, _ := limiter.GetLimit(accountTag)
if limit == nil {
err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, rpclimiter.LimitInfinitely)
if err != nil {
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag)
}
}
// Here total limit per day is overwriten on each app start, that still saves us RPC calls, but allows to proceed
// after app restart if the limit was reached. Currently there is no way to reset the limit from UI
err := limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod)
if err != nil {
log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag)
}
chainClient.SetLimiter(limiter)
return chainClient, nil
}

View File

@ -38,7 +38,6 @@ import (
statusRpc "github.com/status-im/status-go/rpc" statusRpc "github.com/status-im/status-go/rpc"
ethclient "github.com/status-im/status-go/rpc/chain/ethclient" ethclient "github.com/status-im/status-go/rpc/chain/ethclient"
mock_client "github.com/status-im/status-go/rpc/chain/mock/client" mock_client "github.com/status-im/status-go/rpc/chain/mock/client"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
mock_rpcclient "github.com/status-im/status-go/rpc/mock/client" mock_rpcclient "github.com/status-im/status-go/rpc/mock/client"
"github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/rpc/network"
"github.com/status-im/status-go/server" "github.com/status-im/status-go/server"
@ -68,7 +67,6 @@ type TestClient struct {
rw sync.RWMutex rw sync.RWMutex
callsCounter map[string]int callsCounter map[string]int
currentBlock uint64 currentBlock uint64
limiter rpclimiter.RequestLimiter
tag string tag string
groupTag string groupTag string
} }
@ -724,14 +722,6 @@ func (tc *TestClient) IsConnected() bool {
return true return true
} }
func (tc *TestClient) GetLimiter() rpclimiter.RequestLimiter {
return tc.limiter
}
func (tc *TestClient) SetLimiter(limiter rpclimiter.RequestLimiter) {
tc.limiter = limiter
}
func (tc *TestClient) Close() { func (tc *TestClient) Close() {
if tc.traceAPICalls { if tc.traceAPICalls {
tc.t.Log("Close") tc.t.Log("Close")
@ -1057,17 +1047,6 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo
// Reimplement the common function that is called from every method to check for the limit // Reimplement the common function that is called from every method to check for the limit
countAndlog = func(tc *TestClient, method string, params ...interface{}) error { countAndlog = func(tc *TestClient, method string, params ...interface{}) error {
if tc.GetLimiter() != nil {
if allow, _ := tc.GetLimiter().Allow(tc.tag); !allow {
t.Log("ERROR: requests over limit")
return rpclimiter.ErrRequestsOverLimit
}
if allow, _ := tc.GetLimiter().Allow(tc.groupTag); !allow {
t.Log("ERROR: requests over limit for group tag")
return rpclimiter.ErrRequestsOverLimit
}
}
tc.incCounter(method) tc.incCounter(method)
if tc.traceAPICalls { if tc.traceAPICalls {
if len(params) > 0 { if len(params) > 0 {
@ -1193,34 +1172,7 @@ func TestFindBlocksCommand(t *testing.T) {
} }
} }
func TestFindBlocksCommandWithLimiter(t *testing.T) { func TestFindBlocksCommandTagDifferentThanTransfers(t *testing.T) {
maxRequests := 1
rangeSize := 20
accountAddress := common.HexToAddress("0x1234")
balances := map[common.Address][][]int{accountAddress: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}}
fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, accountAddress, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, nil, nil, nil)
limiter := rpclimiter.NewRequestLimiter(rpclimiter.NewInMemRequestsMapStorage())
err := limiter.SetLimit(transferHistoryTag, maxRequests, time.Hour)
require.NoError(t, err)
tc.SetLimiter(limiter)
tc.tag = transferHistoryTag
ctx := context.Background()
group := async.NewAtomicGroup(ctx)
group.Add(fbc.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
t.Log("ERROR")
case <-group.WaitAsync():
close(blockChannel)
require.Error(t, rpclimiter.ErrRequestsOverLimit, group.Error())
require.Equal(t, maxRequests, tc.getCounter())
}
}
func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) {
rangeSize := 20 rangeSize := 20
maxRequests := 1 maxRequests := 1
accountAddress := common.HexToAddress("0x1234") accountAddress := common.HexToAddress("0x1234")
@ -1229,11 +1181,6 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) {
incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}}
fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, accountAddress, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, nil, nil) fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, accountAddress, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, nil, nil)
limiter := rpclimiter.NewRequestLimiter(rpclimiter.NewInMemRequestsMapStorage())
err := limiter.SetLimit("some-other-tag-than-transfer-history", maxRequests, time.Hour)
require.NoError(t, err)
tc.SetLimiter(limiter)
ctx := context.Background() ctx := context.Background()
group := async.NewAtomicGroup(ctx) group := async.NewAtomicGroup(ctx)
group.Add(fbc.Command(1 * time.Millisecond)) group.Add(fbc.Command(1 * time.Millisecond))
@ -1248,58 +1195,6 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) {
} }
} }
func TestFindBlocksCommandWithLimiterForMultipleAccountsSameGroup(t *testing.T) {
rangeSize := 20
maxRequestsTotal := 5
limit1 := 3
limit2 := 3
account1 := common.HexToAddress("0x1234")
account2 := common.HexToAddress("0x5678")
balances := map[common.Address][][]int{account1: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}, account2: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}}
outgoingERC20Transfers := map[common.Address][]testERC20Transfer{account1: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}}
incomingERC20Transfers := map[common.Address][]testERC20Transfer{account2: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}}
// Limiters share the same storage
storage := rpclimiter.NewInMemRequestsMapStorage()
// Set up the first account
fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, account1, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, nil, nil, nil)
tc.tag = transferHistoryTag + account1.String()
tc.groupTag = transferHistoryTag
limiter1 := rpclimiter.NewRequestLimiter(storage)
err := limiter1.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour)
require.NoError(t, err)
err = limiter1.SetLimit(transferHistoryTag+account1.String(), limit1, time.Hour)
require.NoError(t, err)
tc.SetLimiter(limiter1)
// Set up the second account
fbc2, tc2, _, _ := setupFindBlocksCommand(t, account2, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, incomingERC20Transfers, nil, nil)
tc2.tag = transferHistoryTag + account2.String()
tc2.groupTag = transferHistoryTag
limiter2 := rpclimiter.NewRequestLimiter(storage)
err = limiter2.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour)
require.NoError(t, err)
err = limiter2.SetLimit(transferHistoryTag+account2.String(), limit2, time.Hour)
require.NoError(t, err)
tc2.SetLimiter(limiter2)
fbc2.blocksLoadedCh = blockChannel
ctx := context.Background()
group := async.NewGroup(ctx)
group.Add(fbc.Command(1 * time.Millisecond))
group.Add(fbc2.Command(1 * time.Millisecond))
select {
case <-ctx.Done():
t.Log("ERROR")
case <-group.WaitAsync():
close(blockChannel)
require.LessOrEqual(t, tc.getCounter(), maxRequestsTotal)
}
}
type MockETHClient struct { type MockETHClient struct {
mock.Mock mock.Mock
} }

View File

@ -15,7 +15,6 @@ import (
gocommon "github.com/status-im/status-go/common" gocommon "github.com/status-im/status-go/common"
statusaccounts "github.com/status-im/status-go/multiaccounts/accounts" statusaccounts "github.com/status-im/status-go/multiaccounts/accounts"
"github.com/status-im/status-go/rpc" "github.com/status-im/status-go/rpc"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/accounts/accountsevent"
"github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/balance"
"github.com/status-im/status-go/services/wallet/blockchainstate" "github.com/status-im/status-go/services/wallet/blockchainstate"
@ -263,12 +262,6 @@ func (c *Controller) cleanUpRemovedAccount(address common.Address) {
if err != nil { if err != nil {
log.Error("Failed to delete multitransactions", "error", err) log.Error("Failed to delete multitransactions", "error", err)
} }
rpcLimitsStorage := rpclimiter.NewLimitsDBStorage(c.db.client)
err = rpcLimitsStorage.Delete(accountLimiterTag(address))
if err != nil {
log.Error("Failed to delete limits", "error", err)
}
} }
func (c *Controller) cleanupAccountsLeftovers() error { func (c *Controller) cleanupAccountsLeftovers() error {

View File

@ -9,7 +9,6 @@ import (
"github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain"
"github.com/status-im/status-go/rpc/chain/ethclient" "github.com/status-im/status-go/rpc/chain/ethclient"
"github.com/status-im/status-go/rpc/chain/rpclimiter"
"github.com/stretchr/testify/suite" "github.com/stretchr/testify/suite"
"go.uber.org/mock/gomock" "go.uber.org/mock/gomock"
@ -75,7 +74,7 @@ func (s *TransactorSuite) SetupTest() {
rpcClient.UpstreamChainID = chainID rpcClient.UpstreamChainID = chainID
ethClients := []ethclient.RPSLimitedEthClientInterface{ ethClients := []ethclient.RPSLimitedEthClientInterface{
ethclient.NewRPSLimitedEthClient(s.client, rpclimiter.NewRPCRpsLimiter(), "local-1-chain-id-1"), ethclient.NewRPSLimitedEthClient(s.client, "local-1-chain-id-1"),
} }
localClient := chain.NewClient(ethClients, chainID, nil) localClient := chain.NewClient(ethClients, chainID, nil)
rpcClient.SetClient(chainID, localClient) rpcClient.SetClient(chainID, localClient)