diff --git a/healthmanager/provider_errors/provider_errors.go b/healthmanager/provider_errors/provider_errors.go index bec09d0a1..1ac986535 100644 --- a/healthmanager/provider_errors/provider_errors.go +++ b/healthmanager/provider_errors/provider_errors.go @@ -11,7 +11,6 @@ import ( "github.com/ethereum/go-ethereum/accounts/abi/bind" "github.com/ethereum/go-ethereum/core/vm" "github.com/ethereum/go-ethereum/rpc" - "github.com/status-im/status-go/rpc/chain/rpclimiter" ) // ProviderErrorType defines the type of non-RPC error for JSON serialization. @@ -106,10 +105,6 @@ func IsRateLimitError(err error) bool { return true } - if errors.Is(err, rpclimiter.ErrRequestsOverLimit) { - return true - } - errMsg := strings.ToLower(err.Error()) if strings.Contains(errMsg, "backoff_seconds") || strings.Contains(errMsg, "has exceeded its throughput limit") || diff --git a/healthmanager/rpcstatus/provider_status_test.go b/healthmanager/rpcstatus/provider_status_test.go index 3a4f73b80..425e7be59 100644 --- a/healthmanager/rpcstatus/provider_status_test.go +++ b/healthmanager/rpcstatus/provider_status_test.go @@ -4,8 +4,6 @@ import ( "errors" "testing" "time" - - "github.com/status-im/status-go/rpc/chain/rpclimiter" ) func TestNewRpcProviderStatus(t *testing.T) { @@ -39,18 +37,6 @@ func TestNewRpcProviderStatus(t *testing.T) { Status: StatusDown, }, }, - { - name: "Non-critical RPC error, should be up", - res: RpcProviderCallStatus{ - Name: "Provider2", - Timestamp: time.Now(), - Err: rpclimiter.ErrRequestsOverLimit, // Assuming this is non-critical - }, - expected: ProviderStatus{ - Name: "Provider2", - Status: StatusUp, - }, - }, } for _, tt := range tests { diff --git a/rpc/chain/blockchain_health_test.go b/rpc/chain/blockchain_health_test.go index 7665ee7f9..9179134e9 100644 --- a/rpc/chain/blockchain_health_test.go +++ b/rpc/chain/blockchain_health_test.go @@ -51,7 +51,6 @@ func (s *BlockchainHealthManagerSuite) setupClients(chainIDs []uint64) { for _, chainID := range chainIDs { mockEthClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl) mockEthClient.EXPECT().GetName().AnyTimes().Return(fmt.Sprintf("test_client_chain_%d", chainID)) - mockEthClient.EXPECT().GetLimiter().AnyTimes().Return(nil) phm := healthmanager.NewProvidersHealthManager(chainID) client := NewClient([]ethclient.RPSLimitedEthClientInterface{mockEthClient}, chainID, phm) diff --git a/rpc/chain/client.go b/rpc/chain/client.go index ae414d911..f898dc102 100644 --- a/rpc/chain/client.go +++ b/rpc/chain/client.go @@ -22,7 +22,6 @@ import ( "github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/rpc/chain/ethclient" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/rpc/chain/tagger" "github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/wallet/connection" @@ -35,8 +34,6 @@ type ClientInterface interface { GetWalletNotifier() func(chainId uint64, message string) SetWalletNotifier(notifier func(chainId uint64, message string)) connection.Connectable - GetLimiter() rpclimiter.RequestLimiter - SetLimiter(rpclimiter.RequestLimiter) } type HealthMonitor interface { @@ -66,7 +63,6 @@ func ClientWithTag(chainClient ClientInterface, tag, groupTag string) ClientInte type ClientWithFallback struct { ChainID uint64 ethClients []ethclient.RPSLimitedEthClientInterface - commonLimiter rpclimiter.RequestLimiter // FIXME: remove from RPC client https://github.com/status-im/status-go/issues/5942 circuitbreaker *circuitbreaker.CircuitBreaker providersHealthManager *healthmanager.ProvidersHealthManager @@ -83,7 +79,6 @@ func (c *ClientWithFallback) Copy() interface{} { return &ClientWithFallback{ ChainID: c.ChainID, ethClients: c.ethClients, - commonLimiter: c.commonLimiter, circuitbreaker: c.circuitbreaker, WalletNotifier: c.WalletNotifier, isConnected: c.isConnected, @@ -765,7 +760,7 @@ func (c *ClientWithFallback) SetWalletNotifier(notifier func(chainId uint64, mes func (c *ClientWithFallback) toggleConnectionState(err error) { connected := true if err != nil { - if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, rpclimiter.ErrRequestsOverLimit) && !errors.Is(err, context.Canceled) { + if !isNotFoundError(err) && !isVMError(err) && !errors.Is(err, context.Canceled) { log.Warn("Error not in chain call", "error", err, "chain", c.ChainID) connected = false } else { @@ -796,14 +791,6 @@ func (c *ClientWithFallback) DeepCopyTag() tagger.Tagger { return © } -func (c *ClientWithFallback) GetLimiter() rpclimiter.RequestLimiter { - return c.commonLimiter -} - -func (c *ClientWithFallback) SetLimiter(limiter rpclimiter.RequestLimiter) { - c.commonLimiter = limiter -} - func (c *ClientWithFallback) GetCircuitBreaker() *circuitbreaker.CircuitBreaker { return c.circuitbreaker } diff --git a/rpc/chain/client_health_test.go b/rpc/chain/client_health_test.go index ca5ae161b..9864386f3 100644 --- a/rpc/chain/client_health_test.go +++ b/rpc/chain/client_health_test.go @@ -17,7 +17,6 @@ import ( healthManager "github.com/status-im/status-go/healthmanager" "github.com/status-im/status-go/healthmanager/rpcstatus" "github.com/status-im/status-go/rpc/chain/ethclient" - "github.com/status-im/status-go/rpc/chain/rpclimiter" mockEthclient "github.com/status-im/status-go/rpc/chain/ethclient/mock/client/ethclient" ) @@ -45,7 +44,6 @@ func (s *ClientWithFallbackSuite) setupClients(numClients int) { for i := 0; i < numClients; i++ { ethClient := mockEthclient.NewMockRPSLimitedEthClientInterface(s.mockCtrl) ethClient.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i)) - ethClient.EXPECT().GetLimiter().AnyTimes().Return(nil) s.mockEthClients = append(s.mockEthClients, ethClient) ethClients = append(ethClients, ethClient) @@ -104,7 +102,8 @@ func (s *ClientWithFallbackSuite) TestRPSLimitErrorDoesNotMarkChainDown() { hash := common.HexToHash("0x1234") // WHEN - s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, rpclimiter.ErrRequestsOverLimit).Times(1) + rpsError := errors.New("Request rate exceeded") + s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, rpsError).Times(1) _, err := s.client.BlockByHash(ctx, hash) require.Error(s.T(), err) @@ -186,7 +185,7 @@ func (s *ClientWithFallbackSuite) TestAllClientsDifferentErrors() { // GIVEN s.mockEthClients[0].EXPECT().BlockByHash(ctx, hash).Return(nil, errors.New("no such host")).Times(1) - s.mockEthClients[1].EXPECT().BlockByHash(ctx, hash).Return(nil, rpclimiter.ErrRequestsOverLimit).Times(1) + s.mockEthClients[1].EXPECT().BlockByHash(ctx, hash).Return(nil, errors.New("request rate exceeded")).Times(1) s.mockEthClients[2].EXPECT().BlockByHash(ctx, hash).Return(nil, vm.ErrOutOfGas).Times(1) // WHEN diff --git a/rpc/chain/client_test.go b/rpc/chain/client_test.go index 13a7577a5..c10a62be1 100644 --- a/rpc/chain/client_test.go +++ b/rpc/chain/client_test.go @@ -26,7 +26,6 @@ func setupClientTest(t *testing.T) (*ClientWithFallback, []*mock_ethclient.MockR for i := 0; i < 3; i++ { ethCl := mock_ethclient.NewMockRPSLimitedEthClientInterface(mockCtrl) ethCl.EXPECT().GetName().AnyTimes().Return("test" + strconv.Itoa(i)) - ethCl.EXPECT().GetLimiter().AnyTimes().Return(nil) mockEthClients = append(mockEthClients, ethCl) ethClients = append(ethClients, ethCl) diff --git a/rpc/chain/ethclient/rps_limited_eth_client.go b/rpc/chain/ethclient/rps_limited_eth_client.go index 193f1a0ba..7757d096a 100644 --- a/rpc/chain/ethclient/rps_limited_eth_client.go +++ b/rpc/chain/ethclient/rps_limited_eth_client.go @@ -4,7 +4,6 @@ package ethclient import ( "github.com/ethereum/go-ethereum/rpc" - "github.com/status-im/status-go/rpc/chain/rpclimiter" ) // RPSLimitedEthClientInterface extends EthClientInterface with additional @@ -14,33 +13,26 @@ import ( // PRS limiting. fallback mechanisms or caching. type RPSLimitedEthClientInterface interface { EthClientInterface - GetLimiter() *rpclimiter.RPCRpsLimiter GetName() string CopyWithName(name string) RPSLimitedEthClientInterface } type RPSLimitedEthClient struct { *EthClient - limiter *rpclimiter.RPCRpsLimiter - name string + name string } -func NewRPSLimitedEthClient(rpcClient *rpc.Client, limiter *rpclimiter.RPCRpsLimiter, name string) *RPSLimitedEthClient { +func NewRPSLimitedEthClient(rpcClient *rpc.Client, name string) *RPSLimitedEthClient { return &RPSLimitedEthClient{ EthClient: NewEthClient(rpcClient), - limiter: limiter, name: name, } } -func (c *RPSLimitedEthClient) GetLimiter() *rpclimiter.RPCRpsLimiter { - return c.limiter -} - func (c *RPSLimitedEthClient) GetName() string { return c.name } func (c *RPSLimitedEthClient) CopyWithName(name string) RPSLimitedEthClientInterface { - return NewRPSLimitedEthClient(c.rpcClient, c.limiter, name) + return NewRPSLimitedEthClient(c.rpcClient, name) } diff --git a/rpc/chain/rpclimiter/rpc_limiter.go b/rpc/chain/rpclimiter/rpc_limiter.go deleted file mode 100644 index e33b10b00..000000000 --- a/rpc/chain/rpclimiter/rpc_limiter.go +++ /dev/null @@ -1,356 +0,0 @@ -package rpclimiter - -import ( - "database/sql" - "errors" - "fmt" - "sync" - "time" - - "github.com/google/uuid" - - "github.com/ethereum/go-ethereum/log" - gocommon "github.com/status-im/status-go/common" -) - -const ( - defaultMaxRequestsPerSecond = 50 - minRequestsPerSecond = 20 - requestsPerSecondStep = 10 - - tickerInterval = 1 * time.Second - LimitInfinitely = 0 -) - -var ( - ErrRequestsOverLimit = errors.New("number of requests over limit") -) - -type callerOnWait struct { - requests int - ch chan bool -} - -type LimitsStorage interface { - Get(tag string) (*LimitData, error) - Set(data *LimitData) error - Delete(tag string) error -} - -type InMemRequestsMapStorage struct { - data sync.Map -} - -func NewInMemRequestsMapStorage() *InMemRequestsMapStorage { - return &InMemRequestsMapStorage{} -} - -func (s *InMemRequestsMapStorage) Get(tag string) (*LimitData, error) { - data, ok := s.data.Load(tag) - if !ok { - return nil, nil - } - - return data.(*LimitData), nil -} - -func (s *InMemRequestsMapStorage) Set(data *LimitData) error { - if data == nil { - return fmt.Errorf("data is nil") - } - - s.data.Store(data.Tag, data) - return nil -} - -func (s *InMemRequestsMapStorage) Delete(tag string) error { - s.data.Delete(tag) - return nil -} - -type LimitsDBStorage struct { - db *RPCLimiterDB -} - -func NewLimitsDBStorage(db *sql.DB) *LimitsDBStorage { - return &LimitsDBStorage{ - db: NewRPCLimiterDB(db), - } -} - -func (s *LimitsDBStorage) Get(tag string) (*LimitData, error) { - return s.db.GetRPCLimit(tag) -} - -func (s *LimitsDBStorage) Set(data *LimitData) error { - if data == nil { - return fmt.Errorf("data is nil") - } - - limit, err := s.db.GetRPCLimit(data.Tag) - if err != nil && err != sql.ErrNoRows { - return err - } - - if limit == nil { - return s.db.CreateRPCLimit(*data) - } - - return s.db.UpdateRPCLimit(*data) -} - -func (s *LimitsDBStorage) Delete(tag string) error { - return s.db.DeleteRPCLimit(tag) -} - -type LimitData struct { - Tag string - CreatedAt time.Time - Period time.Duration - MaxReqs int - NumReqs int -} - -type RequestLimiter interface { - SetLimit(tag string, maxRequests int, interval time.Duration) error - GetLimit(tag string) (*LimitData, error) - DeleteLimit(tag string) error - Allow(tag string) (bool, error) -} - -type RPCRequestLimiter struct { - storage LimitsStorage - mu sync.Mutex -} - -func NewRequestLimiter(storage LimitsStorage) *RPCRequestLimiter { - return &RPCRequestLimiter{ - storage: storage, - } -} - -func (rl *RPCRequestLimiter) SetLimit(tag string, maxRequests int, interval time.Duration) error { - err := rl.saveToStorage(tag, maxRequests, interval, 0, time.Now()) - if err != nil { - log.Error("Failed to save request data to storage", "error", err) - return err - } - - return nil -} - -func (rl *RPCRequestLimiter) GetLimit(tag string) (*LimitData, error) { - data, err := rl.storage.Get(tag) - if err != nil { - return nil, err - } - - return data, nil -} - -func (rl *RPCRequestLimiter) DeleteLimit(tag string) error { - err := rl.storage.Delete(tag) - if err != nil { - log.Error("Failed to delete request data from storage", "error", err) - return err - } - - return nil -} - -func (rl *RPCRequestLimiter) saveToStorage(tag string, maxRequests int, interval time.Duration, numReqs int, timestamp time.Time) error { - data := &LimitData{ - Tag: tag, - CreatedAt: timestamp, - Period: interval, - MaxReqs: maxRequests, - NumReqs: numReqs, - } - - err := rl.storage.Set(data) - if err != nil { - log.Error("Failed to save request data to storage", "error", err) - return err - } - - return nil -} - -func (rl *RPCRequestLimiter) Allow(tag string) (bool, error) { - rl.mu.Lock() - defer rl.mu.Unlock() - - data, err := rl.storage.Get(tag) - if err != nil { - return true, err - } - - if data == nil { - return true, nil - } - - // Check if the interval has passed and reset the number of requests - if time.Since(data.CreatedAt) >= data.Period && data.Period.Milliseconds() != LimitInfinitely { - err = rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) - if err != nil { - return true, err - } - - return true, nil - } - - // Check if a number of requests is over the limit within the interval - if time.Since(data.CreatedAt) < data.Period || data.Period.Milliseconds() == LimitInfinitely { - if data.NumReqs >= data.MaxReqs { - log.Info("Number of requests over limit", - "tag", tag, - "numReqs", data.NumReqs, - "maxReqs", data.MaxReqs, - "period", data.Period, - "createdAt", data.CreatedAt.UTC()) - return false, ErrRequestsOverLimit - } - - return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, data.NumReqs+1, data.CreatedAt) - } - - // Reset the number of requests if the interval has passed - return true, rl.saveToStorage(tag, data.MaxReqs, data.Period, 0, time.Now()) // still allow the request if failed to save as not critical -} - -type RPCRpsLimiter struct { - uuid uuid.UUID - - maxRequestsPerSecond int - maxRequestsPerSecondMutex sync.RWMutex - - requestsMadeWithinSecond int - requestsMadeWithinSecondMutex sync.RWMutex - - callersOnWaitForRequests []callerOnWait - callersOnWaitForRequestsMutex sync.RWMutex - - quit chan bool -} - -func NewRPCRpsLimiter() *RPCRpsLimiter { - - limiter := RPCRpsLimiter{ - uuid: uuid.New(), - maxRequestsPerSecond: defaultMaxRequestsPerSecond, - quit: make(chan bool), - } - - limiter.start() - - return &limiter -} - -func (rl *RPCRpsLimiter) ReduceLimit() { - rl.maxRequestsPerSecondMutex.Lock() - defer rl.maxRequestsPerSecondMutex.Unlock() - if rl.maxRequestsPerSecond <= minRequestsPerSecond { - return - } - rl.maxRequestsPerSecond = rl.maxRequestsPerSecond - requestsPerSecondStep -} - -func (rl *RPCRpsLimiter) start() { - ticker := time.NewTicker(tickerInterval) - go func() { - defer gocommon.LogOnPanic() - for { - select { - case <-ticker.C: - { - rl.requestsMadeWithinSecondMutex.Lock() - oldrequestsMadeWithinSecond := rl.requestsMadeWithinSecond - if rl.requestsMadeWithinSecond != 0 { - rl.requestsMadeWithinSecond = 0 - } - rl.requestsMadeWithinSecondMutex.Unlock() - if oldrequestsMadeWithinSecond == 0 { - continue - } - } - - rl.callersOnWaitForRequestsMutex.Lock() - numOfRequestsToMakeAvailable := rl.maxRequestsPerSecond - for { - if numOfRequestsToMakeAvailable == 0 || len(rl.callersOnWaitForRequests) == 0 { - break - } - - var index = -1 - for i := 0; i < len(rl.callersOnWaitForRequests); i++ { - if rl.callersOnWaitForRequests[i].requests <= numOfRequestsToMakeAvailable { - index = i - break - } - } - - if index == -1 { - break - } - - callerOnWait := rl.callersOnWaitForRequests[index] - numOfRequestsToMakeAvailable -= callerOnWait.requests - rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests[:index], rl.callersOnWaitForRequests[index+1:]...) - - callerOnWait.ch <- true - } - rl.callersOnWaitForRequestsMutex.Unlock() - - case <-rl.quit: - ticker.Stop() - return - } - } - }() -} - -func (rl *RPCRpsLimiter) Stop() { - rl.quit <- true - close(rl.quit) - for _, callerOnWait := range rl.callersOnWaitForRequests { - close(callerOnWait.ch) - } - rl.callersOnWaitForRequests = nil -} - -func (rl *RPCRpsLimiter) WaitForRequestsAvailability(requests int) error { - if requests > rl.maxRequestsPerSecond { - return ErrRequestsOverLimit - } - - { - rl.requestsMadeWithinSecondMutex.Lock() - if rl.requestsMadeWithinSecond+requests <= rl.maxRequestsPerSecond { - rl.requestsMadeWithinSecond += requests - rl.requestsMadeWithinSecondMutex.Unlock() - return nil - } - rl.requestsMadeWithinSecondMutex.Unlock() - } - - callerOnWait := callerOnWait{ - requests: requests, - ch: make(chan bool), - } - - { - rl.callersOnWaitForRequestsMutex.Lock() - rl.callersOnWaitForRequests = append(rl.callersOnWaitForRequests, callerOnWait) - rl.callersOnWaitForRequestsMutex.Unlock() - } - - <-callerOnWait.ch - - close(callerOnWait.ch) - - rl.requestsMadeWithinSecondMutex.Lock() - rl.requestsMadeWithinSecond += requests - rl.requestsMadeWithinSecondMutex.Unlock() - - return nil -} diff --git a/rpc/chain/rpclimiter/rpc_limiter_db.go b/rpc/chain/rpclimiter/rpc_limiter_db.go deleted file mode 100644 index 54789d68a..000000000 --- a/rpc/chain/rpclimiter/rpc_limiter_db.go +++ /dev/null @@ -1,57 +0,0 @@ -package rpclimiter - -import ( - "database/sql" - "time" -) - -type RPCLimiterDB struct { - db *sql.DB -} - -func NewRPCLimiterDB(db *sql.DB) *RPCLimiterDB { - return &RPCLimiterDB{ - db: db, - } -} - -func (r *RPCLimiterDB) CreateRPCLimit(limit LimitData) error { - query := `INSERT INTO rpc_limits (tag, created_at, period, max_requests, counter) VALUES (?, ?, ?, ?, ?)` - _, err := r.db.Exec(query, limit.Tag, limit.CreatedAt.Unix(), limit.Period, limit.MaxReqs, limit.NumReqs) - if err != nil { - return err - } - return nil -} - -func (r *RPCLimiterDB) GetRPCLimit(tag string) (*LimitData, error) { - query := `SELECT tag, created_at, period, max_requests, counter FROM rpc_limits WHERE tag = ?` - row := r.db.QueryRow(query, tag) - limit := &LimitData{} - createdAtSecs := int64(0) - err := row.Scan(&limit.Tag, &createdAtSecs, &limit.Period, &limit.MaxReqs, &limit.NumReqs) - if err != nil { - return nil, err - } - - limit.CreatedAt = time.Unix(createdAtSecs, 0) - return limit, nil -} - -func (r *RPCLimiterDB) UpdateRPCLimit(limit LimitData) error { - query := `UPDATE rpc_limits SET created_at = ?, period = ?, max_requests = ?, counter = ? WHERE tag = ?` - _, err := r.db.Exec(query, limit.CreatedAt.Unix(), limit.Period, limit.MaxReqs, limit.NumReqs, limit.Tag) - if err != nil { - return err - } - return nil -} - -func (r *RPCLimiterDB) DeleteRPCLimit(tag string) error { - query := `DELETE FROM rpc_limits WHERE tag = ?` - _, err := r.db.Exec(query, tag) - if err != nil && err != sql.ErrNoRows { - return err - } - return nil -} diff --git a/rpc/chain/rpclimiter/rpc_limiter_test.go b/rpc/chain/rpclimiter/rpc_limiter_test.go deleted file mode 100644 index 0e0837a14..000000000 --- a/rpc/chain/rpclimiter/rpc_limiter_test.go +++ /dev/null @@ -1,195 +0,0 @@ -package rpclimiter - -import ( - "testing" - "time" - - "github.com/stretchr/testify/require" -) - -func setupTest() (*InMemRequestsMapStorage, RequestLimiter) { - storage := NewInMemRequestsMapStorage() - rl := NewRequestLimiter(storage) - return storage, rl -} - -func TestSetLimit(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - maxRequests := 10 - interval := time.Second - - // Call the SetLimit method - err := rl.SetLimit(tag, maxRequests, interval) - require.NoError(t, err) - - // Verify that the data was saved to storage correctly - data, err := storage.Get(tag) - require.NoError(t, err) - require.Equal(t, tag, data.Tag) - require.Equal(t, interval, data.Period) - require.Equal(t, maxRequests, data.MaxReqs) - require.Equal(t, 0, data.NumReqs) -} - -func TestGetLimit(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - data := &LimitData{ - Tag: "testTag", - Period: time.Second, - MaxReqs: 10, - NumReqs: 1, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the GetLimit method - ret, err := rl.GetLimit(data.Tag) - require.NoError(t, err) - - // Verify the returned data - require.Equal(t, data, ret) -} - -func TestDeleteLimit(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - data := &LimitData{ - Tag: tag, - Period: time.Second, - MaxReqs: 10, - NumReqs: 1, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the DeleteLimit method - err = rl.DeleteLimit(tag) - require.NoError(t, err) - - // Verify that the data was deleted from storage - limit, _ := storage.Get(tag) - require.Nil(t, limit) - - // Test double delete - err = rl.DeleteLimit(tag) - require.NoError(t, err) -} - -func TestAllowWithinPeriod(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - maxRequests := 10 - interval := time.Second - - // Set up the storage with test data - data := &LimitData{ - Tag: tag, - Period: interval, - CreatedAt: time.Now(), - MaxReqs: maxRequests, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the Allow method - for i := 0; i < maxRequests; i++ { - allow, err := rl.Allow(tag) - require.NoError(t, err) - - // Verify the result - require.True(t, allow) - } - - // Call the Allow method again - allow, err := rl.Allow(tag) - require.ErrorIs(t, err, ErrRequestsOverLimit) - require.False(t, allow) -} - -func TestAllowWhenPeriodPassed(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - maxRequests := 10 - interval := time.Second - - // Set up the storage with test data - data := &LimitData{ - Tag: tag, - Period: interval, - CreatedAt: time.Now().Add(-interval), - MaxReqs: maxRequests, - NumReqs: maxRequests, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the Allow method - allow, err := rl.Allow(tag) - require.NoError(t, err) - - // Verify the result - require.True(t, allow) -} - -func TestAllowRestrictInfinitelyWhenLimitReached(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - maxRequests := 10 - - // Set up the storage with test data - data := &LimitData{ - Tag: tag, - Period: LimitInfinitely, - CreatedAt: time.Now(), - MaxReqs: maxRequests, - NumReqs: maxRequests, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the Allow method - allow, err := rl.Allow(tag) - require.ErrorIs(t, err, ErrRequestsOverLimit) - - // Verify the result - require.False(t, allow) -} - -func TestAllowWhenLimitNotReachedForInfinitePeriod(t *testing.T) { - storage, rl := setupTest() - - // Define test inputs - tag := "testTag" - maxRequests := 10 - - // Set up the storage with test data - data := &LimitData{ - Tag: tag, - Period: LimitInfinitely, - CreatedAt: time.Now(), - MaxReqs: maxRequests, - NumReqs: maxRequests - 1, - } - err := storage.Set(data) - require.NoError(t, err) - - // Call the Allow method - allow, err := rl.Allow(tag) - require.NoError(t, err) - - // Verify the result - require.True(t, allow) -} diff --git a/rpc/client.go b/rpc/client.go index a4b84770b..f399c85ad 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -25,7 +25,6 @@ import ( "github.com/status-im/status-go/params" "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain/ethclient" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/services/rpcstats" "github.com/status-im/status-go/services/wallet/common" @@ -97,11 +96,10 @@ type Client struct { UpstreamChainID uint64 - local *gethrpc.Client - rpcClientsMutex sync.RWMutex - rpcClients map[uint64]chain.ClientInterface - rpsLimiterMutex sync.RWMutex - limiterPerProvider map[string]*rpclimiter.RPCRpsLimiter + local *gethrpc.Client + rpcClientsMutex sync.RWMutex + rpcClients map[uint64]chain.ClientInterface + rpsLimiterMutex sync.RWMutex router *router NetworkManager *network.Manager @@ -150,15 +148,14 @@ func NewClient(config ClientConfig) (*Client, error) { } c := Client{ - local: config.Client, - NetworkManager: networkManager, - handlers: make(map[string]Handler), - rpcClients: make(map[uint64]chain.ClientInterface), - limiterPerProvider: make(map[string]*rpclimiter.RPCRpsLimiter), - log: log, - providerConfigs: config.ProviderConfigs, - healthMgr: healthmanager.NewBlockchainHealthManager(), - walletFeed: config.WalletFeed, + local: config.Client, + NetworkManager: networkManager, + handlers: make(map[string]Handler), + rpcClients: make(map[uint64]chain.ClientInterface), + log: log, + providerConfigs: config.ProviderConfigs, + healthMgr: healthmanager.NewBlockchainHealthManager(), + walletFeed: config.WalletFeed, } c.UpstreamChainID = config.UpstreamChainID @@ -239,17 +236,6 @@ func extractHostFromURL(inputURL string) (string, error) { return parsedURL.Host, nil } -func (c *Client) getRPCRpsLimiter(key string) (*rpclimiter.RPCRpsLimiter, error) { - c.rpsLimiterMutex.Lock() - defer c.rpsLimiterMutex.Unlock() - if limiter, ok := c.limiterPerProvider[key]; ok { - return limiter, nil - } - limiter := rpclimiter.NewRPCRpsLimiter() - c.limiterPerProvider[key] = limiter - return limiter, nil -} - func getProviderConfig(providerConfigs []params.ProviderConfig, providerName string) (params.ProviderConfig, error) { for _, providerConfig := range providerConfigs { if providerConfig.Name == providerName { @@ -321,7 +307,6 @@ func (c *Client) getEthClients(network *params.Network) []ethclient.RPSLimitedEt ethClients := make([]ethclient.RPSLimitedEthClientInterface, 0) for index, key := range keys { var rpcClient *gethrpc.Client - var rpcLimiter *rpclimiter.RPCRpsLimiter var err error var hostPort string url := urls[key] @@ -355,12 +340,11 @@ func (c *Client) getEthClients(network *params.Network) []ethclient.RPSLimitedEt } } - rpcLimiter, err = c.getRPCRpsLimiter(circuitKey) if err != nil { c.log.Error("get RPC limiter "+key, "error", err) } - ethClients = append(ethClients, ethclient.NewRPSLimitedEthClient(rpcClient, rpcLimiter, circuitKey)) + ethClients = append(ethClients, ethclient.NewRPSLimitedEthClient(rpcClient, circuitKey)) } } diff --git a/services/wallet/transfer/commands_sequential.go b/services/wallet/transfer/commands_sequential.go index 0ae61eaa7..225f86088 100644 --- a/services/wallet/transfer/commands_sequential.go +++ b/services/wallet/transfer/commands_sequential.go @@ -16,7 +16,6 @@ import ( nodetypes "github.com/status-im/status-go/eth-node/types" "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/rpc/chain" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/services/wallet/async" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/blockchainstate" @@ -1124,13 +1123,6 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn } if len(ranges) > 0 { - storage := rpclimiter.NewLimitsDBStorage(c.db.client) - limiter := rpclimiter.NewRequestLimiter(storage) - chainClient, _ := createChainClientWithLimiter(c.chainClient, account, limiter) - if chainClient == nil { - chainClient = c.chainClient - } - for _, rangeItem := range ranges { log.Debug("range item", "r", rangeItem, "n", c.chainClient.NetworkID(), "a", account) @@ -1139,7 +1131,7 @@ func (c *loadBlocksAndTransfersCommand) fetchHistoryBlocksForAccount(group *asyn db: c.db, accountsDB: c.accountsDB, blockRangeDAO: c.blockRangeDAO, - chainClient: chainClient, + chainClient: c.chainClient, balanceCacher: c.balanceCacher, feed: c.feed, noLimit: false, @@ -1311,42 +1303,3 @@ func nextRange(maxRangeSize int, prevFrom, zeroBlockNumber *big.Int) (*big.Int, return from, to } - -func accountLimiterTag(account common.Address) string { - return transferHistoryTag + "_" + account.String() -} - -func createChainClientWithLimiter(client chain.ClientInterface, account common.Address, limiter rpclimiter.RequestLimiter) (chain.ClientInterface, error) { - // Each account has its own limit and a global limit for all accounts - accountTag := accountLimiterTag(account) - chainClient := chain.ClientWithTag(client, accountTag, transferHistoryTag) - - // Check if limit is already reached, then skip the comamnd - if allow, err := limiter.Allow(accountTag); !allow { - log.Info("fetchHistoryBlocksForAccount limit reached", "account", account, "chain", chainClient.NetworkID(), "error", err) - return nil, err - } - - if allow, err := limiter.Allow(transferHistoryTag); !allow { - log.Info("fetchHistoryBlocksForAccount common limit reached", "chain", chainClient.NetworkID(), "error", err) - return nil, err - } - - limit, _ := limiter.GetLimit(accountTag) - if limit == nil { - err := limiter.SetLimit(accountTag, transferHistoryLimitPerAccount, rpclimiter.LimitInfinitely) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "accountTag", accountTag) - } - } - - // Here total limit per day is overwriten on each app start, that still saves us RPC calls, but allows to proceed - // after app restart if the limit was reached. Currently there is no way to reset the limit from UI - err := limiter.SetLimit(transferHistoryTag, transferHistoryLimit, transferHistoryLimitPeriod) - if err != nil { - log.Error("fetchHistoryBlocksForAccount SetLimit", "error", err, "groupTag", transferHistoryTag) - } - chainClient.SetLimiter(limiter) - - return chainClient, nil -} diff --git a/services/wallet/transfer/commands_sequential_test.go b/services/wallet/transfer/commands_sequential_test.go index 426d14668..04f07d041 100644 --- a/services/wallet/transfer/commands_sequential_test.go +++ b/services/wallet/transfer/commands_sequential_test.go @@ -38,7 +38,6 @@ import ( statusRpc "github.com/status-im/status-go/rpc" ethclient "github.com/status-im/status-go/rpc/chain/ethclient" mock_client "github.com/status-im/status-go/rpc/chain/mock/client" - "github.com/status-im/status-go/rpc/chain/rpclimiter" mock_rpcclient "github.com/status-im/status-go/rpc/mock/client" "github.com/status-im/status-go/rpc/network" "github.com/status-im/status-go/server" @@ -68,7 +67,6 @@ type TestClient struct { rw sync.RWMutex callsCounter map[string]int currentBlock uint64 - limiter rpclimiter.RequestLimiter tag string groupTag string } @@ -724,14 +722,6 @@ func (tc *TestClient) IsConnected() bool { return true } -func (tc *TestClient) GetLimiter() rpclimiter.RequestLimiter { - return tc.limiter -} - -func (tc *TestClient) SetLimiter(limiter rpclimiter.RequestLimiter) { - tc.limiter = limiter -} - func (tc *TestClient) Close() { if tc.traceAPICalls { tc.t.Log("Close") @@ -1057,17 +1047,6 @@ func setupFindBlocksCommand(t *testing.T, accountAddress common.Address, fromBlo // Reimplement the common function that is called from every method to check for the limit countAndlog = func(tc *TestClient, method string, params ...interface{}) error { - if tc.GetLimiter() != nil { - if allow, _ := tc.GetLimiter().Allow(tc.tag); !allow { - t.Log("ERROR: requests over limit") - return rpclimiter.ErrRequestsOverLimit - } - if allow, _ := tc.GetLimiter().Allow(tc.groupTag); !allow { - t.Log("ERROR: requests over limit for group tag") - return rpclimiter.ErrRequestsOverLimit - } - } - tc.incCounter(method) if tc.traceAPICalls { if len(params) > 0 { @@ -1193,34 +1172,7 @@ func TestFindBlocksCommand(t *testing.T) { } } -func TestFindBlocksCommandWithLimiter(t *testing.T) { - maxRequests := 1 - rangeSize := 20 - accountAddress := common.HexToAddress("0x1234") - balances := map[common.Address][][]int{accountAddress: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}} - fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, accountAddress, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, nil, nil, nil) - - limiter := rpclimiter.NewRequestLimiter(rpclimiter.NewInMemRequestsMapStorage()) - err := limiter.SetLimit(transferHistoryTag, maxRequests, time.Hour) - require.NoError(t, err) - tc.SetLimiter(limiter) - tc.tag = transferHistoryTag - - ctx := context.Background() - group := async.NewAtomicGroup(ctx) - group.Add(fbc.Command(1 * time.Millisecond)) - - select { - case <-ctx.Done(): - t.Log("ERROR") - case <-group.WaitAsync(): - close(blockChannel) - require.Error(t, rpclimiter.ErrRequestsOverLimit, group.Error()) - require.Equal(t, maxRequests, tc.getCounter()) - } -} - -func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) { +func TestFindBlocksCommandTagDifferentThanTransfers(t *testing.T) { rangeSize := 20 maxRequests := 1 accountAddress := common.HexToAddress("0x1234") @@ -1229,11 +1181,6 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) { incomingERC20Transfers := map[common.Address][]testERC20Transfer{accountAddress: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, accountAddress, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, incomingERC20Transfers, nil, nil) - limiter := rpclimiter.NewRequestLimiter(rpclimiter.NewInMemRequestsMapStorage()) - err := limiter.SetLimit("some-other-tag-than-transfer-history", maxRequests, time.Hour) - require.NoError(t, err) - tc.SetLimiter(limiter) - ctx := context.Background() group := async.NewAtomicGroup(ctx) group.Add(fbc.Command(1 * time.Millisecond)) @@ -1248,58 +1195,6 @@ func TestFindBlocksCommandWithLimiterTagDifferentThanTransfers(t *testing.T) { } } -func TestFindBlocksCommandWithLimiterForMultipleAccountsSameGroup(t *testing.T) { - rangeSize := 20 - maxRequestsTotal := 5 - limit1 := 3 - limit2 := 3 - account1 := common.HexToAddress("0x1234") - account2 := common.HexToAddress("0x5678") - balances := map[common.Address][][]int{account1: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}, account2: {{5, 1, 0}, {20, 2, 0}, {45, 1, 1}, {46, 50, 0}, {75, 0, 1}}} - outgoingERC20Transfers := map[common.Address][]testERC20Transfer{account1: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} - incomingERC20Transfers := map[common.Address][]testERC20Transfer{account2: {{big.NewInt(6), tokenTXXAddress, big.NewInt(1), walletcommon.Erc20TransferEventType}}} - - // Limiters share the same storage - storage := rpclimiter.NewInMemRequestsMapStorage() - - // Set up the first account - fbc, tc, blockChannel, _ := setupFindBlocksCommand(t, account1, big.NewInt(0), big.NewInt(20), rangeSize, balances, outgoingERC20Transfers, nil, nil, nil) - tc.tag = transferHistoryTag + account1.String() - tc.groupTag = transferHistoryTag - - limiter1 := rpclimiter.NewRequestLimiter(storage) - err := limiter1.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour) - require.NoError(t, err) - err = limiter1.SetLimit(transferHistoryTag+account1.String(), limit1, time.Hour) - require.NoError(t, err) - tc.SetLimiter(limiter1) - - // Set up the second account - fbc2, tc2, _, _ := setupFindBlocksCommand(t, account2, big.NewInt(0), big.NewInt(20), rangeSize, balances, nil, incomingERC20Transfers, nil, nil) - tc2.tag = transferHistoryTag + account2.String() - tc2.groupTag = transferHistoryTag - limiter2 := rpclimiter.NewRequestLimiter(storage) - err = limiter2.SetLimit(transferHistoryTag, maxRequestsTotal, time.Hour) - require.NoError(t, err) - err = limiter2.SetLimit(transferHistoryTag+account2.String(), limit2, time.Hour) - require.NoError(t, err) - tc2.SetLimiter(limiter2) - fbc2.blocksLoadedCh = blockChannel - - ctx := context.Background() - group := async.NewGroup(ctx) - group.Add(fbc.Command(1 * time.Millisecond)) - group.Add(fbc2.Command(1 * time.Millisecond)) - - select { - case <-ctx.Done(): - t.Log("ERROR") - case <-group.WaitAsync(): - close(blockChannel) - require.LessOrEqual(t, tc.getCounter(), maxRequestsTotal) - } -} - type MockETHClient struct { mock.Mock } diff --git a/services/wallet/transfer/controller.go b/services/wallet/transfer/controller.go index 4c1a56f83..23dbebe1f 100644 --- a/services/wallet/transfer/controller.go +++ b/services/wallet/transfer/controller.go @@ -15,7 +15,6 @@ import ( gocommon "github.com/status-im/status-go/common" statusaccounts "github.com/status-im/status-go/multiaccounts/accounts" "github.com/status-im/status-go/rpc" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/status-im/status-go/services/accounts/accountsevent" "github.com/status-im/status-go/services/wallet/balance" "github.com/status-im/status-go/services/wallet/blockchainstate" @@ -263,12 +262,6 @@ func (c *Controller) cleanUpRemovedAccount(address common.Address) { if err != nil { log.Error("Failed to delete multitransactions", "error", err) } - - rpcLimitsStorage := rpclimiter.NewLimitsDBStorage(c.db.client) - err = rpcLimitsStorage.Delete(accountLimiterTag(address)) - if err != nil { - log.Error("Failed to delete limits", "error", err) - } } func (c *Controller) cleanupAccountsLeftovers() error { diff --git a/transactions/transactor_test.go b/transactions/transactor_test.go index b6348023d..a89104b0d 100644 --- a/transactions/transactor_test.go +++ b/transactions/transactor_test.go @@ -9,7 +9,6 @@ import ( "github.com/status-im/status-go/rpc/chain" "github.com/status-im/status-go/rpc/chain/ethclient" - "github.com/status-im/status-go/rpc/chain/rpclimiter" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" @@ -75,7 +74,7 @@ func (s *TransactorSuite) SetupTest() { rpcClient.UpstreamChainID = chainID ethClients := []ethclient.RPSLimitedEthClientInterface{ - ethclient.NewRPSLimitedEthClient(s.client, rpclimiter.NewRPCRpsLimiter(), "local-1-chain-id-1"), + ethclient.NewRPSLimitedEthClient(s.client, "local-1-chain-id-1"), } localClient := chain.NewClient(ethClients, chainID, nil) rpcClient.SetClient(chainID, localClient)